You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/01/16 00:40:32 UTC

tez git commit: TEZ-1951. Fix general findbugs warnings in tez-dag. (hitesh)

Repository: tez
Updated Branches:
  refs/heads/master b723a05da -> 2762d9b5c


TEZ-1951. Fix general findbugs warnings in tez-dag. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2762d9b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2762d9b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2762d9b5

Branch: refs/heads/master
Commit: 2762d9b5c7c4c2ea5029948eb69525b90da1e33c
Parents: b723a05
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jan 15 15:40:06 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jan 15 15:40:06 2015 -0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 tez-dag/findbugs-exclude.xml                    | 93 ++++++++++++++++++++
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  6 +-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  9 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  9 +-
 .../app/dag/RootInputInitializerManager.java    |  2 +-
 .../tez/dag/app/dag/StateChangeNotifier.java    |  2 +-
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  6 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |  6 +-
 .../dag/impl/ImmediateStartVertexManager.java   |  2 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  7 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 12 +--
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 ++++++---------
 .../tez/dag/app/dag/impl/VertexManager.java     |  4 +-
 .../dag/app/rm/LocalTaskSchedulerService.java   | 63 +++++++++++++
 .../rm/NMCommunicatorLaunchRequestEvent.java    | 31 +++++++
 .../dag/app/rm/YarnTaskSchedulerService.java    |  8 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  2 +-
 .../tez/dag/app/rm/node/AMNodeTracker.java      | 11 +--
 .../security/authorize/TezAMPolicyProvider.java |  2 +-
 .../events/VertexCommitStartedEvent.java        |  6 +-
 .../java/org/apache/tez/dag/utils/Graph.java    |  6 +-
 .../dag/utils/TaskSpecificLaunchCmdOption.java  |  1 -
 23 files changed, 268 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd183a0..dfa12cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1951. Fix general findbugs warnings in tez-dag.
   TEZ-1905. Fix findbugs warnings in tez-tests.
   TEZ-1945. Remove 2 GB memlimit restriction in MergeManager.
   TEZ-1913. Reduce deserialize cost in ValuesIterator.

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index d3d365d..77a18ec 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -22,4 +22,97 @@
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
 
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto"/>
+    <Field name="unknownFields"/>
+    <Bug pattern="SE_BAD_FIELD"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto"/>
+    <Field name="PARSER"/>
+    <Bug pattern="MS_SHOULD_BE_FINAL"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto\$Builder"/>
+    <Method name="maybeForceBuilderInitialization"/>
+    <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.DAGImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.TaskImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.TaskAttemptImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.VertexImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.rm\.container\.AMContainerImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.rm\.node\.AMNodeImpl\$.*"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.dag\.speculation\.legacy\.LegacySpeculator"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.rm\.node\.AMNodeTracker"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerEventHandler"/>
+    <Bug pattern="BC_UNCONFIRMED_CAST"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.rm.TaskSchedulerAppCallbackWrapper"/>
+    <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.DAGAppMaster$DAGAppMasterShutdownHook"/>
+    <Method name="run"/>
+    <Bug pattern="WA_NOT_IN_LOOP"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.DAGAppMaster"/>
+    <Method name="handle" params="org.apache.tez.dag.app.dag.event.DAGAppMasterEvent" returns="void"/>
+    <Bug pattern="SF_SWITCH_NO_DEFAULT"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.app.DAGAppMaster"/>
+    <Method name="&lt;init&gt;"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
+
+  <Match>
+    <Class name="org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption"/>
+    <Method name="getTaskSpecificLogParams"/>
+    <Field name="tsLogParams"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 04e3b1e..6dddf6a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -856,7 +856,7 @@ public class DAGAppMaster extends AbstractService {
       LOG.info("Writing DAG plan to: " + logFile);
       File outFile = new File(logFile);
       try {
-        PrintWriter printWriter = new PrintWriter(outFile);
+        PrintWriter printWriter = new PrintWriter(outFile, "UTF-8");
         printWriter.println(TezUtilsInternal.convertDagPlanToString(dag.getJobPlan()));
         printWriter.close();
       } catch (IOException e) {
@@ -1355,7 +1355,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private class ServiceWithDependency implements ServiceStateChangeListener {
+  private static class ServiceWithDependency implements ServiceStateChangeListener {
     ServiceWithDependency(Service service) {
       this.service = service;
     }
@@ -1423,7 +1423,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private class ServiceThread extends Thread {
+  private static class ServiceThread extends Thread {
     final ServiceWithDependency serviceWithDependency;
     Throwable error = null;
     public ServiceThread(ServiceWithDependency serviceWithDependency) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 220b5b5..1dabe27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -471,6 +471,10 @@ public class RecoveryParser {
           vertexGroupCommitStatus.put(
               vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
           break;
+        default:
+          String message = "Found invalid summary event that was not handled"
+              + ", eventType=" + eventType.name();
+          throw new IOException(message);
       }
     }
 
@@ -667,8 +671,8 @@ public class RecoveryParser {
         LOG.warn("Corrupt data found when trying to read next event", ioe);
         break;
       }
-      if (event == null || skipAllOtherEvents) {
-        // reached end of data
+      if (skipAllOtherEvents) {
+        // hit an error - skip reading other events
         break;
       }
       HistoryEventType eventType = event.getEventType();
@@ -747,6 +751,7 @@ public class RecoveryParser {
           recoveredDAGData.dagState =
               ((DAGFinishedEvent) event).getState();
           skipAllOtherEvents = true;
+          break;
         }
         case CONTAINER_LAUNCHED:
         {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 53da741..b1cb3f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -83,14 +83,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   private InetSocketAddress address;
   private Server server;
 
-  class ContainerInfo {
-    ContainerInfo(ContainerId containerId) {
-      this.containerId = containerId;
+  static class ContainerInfo {
+    ContainerInfo() {
       this.lastReponse = null;
       this.lastRequestId = 0;
       this.currentAttemptId = null;
     }
-    ContainerId containerId;
     long lastRequestId;
     TezHeartbeatResponse lastReponse;
     TezTaskAttemptID currentAttemptId;
@@ -303,8 +301,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
       LOG.debug("ContainerId: " + containerId
           + " registered with TaskAttemptListener");
     }
-    ContainerInfo oldInfo = registeredContainers.put(containerId,
-        new ContainerInfo(containerId));
+    ContainerInfo oldInfo = registeredContainers.put(containerId, new ContainerInfo());
     if(oldInfo != null) {
       throw new TezUncheckedException(
           "Multiple registrations for containerId: " + containerId);

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bdd3689..26d5306 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -405,7 +405,7 @@ public class RootInputInitializerManager {
             "AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
       }
       Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
-      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId);
+      Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId());
       if (successfulAttempt == null) {
         successfulAttempt = attemptId;
         vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index dc18e9b..d2b298b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -63,8 +63,8 @@ public class StateChangeNotifier {
     TezVertexID vertexId = validateAndGetVertexId(vertexName);
     writeLock.lock();
     // Read within the lock, to ensure a consistent view is seen.
-    List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
     try {
+      List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
       ListenerContainer listenerContainer = new ListenerContainer(listener, stateSet);
       Set<ListenerContainer> listenerContainers = vertexListeners.get(vertexId);
       if (listenerContainers == null || !listenerContainers.contains(listenerContainer)) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index bec3626..f4e5bad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -545,7 +545,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       case DAG_FINISHED:
         recoveryCommitInProgress = false;
         DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
-        this.finishTime = finishedEvent.getFinishTime();
+        setFinishTime(finishedEvent.getFinishTime());
         recoveredState = finishedEvent.getState();
         this.fullCounters = finishedEvent.getTezCounters();
         return recoveredState;
@@ -983,6 +983,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     finishTime = clock.getTime();
   }
 
+  synchronized void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
   private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) {
     Map<String, Integer> taskStats = new HashMap<String, Integer>();
     taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, progressBuilder.getTotalTaskCount());

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 05dfc6e..aeb94d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -46,6 +46,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.InputFailedEvent;
 import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
 import org.apache.tez.runtime.api.impl.InputSpec;
 import org.apache.tez.runtime.api.impl.OutputSpec;
 import org.apache.tez.runtime.api.impl.TezEvent;
@@ -377,9 +378,10 @@ public class Edge {
         handleCompositeDataMovementEvent(tezEvent);
         break;
       case INPUT_FAILED_EVENT:
-        isDataMovementEvent = false;
-        // fall through
       case DATA_MOVEMENT_EVENT:
+        if (tezEvent.getEventType().equals(EventType.INPUT_FAILED_EVENT)) {
+          isDataMovementEvent = false;
+        }
         Map<Integer, List<Integer>> destTaskAndInputIndices = Maps
         .newHashMap();
         TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo()

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 00b5306..49900dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -92,7 +92,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
     tasksScheduled = true;
     List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
     for (int i = 0; i < managedTasks; ++i) {
-      tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+      tasksToStart.add(new TaskWithLocationHint(i, null));
     }
 
     if (!tasksToStart.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1c8fb8d..ccac620 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1093,11 +1093,12 @@ public class TaskAttemptImpl implements TaskAttempt,
         ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
       }
       
-      // this should catch at test time if any new events are missing the error cause
-      assert event instanceof TaskAttemptEventTerminationCauseEvent;
-      
       if (event instanceof TaskAttemptEventTerminationCauseEvent) {
         ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) event).getTerminationCause());
+      } else {
+        throw new TezUncheckedException("Invalid event received in TerminateTransition"
+            + ", requiredClass=TaskAttemptEventTerminationCauseEvent"
+            + ", eventClass=" + event.getClass().getName());
       }
 
       ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index a4c4dee..149033c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -453,12 +453,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
     readLock.lock();
 
-    if (!attempts.containsKey(attemptID)) {
-      throw new TezUncheckedException("Unknown TA: " + attemptID
-          + " asking for events from task:" + getTaskId());
-    }
-
     try {
+      if (!attempts.containsKey(attemptID)) {
+        throw new TezUncheckedException("Unknown TA: " + attemptID
+            + " asking for events from task:" + getTaskId());
+      }
+
       if (tezEventsForTaskAttempts.size() > fromEventId) {
         int actualMax = Math.min(maxEvents,
             (tezEventsForTaskAttempts.size() - fromEventId));
@@ -1424,7 +1424,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
-    if (commitAttempt != null && commitAttempt.equals(attempt)) {
+    if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
       LOG.info("Removing commit attempt: " + commitAttempt);
       commitAttempt = null;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b1c93da..f26e4ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2245,32 +2245,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         case KILLED:
         case FAILED:
         case ERROR:
-          switch (desiredState) {
-            case SUCCEEDED:
-              vertex.succeededTaskCount = vertex.numTasks;
-              vertex.completedTaskCount = vertex.numTasks;
-              break;
-            case KILLED:
-              vertex.killedTaskCount = vertex.numTasks;
-              break;
-            case FAILED:
-            case ERROR:
-              vertex.failedTaskCount = vertex.numTasks;
-              break;
+          if (desiredState == VertexState.SUCCEEDED) {
+            vertex.succeededTaskCount = vertex.numTasks;
+            vertex.completedTaskCount = vertex.numTasks;
+          } else if (desiredState == VertexState.KILLED) {
+            vertex.killedTaskCount = vertex.numTasks;
+          } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
+            vertex.failedTaskCount = vertex.numTasks;
           }
           if (vertex.tasks != null) {
             TaskState taskState = TaskState.KILLED;
-            switch (desiredState) {
-              case SUCCEEDED:
-                taskState = TaskState.SUCCEEDED;
-                break;
-              case KILLED:
-                taskState = TaskState.KILLED;
-                break;
-              case FAILED:
-              case ERROR:
-                taskState = TaskState.FAILED;
-                break;
+            if (desiredState == VertexState.SUCCEEDED) {
+              taskState = TaskState.SUCCEEDED;
+            } else if (desiredState == VertexState.KILLED) {
+              taskState = TaskState.KILLED;
+            } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
+              taskState = TaskState.FAILED;
             }
             for (Task task : vertex.tasks.values()) {
               vertex.eventHandler.handle(
@@ -2408,16 +2398,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
             // recover tasks
             if (vertex.tasks != null && vertex.numTasks != 0) {
               TaskState taskState = TaskState.KILLED;
-              switch (vertex.recoveredState) {
-                case SUCCEEDED:
-                  taskState = TaskState.SUCCEEDED;
-                  break;
-                case KILLED:
-                  taskState = TaskState.KILLED;
-                  break;
-                case FAILED:
-                  taskState = TaskState.FAILED;
-                  break;
+              if (vertex.recoveredState == VertexState.SUCCEEDED) {
+                taskState = TaskState.SUCCEEDED;
+              } else if (vertex.recoveredState == VertexState.KILLED) {
+                taskState = TaskState.KILLED;
+              } else if (vertex.recoveredState == VertexState.FAILED) {
+                taskState = TaskState.FAILED;
               }
               for (Task task : vertex.tasks.values()) {
                 vertex.eventHandler.handle(
@@ -2814,16 +2800,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
           assert vertex.tasks.size() == vertex.numTasks;
           if (vertex.tasks != null  && vertex.numTasks != 0) {
             TaskState taskState = TaskState.KILLED;
-            switch (vertex.recoveredState) {
-              case SUCCEEDED:
-                taskState = TaskState.SUCCEEDED;
-                break;
-              case KILLED:
-                taskState = TaskState.KILLED;
-                break;
-              case FAILED:
-                taskState = TaskState.FAILED;
-                break;
+            if (vertex.recoveredState == VertexState.SUCCEEDED) {
+              taskState = TaskState.SUCCEEDED;
+            } else if (vertex.recoveredState == VertexState.KILLED) {
+              taskState = TaskState.KILLED;
+            } else if (vertex.recoveredState == VertexState.FAILED) {
+              taskState = TaskState.FAILED;
             }
             for (Task task : vertex.tasks.values()) {
               vertex.eventHandler.handle(
@@ -2835,7 +2817,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
               vertex.recoveryCodeSimulatingStart();
               endState = VertexState.RUNNING;
             } catch (AMUserCodeException e) {
-              String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
+              String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
               LOG.error(msg, e);
               vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
                   msg + "," + ExceptionUtils.getStackTrace(e.getCause()));

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index f02b73b..da86151 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -348,7 +348,7 @@ public class VertexManager {
     Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
     if (completions != null && !completions.isEmpty()) {
       for (TezTaskAttemptID tezTaskAttemptID : completions) {
-        Integer taskId = new Integer(tezTaskAttemptID.getTaskID().getId());
+        Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
         String vertexName =
             appContext.getCurrentDAG().getVertex(
                 tezTaskAttemptID.getTaskID().getVertexID()).getName();
@@ -370,7 +370,7 @@ public class VertexManager {
   }
 
   public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
-    Integer taskId = new Integer(tezTaskId.getId());
+    Integer taskId = Integer.valueOf(tezTaskId.getId());
     String vertexName =
         appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 2ebcebb..4af07bb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -234,6 +234,36 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
     public int compareTo(TaskRequest request) {
       return request.priority.compareTo(this.priority);
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TaskRequest that = (TaskRequest) o;
+
+      if (priority != null ? !priority.equals(that.priority) : that.priority != null) {
+        return false;
+      }
+      if (task != null ? !task.equals(that.task) : that.task != null) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = 1;
+      result = 7841 * result + (task != null ? task.hashCode() : 0);
+      result = 7841 * result + (priority != null ? priority.hashCode() : 0);
+      return result;
+    }
+
   }
 
   static class AllocateTaskRequest extends TaskRequest {
@@ -246,6 +276,39 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
       this.capability = capability;
       this.clientCookie = clientCookie;
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+
+      AllocateTaskRequest that = (AllocateTaskRequest) o;
+
+      if (capability != null ? !capability.equals(that.capability) : that.capability != null) {
+        return false;
+      }
+      if (clientCookie != null ? !clientCookie.equals(that.clientCookie) :
+          that.clientCookie != null) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = super.hashCode();
+      result = 12329 * result + (capability != null ? capability.hashCode() : 0);
+      result = 12329 * result + (clientCookie != null ? clientCookie.hashCode() : 0);
+      return result;
+    }
   }
 
   static class DeallocateTaskRequest extends TaskRequest {

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index 0ed28a2..c3b12c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -42,4 +42,35 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
     return container;
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+
+    NMCommunicatorLaunchRequestEvent that = (NMCommunicatorLaunchRequestEvent) o;
+
+    if (clc != null ? !clc.equals(that.clc) : that.clc != null) {
+      return false;
+    }
+    if (container != null ? !container.equals(that.container) : that.container != null) {
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = super.hashCode();
+    result = 7001 * result + (clc != null ? clc.hashCode() : 0);
+    result = 7001 * result + (container != null ? container.hashCode() : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f8fbd53..a17d5e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -153,7 +153,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
   @VisibleForTesting
   protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
 
-  class CRCookie {
+  static class CRCookie {
     // Do not use these variables directly. Can caused mocked unit tests to fail.
     private Object task;
     private Object appCookie;
@@ -1213,13 +1213,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
         if (preemptedTaskPriority == null ||
             !isHigherPriority(taskPriority, preemptedTaskPriority)) {
           // keep the lower priority
-          preemptedTaskPriority = taskPriority;
           if (taskPriority.equals(preemptedTaskPriority)) {
             numEntriesAtPreemptedPriority++;
           } else {
             // this is at a lower priority than existing
             numEntriesAtPreemptedPriority = 1;
           }
+          preemptedTaskPriority = taskPriority;
         }
       }
       if(preemptedTaskPriority != null) {
@@ -1935,8 +1935,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
      * Intended to be used in cases where new Container requests come in 
      */
     public void triggerScheduling(boolean scheduleAll) {
-      this.tryAssigningAll = scheduleAll;
       synchronized(this) {
+        this.tryAssigningAll = scheduleAll;
         this.notify();
       }
     }
@@ -2058,7 +2058,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
     LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
   }
 
-  private class ContainerIterable implements Iterable<Container> {
+  private static class ContainerIterable implements Iterable<Container> {
 
     private final Iterable<HeldContainer> delayedContainers;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d4f46b..536001c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -471,10 +471,10 @@ public class AMContainerImpl implements AMContainer {
             this.credentialsChanged ? this.credentials : null, this.credentialsChanged);
         this.additionalLocalResources = null;
         this.credentialsChanged = false;
+        this.pullAttempt = null;
         return amContainerTask;
       }
     } finally {
-      this.pullAttempt = null;
       this.writeLock.unlock();
     }
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 39a3bfc..d9ff99a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -94,15 +94,12 @@ public class AMNodeTracker extends AbstractService implements
 
   private void addToBlackList(NodeId nodeId) {
     String host = nodeId.getHost();
-    Set<NodeId> nodes;
-    
+
     if (!blacklistMap.containsKey(host)) {
-      nodes = new HashSet<NodeId>();
-      blacklistMap.put(host, nodes);
-    } else {
-      nodes = blacklistMap.get(host);
+      blacklistMap.putIfAbsent(host, new HashSet<NodeId>());
     }
-    
+    Set<NodeId> nodes = blacklistMap.get(host);
+
     if (!nodes.contains(nodeId)) {
       nodes.add(nodeId);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
index c401e5d..a212041 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
@@ -44,7 +44,7 @@ public class TezAMPolicyProvider extends PolicyProvider {
 
   @Override
   public Service[] getServices() {
-    return tezApplicationMasterServices;
+    return tezApplicationMasterServices.clone();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index b6f4d83..c452187 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.charset.Charset;
 
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
@@ -36,6 +37,7 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
 
   private TezVertexID vertexID;
   private long commitStartTime;
+  private final Charset charSet = Charset.forName("UTF-8");
 
   public VertexCommitStartedEvent() {
   }
@@ -100,14 +102,14 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
         .setTimestamp(commitStartTime)
         .setEventType(getEventType().ordinal())
         .setEventPayload(
-            ByteString.copyFrom(vertexID.toString().getBytes()));
+            ByteString.copyFrom(vertexID.toString().getBytes(charSet)));
     builder.build().writeDelimitedTo(outputStream);
   }
 
   @Override
   public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
     this.vertexID = TezVertexID.fromString(
-        new String(proto.getEventPayload().toByteArray()));
+        new String(proto.getEventPayload().toByteArray(), charSet));
     this.commitStartTime = proto.getTimestamp();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index ecead77..cc9033d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -18,7 +18,7 @@
 
 package org.apache.tez.dag.utils;
 
-import java.io.FileWriter;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -230,8 +230,8 @@ public class Graph {
   }
 
   public void save(String filePath) throws IOException {
-    FileWriter fout = new FileWriter(filePath);
-    fout.write(generateGraphViz());
+    FileOutputStream fout = new FileOutputStream(filePath);
+    fout.write(generateGraphViz().getBytes("UTF-8"));
     fout.close();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
index 59aede8..d164efc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
@@ -110,7 +110,6 @@ public class TaskSpecificLaunchCmdOption {
    * The first element of the array is the general log level. </p>
    * The second level, if it exists, is the additional per logger configuration.
    *
-   *
    * @return parsed form of the log string specified. null if none specified
    */
   public String[] getTaskSpecificLogParams() {