You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2015/01/16 00:40:32 UTC
tez git commit: TEZ-1951. Fix general findbugs warnings in tez-dag.
(hitesh)
Repository: tez
Updated Branches:
refs/heads/master b723a05da -> 2762d9b5c
TEZ-1951. Fix general findbugs warnings in tez-dag. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2762d9b5
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2762d9b5
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2762d9b5
Branch: refs/heads/master
Commit: 2762d9b5c7c4c2ea5029948eb69525b90da1e33c
Parents: b723a05
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jan 15 15:40:06 2015 -0800
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jan 15 15:40:06 2015 -0800
----------------------------------------------------------------------
CHANGES.txt | 1 +
tez-dag/findbugs-exclude.xml | 93 ++++++++++++++++++++
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +-
.../org/apache/tez/dag/app/RecoveryParser.java | 9 +-
.../dag/app/TaskAttemptListenerImpTezDag.java | 9 +-
.../app/dag/RootInputInitializerManager.java | 2 +-
.../tez/dag/app/dag/StateChangeNotifier.java | 2 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 6 +-
.../org/apache/tez/dag/app/dag/impl/Edge.java | 6 +-
.../dag/impl/ImmediateStartVertexManager.java | 2 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 7 +-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 12 +--
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 70 ++++++---------
.../tez/dag/app/dag/impl/VertexManager.java | 4 +-
.../dag/app/rm/LocalTaskSchedulerService.java | 63 +++++++++++++
.../rm/NMCommunicatorLaunchRequestEvent.java | 31 +++++++
.../dag/app/rm/YarnTaskSchedulerService.java | 8 +-
.../dag/app/rm/container/AMContainerImpl.java | 2 +-
.../tez/dag/app/rm/node/AMNodeTracker.java | 11 +--
.../security/authorize/TezAMPolicyProvider.java | 2 +-
.../events/VertexCommitStartedEvent.java | 6 +-
.../java/org/apache/tez/dag/utils/Graph.java | 6 +-
.../dag/utils/TaskSpecificLaunchCmdOption.java | 1 -
23 files changed, 268 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dd183a0..dfa12cd 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1951. Fix general findbugs warnings in tez-dag.
TEZ-1905. Fix findbugs warnings in tez-tests.
TEZ-1945. Remove 2 GB memlimit restriction in MergeManager.
TEZ-1913. Reduce deserialize cost in ValuesIterator.
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index d3d365d..77a18ec 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -22,4 +22,97 @@
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto"/>
+ <Field name="unknownFields"/>
+ <Bug pattern="SE_BAD_FIELD"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto"/>
+ <Field name="PARSER"/>
+ <Bug pattern="MS_SHOULD_BE_FINAL"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.recovery\.records\.RecoveryProtos\$.*Proto\$Builder"/>
+ <Method name="maybeForceBuilderInitialization"/>
+ <Bug pattern="UCF_USELESS_CONTROL_FLOW"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.DAGImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.TaskImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.TaskAttemptImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.dag\.impl\.VertexImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.rm\.container\.AMContainerImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.rm\.node\.AMNodeImpl\$.*"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.dag\.speculation\.legacy\.LegacySpeculator"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.rm\.node\.AMNodeTracker"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="~org\.apache\.tez\.dag\.app\.rm\.TaskSchedulerEventHandler"/>
+ <Bug pattern="BC_UNCONFIRMED_CAST"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.app.rm.TaskSchedulerAppCallbackWrapper"/>
+ <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.app.DAGAppMaster$DAGAppMasterShutdownHook"/>
+ <Method name="run"/>
+ <Bug pattern="WA_NOT_IN_LOOP"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.app.DAGAppMaster"/>
+ <Method name="handle" params="org.apache.tez.dag.app.dag.event.DAGAppMasterEvent" returns="void"/>
+ <Bug pattern="SF_SWITCH_NO_DEFAULT"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.app.DAGAppMaster"/>
+ <Method name="<init>"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption"/>
+ <Method name="getTaskSpecificLogParams"/>
+ <Field name="tsLogParams"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 04e3b1e..6dddf6a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -856,7 +856,7 @@ public class DAGAppMaster extends AbstractService {
LOG.info("Writing DAG plan to: " + logFile);
File outFile = new File(logFile);
try {
- PrintWriter printWriter = new PrintWriter(outFile);
+ PrintWriter printWriter = new PrintWriter(outFile, "UTF-8");
printWriter.println(TezUtilsInternal.convertDagPlanToString(dag.getJobPlan()));
printWriter.close();
} catch (IOException e) {
@@ -1355,7 +1355,7 @@ public class DAGAppMaster extends AbstractService {
}
}
- private class ServiceWithDependency implements ServiceStateChangeListener {
+ private static class ServiceWithDependency implements ServiceStateChangeListener {
ServiceWithDependency(Service service) {
this.service = service;
}
@@ -1423,7 +1423,7 @@ public class DAGAppMaster extends AbstractService {
}
}
- private class ServiceThread extends Thread {
+ private static class ServiceThread extends Thread {
final ServiceWithDependency serviceWithDependency;
Throwable error = null;
public ServiceThread(ServiceWithDependency serviceWithDependency) {
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 220b5b5..1dabe27 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -471,6 +471,10 @@ public class RecoveryParser {
vertexGroupCommitStatus.put(
vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
break;
+ default:
+ String message = "Found invalid summary event that was not handled"
+ + ", eventType=" + eventType.name();
+ throw new IOException(message);
}
}
@@ -667,8 +671,8 @@ public class RecoveryParser {
LOG.warn("Corrupt data found when trying to read next event", ioe);
break;
}
- if (event == null || skipAllOtherEvents) {
- // reached end of data
+ if (skipAllOtherEvents) {
+ // hit an error - skip reading other events
break;
}
HistoryEventType eventType = event.getEventType();
@@ -747,6 +751,7 @@ public class RecoveryParser {
recoveredDAGData.dagState =
((DAGFinishedEvent) event).getState();
skipAllOtherEvents = true;
+ break;
}
case CONTAINER_LAUNCHED:
{
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 53da741..b1cb3f6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -83,14 +83,12 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
private InetSocketAddress address;
private Server server;
- class ContainerInfo {
- ContainerInfo(ContainerId containerId) {
- this.containerId = containerId;
+ static class ContainerInfo {
+ ContainerInfo() {
this.lastReponse = null;
this.lastRequestId = 0;
this.currentAttemptId = null;
}
- ContainerId containerId;
long lastRequestId;
TezHeartbeatResponse lastReponse;
TezTaskAttemptID currentAttemptId;
@@ -303,8 +301,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
LOG.debug("ContainerId: " + containerId
+ " registered with TaskAttemptListener");
}
- ContainerInfo oldInfo = registeredContainers.put(containerId,
- new ContainerInfo(containerId));
+ ContainerInfo oldInfo = registeredContainers.put(containerId, new ContainerInfo());
if(oldInfo != null) {
throw new TezUncheckedException(
"Multiple registrations for containerId: " + containerId);
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
index bdd3689..26d5306 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/RootInputInitializerManager.java
@@ -405,7 +405,7 @@ public class RootInputInitializerManager {
"AttemptId is -1. This is likely caused by TEZ-1577; recovery not supported when InputInitializerEvents are used");
}
Map<Integer, Integer> vertexSuccessfulAttemptMap = firstSuccessfulAttemptMap.get(vertexName);
- Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId);
+ Integer successfulAttempt = vertexSuccessfulAttemptMap.get(taskId.getId());
if (successfulAttempt == null) {
successfulAttempt = attemptId;
vertexSuccessfulAttemptMap.put(taskId.getId(), successfulAttempt);
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
index dc18e9b..d2b298b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/StateChangeNotifier.java
@@ -63,8 +63,8 @@ public class StateChangeNotifier {
TezVertexID vertexId = validateAndGetVertexId(vertexName);
writeLock.lock();
// Read within the lock, to ensure a consistent view is seen.
- List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
try {
+ List<VertexStateUpdate> previousUpdates = lastKnowStatesMap.get(vertexId);
ListenerContainer listenerContainer = new ListenerContainer(listener, stateSet);
Set<ListenerContainer> listenerContainers = vertexListeners.get(vertexId);
if (listenerContainers == null || !listenerContainers.contains(listenerContainer)) {
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index bec3626..f4e5bad 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -545,7 +545,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
case DAG_FINISHED:
recoveryCommitInProgress = false;
DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
- this.finishTime = finishedEvent.getFinishTime();
+ setFinishTime(finishedEvent.getFinishTime());
recoveredState = finishedEvent.getState();
this.fullCounters = finishedEvent.getTezCounters();
return recoveredState;
@@ -983,6 +983,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
finishTime = clock.getTime();
}
+ synchronized void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
private Map<String, Integer> constructTaskStats(ProgressBuilder progressBuilder) {
Map<String, Integer> taskStats = new HashMap<String, Integer>();
taskStats.put(ATSConstants.NUM_COMPLETED_TASKS, progressBuilder.getTotalTaskCount());
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 05dfc6e..aeb94d7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -46,6 +46,7 @@ import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
@@ -377,9 +378,10 @@ public class Edge {
handleCompositeDataMovementEvent(tezEvent);
break;
case INPUT_FAILED_EVENT:
- isDataMovementEvent = false;
- // fall through
case DATA_MOVEMENT_EVENT:
+ if (tezEvent.getEventType().equals(EventType.INPUT_FAILED_EVENT)) {
+ isDataMovementEvent = false;
+ }
Map<Integer, List<Integer>> destTaskAndInputIndices = Maps
.newHashMap();
TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo()
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
index 00b5306..49900dd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ImmediateStartVertexManager.java
@@ -92,7 +92,7 @@ public class ImmediateStartVertexManager extends VertexManagerPlugin {
tasksScheduled = true;
List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
for (int i = 0; i < managedTasks; ++i) {
- tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+ tasksToStart.add(new TaskWithLocationHint(i, null));
}
if (!tasksToStart.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1c8fb8d..ccac620 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -1093,11 +1093,12 @@ public class TaskAttemptImpl implements TaskAttempt,
ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
}
- // this should catch at test time if any new events are missing the error cause
- assert event instanceof TaskAttemptEventTerminationCauseEvent;
-
if (event instanceof TaskAttemptEventTerminationCauseEvent) {
ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) event).getTerminationCause());
+ } else {
+ throw new TezUncheckedException("Invalid event received in TerminateTransition"
+ + ", requiredClass=TaskAttemptEventTerminationCauseEvent"
+ + ", eventClass=" + event.getClass().getName());
}
ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index a4c4dee..149033c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -453,12 +453,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
List<TezEvent> events = EMPTY_TASK_ATTEMPT_TEZ_EVENTS;
readLock.lock();
- if (!attempts.containsKey(attemptID)) {
- throw new TezUncheckedException("Unknown TA: " + attemptID
- + " asking for events from task:" + getTaskId());
- }
-
try {
+ if (!attempts.containsKey(attemptID)) {
+ throw new TezUncheckedException("Unknown TA: " + attemptID
+ + " asking for events from task:" + getTaskId());
+ }
+
if (tezEventsForTaskAttempts.size() > fromEventId) {
int actualMax = Math.min(maxEvents,
(tezEventsForTaskAttempts.size() - fromEventId));
@@ -1424,7 +1424,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
- if (commitAttempt != null && commitAttempt.equals(attempt)) {
+ if (commitAttempt != null && commitAttempt.equals(attempt.getID())) {
LOG.info("Removing commit attempt: " + commitAttempt);
commitAttempt = null;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b1c93da..f26e4ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2245,32 +2245,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case KILLED:
case FAILED:
case ERROR:
- switch (desiredState) {
- case SUCCEEDED:
- vertex.succeededTaskCount = vertex.numTasks;
- vertex.completedTaskCount = vertex.numTasks;
- break;
- case KILLED:
- vertex.killedTaskCount = vertex.numTasks;
- break;
- case FAILED:
- case ERROR:
- vertex.failedTaskCount = vertex.numTasks;
- break;
+ if (desiredState == VertexState.SUCCEEDED) {
+ vertex.succeededTaskCount = vertex.numTasks;
+ vertex.completedTaskCount = vertex.numTasks;
+ } else if (desiredState == VertexState.KILLED) {
+ vertex.killedTaskCount = vertex.numTasks;
+ } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
+ vertex.failedTaskCount = vertex.numTasks;
}
if (vertex.tasks != null) {
TaskState taskState = TaskState.KILLED;
- switch (desiredState) {
- case SUCCEEDED:
- taskState = TaskState.SUCCEEDED;
- break;
- case KILLED:
- taskState = TaskState.KILLED;
- break;
- case FAILED:
- case ERROR:
- taskState = TaskState.FAILED;
- break;
+ if (desiredState == VertexState.SUCCEEDED) {
+ taskState = TaskState.SUCCEEDED;
+ } else if (desiredState == VertexState.KILLED) {
+ taskState = TaskState.KILLED;
+ } else if (desiredState == VertexState.FAILED || desiredState == VertexState.ERROR) {
+ taskState = TaskState.FAILED;
}
for (Task task : vertex.tasks.values()) {
vertex.eventHandler.handle(
@@ -2408,16 +2398,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
// recover tasks
if (vertex.tasks != null && vertex.numTasks != 0) {
TaskState taskState = TaskState.KILLED;
- switch (vertex.recoveredState) {
- case SUCCEEDED:
- taskState = TaskState.SUCCEEDED;
- break;
- case KILLED:
- taskState = TaskState.KILLED;
- break;
- case FAILED:
- taskState = TaskState.FAILED;
- break;
+ if (vertex.recoveredState == VertexState.SUCCEEDED) {
+ taskState = TaskState.SUCCEEDED;
+ } else if (vertex.recoveredState == VertexState.KILLED) {
+ taskState = TaskState.KILLED;
+ } else if (vertex.recoveredState == VertexState.FAILED) {
+ taskState = TaskState.FAILED;
}
for (Task task : vertex.tasks.values()) {
vertex.eventHandler.handle(
@@ -2814,16 +2800,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
assert vertex.tasks.size() == vertex.numTasks;
if (vertex.tasks != null && vertex.numTasks != 0) {
TaskState taskState = TaskState.KILLED;
- switch (vertex.recoveredState) {
- case SUCCEEDED:
- taskState = TaskState.SUCCEEDED;
- break;
- case KILLED:
- taskState = TaskState.KILLED;
- break;
- case FAILED:
- taskState = TaskState.FAILED;
- break;
+ if (vertex.recoveredState == VertexState.SUCCEEDED) {
+ taskState = TaskState.SUCCEEDED;
+ } else if (vertex.recoveredState == VertexState.KILLED) {
+ taskState = TaskState.KILLED;
+ } else if (vertex.recoveredState == VertexState.FAILED) {
+ taskState = TaskState.FAILED;
}
for (Task task : vertex.tasks.values()) {
vertex.eventHandler.handle(
@@ -2835,7 +2817,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
vertex.recoveryCodeSimulatingStart();
endState = VertexState.RUNNING;
} catch (AMUserCodeException e) {
- String msg = "Exception in " + e.getSource() +", vertex:" + vertex.getLogIdentifier();
+ String msg = "Exception in " + e.getSource() + ", vertex:" + vertex.getLogIdentifier();
LOG.error(msg, e);
vertex.finished(VertexState.FAILED, VertexTerminationCause.AM_USERCODE_FAILURE,
msg + "," + ExceptionUtils.getStackTrace(e.getCause()));
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
index f02b73b..da86151 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java
@@ -348,7 +348,7 @@ public class VertexManager {
Map<String, List<Integer>> pluginCompletionsMap = Maps.newHashMap();
if (completions != null && !completions.isEmpty()) {
for (TezTaskAttemptID tezTaskAttemptID : completions) {
- Integer taskId = new Integer(tezTaskAttemptID.getTaskID().getId());
+ Integer taskId = Integer.valueOf(tezTaskAttemptID.getTaskID().getId());
String vertexName =
appContext.getCurrentDAG().getVertex(
tezTaskAttemptID.getTaskID().getVertexID()).getName();
@@ -370,7 +370,7 @@ public class VertexManager {
}
public void onSourceTaskCompleted(TezTaskID tezTaskId) throws AMUserCodeException {
- Integer taskId = new Integer(tezTaskId.getId());
+ Integer taskId = Integer.valueOf(tezTaskId.getId());
String vertexName =
appContext.getCurrentDAG().getVertex(tezTaskId.getVertexID()).getName();
try {
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
index 2ebcebb..4af07bb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/LocalTaskSchedulerService.java
@@ -234,6 +234,36 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
public int compareTo(TaskRequest request) {
return request.priority.compareTo(this.priority);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TaskRequest that = (TaskRequest) o;
+
+ if (priority != null ? !priority.equals(that.priority) : that.priority != null) {
+ return false;
+ }
+ if (task != null ? !task.equals(that.task) : that.task != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = 1;
+ result = 7841 * result + (task != null ? task.hashCode() : 0);
+ result = 7841 * result + (priority != null ? priority.hashCode() : 0);
+ return result;
+ }
+
}
static class AllocateTaskRequest extends TaskRequest {
@@ -246,6 +276,39 @@ public class LocalTaskSchedulerService extends TaskSchedulerService {
this.capability = capability;
this.clientCookie = clientCookie;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ AllocateTaskRequest that = (AllocateTaskRequest) o;
+
+ if (capability != null ? !capability.equals(that.capability) : that.capability != null) {
+ return false;
+ }
+ if (clientCookie != null ? !clientCookie.equals(that.clientCookie) :
+ that.clientCookie != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 12329 * result + (capability != null ? capability.hashCode() : 0);
+ result = 12329 * result + (clientCookie != null ? clientCookie.hashCode() : 0);
+ return result;
+ }
}
static class DeallocateTaskRequest extends TaskRequest {
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
index 0ed28a2..c3b12c0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/NMCommunicatorLaunchRequestEvent.java
@@ -42,4 +42,35 @@ public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
return container;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+
+ NMCommunicatorLaunchRequestEvent that = (NMCommunicatorLaunchRequestEvent) o;
+
+ if (clc != null ? !clc.equals(that.clc) : that.clc != null) {
+ return false;
+ }
+ if (container != null ? !container.equals(that.container) : that.container != null) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 7001 * result + (clc != null ? clc.hashCode() : 0);
+ result = 7001 * result + (container != null ? container.hashCode() : 0);
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
index f8fbd53..a17d5e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java
@@ -153,7 +153,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
@VisibleForTesting
protected AtomicBoolean shouldUnregister = new AtomicBoolean(false);
- class CRCookie {
+ static class CRCookie {
// Do not use these variables directly. Can caused mocked unit tests to fail.
private Object task;
private Object appCookie;
@@ -1213,13 +1213,13 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
if (preemptedTaskPriority == null ||
!isHigherPriority(taskPriority, preemptedTaskPriority)) {
// keep the lower priority
- preemptedTaskPriority = taskPriority;
if (taskPriority.equals(preemptedTaskPriority)) {
numEntriesAtPreemptedPriority++;
} else {
// this is at a lower priority than existing
numEntriesAtPreemptedPriority = 1;
}
+ preemptedTaskPriority = taskPriority;
}
}
if(preemptedTaskPriority != null) {
@@ -1935,8 +1935,8 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
* Intended to be used in cases where new Container requests come in
*/
public void triggerScheduling(boolean scheduleAll) {
- this.tryAssigningAll = scheduleAll;
synchronized(this) {
+ this.tryAssigningAll = scheduleAll;
this.notify();
}
}
@@ -2058,7 +2058,7 @@ public class YarnTaskSchedulerService extends TaskSchedulerService
LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
}
- private class ContainerIterable implements Iterable<Container> {
+ private static class ContainerIterable implements Iterable<Container> {
private final Iterable<HeldContainer> delayedContainers;
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d4f46b..536001c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -471,10 +471,10 @@ public class AMContainerImpl implements AMContainer {
this.credentialsChanged ? this.credentials : null, this.credentialsChanged);
this.additionalLocalResources = null;
this.credentialsChanged = false;
+ this.pullAttempt = null;
return amContainerTask;
}
} finally {
- this.pullAttempt = null;
this.writeLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
index 39a3bfc..d9ff99a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/node/AMNodeTracker.java
@@ -94,15 +94,12 @@ public class AMNodeTracker extends AbstractService implements
private void addToBlackList(NodeId nodeId) {
String host = nodeId.getHost();
- Set<NodeId> nodes;
-
+
if (!blacklistMap.containsKey(host)) {
- nodes = new HashSet<NodeId>();
- blacklistMap.put(host, nodes);
- } else {
- nodes = blacklistMap.get(host);
+ blacklistMap.putIfAbsent(host, new HashSet<NodeId>());
}
-
+ Set<NodeId> nodes = blacklistMap.get(host);
+
if (!nodes.contains(nodeId)) {
nodes.add(nodeId);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
index c401e5d..a212041 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/security/authorize/TezAMPolicyProvider.java
@@ -44,7 +44,7 @@ public class TezAMPolicyProvider extends PolicyProvider {
@Override
public Service[] getServices() {
- return tezApplicationMasterServices;
+ return tezApplicationMasterServices.clone();
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index b6f4d83..c452187 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.Charset;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
@@ -36,6 +37,7 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
private TezVertexID vertexID;
private long commitStartTime;
+ private final Charset charSet = Charset.forName("UTF-8");
public VertexCommitStartedEvent() {
}
@@ -100,14 +102,14 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
.setTimestamp(commitStartTime)
.setEventType(getEventType().ordinal())
.setEventPayload(
- ByteString.copyFrom(vertexID.toString().getBytes()));
+ ByteString.copyFrom(vertexID.toString().getBytes(charSet)));
builder.build().writeDelimitedTo(outputStream);
}
@Override
public void fromSummaryProtoStream(SummaryEventProto proto) throws IOException {
this.vertexID = TezVertexID.fromString(
- new String(proto.getEventPayload().toByteArray()));
+ new String(proto.getEventPayload().toByteArray(), charSet));
this.commitStartTime = proto.getTimestamp();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
index ecead77..cc9033d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/Graph.java
@@ -18,7 +18,7 @@
package org.apache.tez.dag.utils;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -230,8 +230,8 @@ public class Graph {
}
public void save(String filePath) throws IOException {
- FileWriter fout = new FileWriter(filePath);
- fout.write(generateGraphViz());
+ FileOutputStream fout = new FileOutputStream(filePath);
+ fout.write(generateGraphViz().getBytes("UTF-8"));
fout.close();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/2762d9b5/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
index 59aede8..d164efc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TaskSpecificLaunchCmdOption.java
@@ -110,7 +110,6 @@ public class TaskSpecificLaunchCmdOption {
* The first element of the array is the general log level. </p>
* The second level, if it exists, is the additional per logger configuration.
*
- *
* @return parsed form of the log string specified. null if none specified
*/
public String[] getTaskSpecificLogParams() {