You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/07/23 23:35:27 UTC
git commit: TEZ-272. Address part of findbugs warnings caught after
running findbugs on tez codebase. (hitesh)
Updated Branches:
refs/heads/master c56db3a44 -> e1f64bf59
TEZ-272. Address part of findbugs warnings caught after running findbugs on tez codebase. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e1f64bf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e1f64bf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e1f64bf5
Branch: refs/heads/master
Commit: e1f64bf59ef2353207bc548ad45660419ceb1ef5
Parents: c56db3a
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Jul 23 14:33:49 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Jul 23 14:33:49 2013 -0700
----------------------------------------------------------------------
pom.xml | 43 +++++++++
tez-common/findbugs-exclude.xml | 3 +
.../tez/common/counters/ResourceBundles.java | 7 +-
tez-dag-api/findbugs-exclude.xml | 3 +
.../main/java/org/apache/tez/dag/api/DAG.java | 25 ++++--
.../apache/tez/dag/api/DagTypeConverters.java | 9 +-
.../java/org/apache/tez/dag/api/Vertex.java | 4 +-
.../apache/tez/dag/api/VertexLocationHint.java | 92 +++++++++++---------
.../org/apache/tez/dag/api/client/Progress.java | 17 ++--
.../dag/api/client/rpc/DAGClientRPCImpl.java | 9 +-
tez-dag/findbugs-exclude.xml | 12 +++
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 4 +-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 25 +++++-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 35 +++++---
.../tez/dag/app/dag/impl/TestTaskImpl.java | 2 +-
tez-engine-api/findbugs-exclude.xml | 3 +
tez-engine/findbugs-exclude.xml | 3 +
tez-mapreduce-examples/findbugs-exclude.xml | 3 +
.../tez/mapreduce/examples/MRRSleepJob.java | 25 +++---
tez-mapreduce/findbugs-exclude.xml | 3 +
.../tez/mapreduce/hadoop/InputSplitInfo.java | 8 +-
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 18 ++--
.../tez/mapreduce/hadoop/TestMRHelpers.java | 18 ++--
tez-yarn-client/findbugs-exclude.xml | 3 +
.../org/apache/tez/mapreduce/DAGJobStatus.java | 2 +-
.../org/apache/tez/mapreduce/YARNRunner.java | 25 ++++--
28 files changed, 289 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d5ac0da..0a3579f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -351,6 +351,11 @@
</systemPropertyVariables>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>
@@ -423,6 +428,44 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>findbugs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ <property>
+ <name>findbugs</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>site</id>
+ <phase>pre-site</phase>
+ <goals>
+ <goal>findbugs</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludeFilterFile>findbugs-exclude.xml</excludeFilterFile>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <version>2.5.2</version>
+ </plugin>
+ </plugins>
+ </reporting>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-common/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-common/findbugs-exclude.xml b/tez-common/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-common/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java b/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java
index daf972e..8113cab 100644
--- a/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java
+++ b/tez-common/src/main/java/org/apache/tez/common/counters/ResourceBundles.java
@@ -55,11 +55,14 @@ public class ResourceBundles {
try {
ResourceBundle bundle = getBundle(bundleName);
value = (T) bundle.getObject(getLookupKey(key, suffix));
+ if (value != null) {
+ return value;
+ }
}
catch (Exception e) {
- return defaultValue;
+ // Ignore
}
- return value == null ? defaultValue : value;
+ return defaultValue;
}
private static String getLookupKey(String key, String suffix) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag-api/findbugs-exclude.xml b/tez-dag-api/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-dag-api/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 8fb665a..e027922 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -97,7 +97,7 @@ public class DAG { // FIXME rename to Topology
}
// AnnotatedVertex is used by verify()
- private class AnnotatedVertex {
+ private static class AnnotatedVertex {
Vertex v;
int index; //for Tarjan's algorithm
@@ -273,16 +273,23 @@ public class DAG { // FIXME rename to Topology
taskConfigBuilder.setTaskModule(vertex.getVertexName());
PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
Map<String,LocalResource> lrs = vertex.getTaskLocalResources();
- for(String key : lrs.keySet()){
- LocalResource lr = lrs.get(key);
+ for(Entry<String, LocalResource> entry : lrs.entrySet()){
+ String key = entry.getKey();
+ LocalResource lr = entry.getValue();
localResourcesBuilder.setName(key);
- localResourcesBuilder.setUri(DagTypeConverters.convertToDAGPlan(lr.getResource()));
+ localResourcesBuilder.setUri(
+ DagTypeConverters.convertToDAGPlan(lr.getResource()));
localResourcesBuilder.setSize(lr.getSize());
localResourcesBuilder.setTimeStamp(lr.getTimestamp());
- localResourcesBuilder.setType(DagTypeConverters.convertToDAGPlan(lr.getType()));
- localResourcesBuilder.setVisibility(DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
+ localResourcesBuilder.setType(
+ DagTypeConverters.convertToDAGPlan(lr.getType()));
+ localResourcesBuilder.setVisibility(
+ DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
if(lr.getType() == LocalResourceType.PATTERN){
- assert lr.getPattern() != null : "resourceType=PATTERN but pattern is null";
+ if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
+ throw new TezUncheckedException("LocalResource type set to pattern"
+ + " but pattern is null or empty");
+ }
localResourcesBuilder.setPattern(lr.getPattern());
}
taskConfigBuilder.addLocalResource(localResourcesBuilder);
@@ -303,10 +310,10 @@ public class DAG { // FIXME rename to Topology
PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
if(hint.getDataLocalHosts() != null){
- taskLocationHintBuilder.addAllHost(Arrays.asList(hint.getDataLocalHosts()));
+ taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
}
if(hint.getRacks() != null){
- taskLocationHintBuilder.addAllRack(Arrays.asList(hint.getRacks()));
+ taskLocationHintBuilder.addAllRack(hint.getRacks());
}
vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index c58280d..ac6e610 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -19,6 +19,7 @@ package org.apache.tez.dag.api;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -127,12 +128,12 @@ public class DagTypeConverters {
List<TaskLocationHint> outputList = new ArrayList<TaskLocationHint>();
for(PlanTaskLocationHint inputHint : locationHints){
- TaskLocationHint outputHint = new TaskLocationHint();
- outputHint.setRacks(inputHint.getRackList().toArray(new String[inputHint.getRackList().size()]));
- outputHint.setDataLocalHosts(inputHint.getHostList().toArray(new String[inputHint.getHostList().size()]));
+ TaskLocationHint outputHint = new TaskLocationHint(
+ new HashSet<String>(inputHint.getHostList()),
+ new HashSet<String>(inputHint.getRackList()));
outputList.add(outputHint);
}
- return new VertexLocationHint(outputList.size(), outputList.toArray(new TaskLocationHint[outputList.size()]));
+ return new VertexLocationHint(outputList.size(), outputList);
}
// notes re HDFS URL handling:
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
index 8803db9..900822b 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/Vertex.java
@@ -76,11 +76,11 @@ public class Vertex { // FIXME rename to Task
return taskResource;
}
- public Vertex setTaskLocationsHint(TaskLocationHint[] locations) {
+ public Vertex setTaskLocationsHint(List<TaskLocationHint> locations) {
if (locations == null) {
return this;
}
- assert locations.length == parallelism;
+ assert locations.size() == parallelism;
taskLocationsHint = new VertexLocationHint(parallelism, locations);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 3941165..d92250c 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -19,44 +19,41 @@
package org.apache.tez.dag.api;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
public class VertexLocationHint {
- private int numTasks;
- private TaskLocationHint[] taskLocationHints;
-
- public VertexLocationHint() {
- this(0);
- }
-
- public VertexLocationHint(int numTasks) {
- this(numTasks, new TaskLocationHint[numTasks]);
- }
+ private final int numTasks;
+ private final List<TaskLocationHint> taskLocationHints;
public VertexLocationHint(int numTasks,
- TaskLocationHint[] taskLocationHints) {
+ List<TaskLocationHint> taskLocationHints) {
this.numTasks = numTasks;
- this.taskLocationHints = taskLocationHints;
+ if (taskLocationHints != null) {
+ this.taskLocationHints = Collections.unmodifiableList(taskLocationHints);
+ } else {
+ this.taskLocationHints = null;
+ }
}
public int getNumTasks() {
return numTasks;
}
- public TaskLocationHint[] getTaskLocationHints() {
+ public List<TaskLocationHint> getTaskLocationHints() {
return taskLocationHints;
}
- public void setTaskLocationHints(TaskLocationHint[] taskLocationHints) {
- this.taskLocationHints = taskLocationHints;
- }
-
@Override
public int hashCode() {
final int prime = 7883;
int result = 1;
result = prime * result + numTasks;
- result = prime * result + Arrays.hashCode(taskLocationHints);
+ if (taskLocationHints != null) {
+ result = prime * result + taskLocationHints.hashCode();
+ }
return result;
}
@@ -75,7 +72,11 @@ public class VertexLocationHint {
if (numTasks != other.numTasks) {
return false;
}
- if (!Arrays.equals(taskLocationHints, other.taskLocationHints)) {
+ if (taskLocationHints != null) {
+ if (!taskLocationHints.equals(other.taskLocationHints)) {
+ return false;
+ }
+ } else if (other.taskLocationHints != null) {
return false;
}
return true;
@@ -84,38 +85,41 @@ public class VertexLocationHint {
public static class TaskLocationHint {
// Host names if any to be used
- private String[] hosts;
+ private final Set<String> hosts;
// Rack names if any to be used
- private String[] racks;
-
- public TaskLocationHint() {
- this(new String[0], new String[0]);
- }
+ private final Set<String> racks;
- public TaskLocationHint(String[] hosts, String[] racks) {
- this.hosts = hosts;
- this.racks = racks;
+ public TaskLocationHint(Set<String> hosts, Set<String> racks) {
+ if (hosts != null) {
+ this.hosts = Collections.unmodifiableSet(hosts);
+ } else {
+ this.hosts = null;
+ }
+ if (racks != null) {
+ this.racks = Collections.unmodifiableSet(racks);
+ } else {
+ this.racks = null;
+ }
}
- public String[] getDataLocalHosts() {
+ public Set<String> getDataLocalHosts() {
return hosts;
}
- public void setDataLocalHosts(String[] hosts) {
- this.hosts = hosts;
- }
- public String[] getRacks() {
+
+ public Set<String> getRacks() {
return racks;
}
- public void setRacks(String[] racks) {
- this.racks = racks;
- }
@Override
public int hashCode() {
final int prime = 9397;
int result = 1;
- result = prime * result + Arrays.hashCode(hosts);
- result = prime * result + Arrays.hashCode(racks);
+ result = ( hosts != null) ?
+ prime * result + hosts.hashCode() :
+ result + prime;
+ result = ( racks != null) ?
+ prime * result + racks.hashCode() :
+ result + prime;
return result;
}
@@ -131,10 +135,18 @@ public class VertexLocationHint {
return false;
}
TaskLocationHint other = (TaskLocationHint) obj;
- if (!Arrays.equals(hosts, other.hosts)) {
+ if (hosts != null) {
+ if (!hosts.equals(other.hosts)) {
+ return false;
+ }
+ } else if (other.hosts != null) {
return false;
}
- if (!Arrays.equals(racks, other.racks)) {
+ if (racks != null) {
+ if (!racks.equals(other.racks)) {
+ return false;
+ }
+ } else if (other.racks != null) {
return false;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
index 42839df..9577320 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/Progress.java
@@ -50,11 +50,18 @@ public class Progress {
@Override
public String toString() {
- return new String("TotalTasks: " + getTotalTaskCount() +
- " Succeeded: " + getSucceededTaskCount() +
- " Running: " + getRunningTaskCount() +
- " Failed: " + getFailedTaskCount() +
- " Killed: " + getKilledTaskCount());
+ StringBuilder sb = new StringBuilder();
+ sb.append("TotalTasks: ");
+ sb.append(getTotalTaskCount());
+ sb.append(" Succeeded: ");
+ sb.append(getSucceededTaskCount());
+ sb.append(" Running: ");
+ sb.append(getRunningTaskCount());
+ sb.append(" Failed: ");
+ sb.append(getFailedTaskCount());
+ sb.append(" Killed: ");
+ sb.append(getKilledTaskCount());
+ return sb.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index b405de3..e7f9396 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
@@ -202,7 +203,13 @@ public class DAGClientRPCImpl implements DAGClient {
dagState = DAGStatusStateProto.DAG_SUCCEEDED;
break;
}
- break;
+ throw new TezUncheckedException("Encountered unknown final application"
+ + " status from YARN"
+ + ", appState=" + appReport.getYarnApplicationState()
+ + ", finalStatus=" + appReport.getFinalApplicationStatus());
+ default:
+ throw new TezUncheckedException("Encountered unknown application state"
+ + " from YARN, appState=" + appReport.getYarnApplicationState());
}
builder.setState(dagState);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
new file mode 100644
index 0000000..0d01dad
--- /dev/null
+++ b/tez-dag/findbugs-exclude.xml
@@ -0,0 +1,12 @@
+<FindBugsFilter>
+
+ <Match>
+ <Class name="org.apache.tez.dag.app.rm.node.AMNodeMap" />
+ <Or>
+ <Field name="blacklistDisablePercent" />
+ <Field name="maxTaskFailuresPerNode" />
+ </Or>
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 4412436..a29cb5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -704,13 +704,17 @@ public class DAGAppMaster extends AbstractService {
break;
case KILLED:
state = DAGAppMasterState.KILLED;
+ break;
case ERROR:
state = DAGAppMasterState.ERROR;
+ break;
default:
state = DAGAppMasterState.ERROR;
+ break;
}
}
- LOG.info("On DAG completion. Old state: " + oldState + " new state: " + state);
+ LOG.info("On DAG completion. Old state: "
+ + oldState + " new state: " + state);
}
public class DAGClientHandler {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 422b02b..b02bc37 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -952,9 +952,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
protected void setup(DAGImpl job) throws IOException {
job.initTime = job.clock.getTime();
- String dagIdString = job.dagId.toString();
-
- dagIdString.replace("application", "job");
+ String dagIdString = job.dagId.toString().replace("application", "job");
// TODO remove - TEZ-71
String user =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 7019516..e85be2a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -882,13 +882,15 @@ public class TaskAttemptImpl implements TaskAttempt,
Set<String> racks = new HashSet<String>();
if (ta.locationHint != null) {
if (ta.locationHint.getRacks() != null) {
- racks.addAll(Arrays.asList(ta.locationHint.getRacks()));
+ racks.addAll(ta.locationHint.getRacks());
}
if (ta.locationHint.getDataLocalHosts() != null) {
for (String host : ta.locationHint.getDataLocalHosts()) {
racks.add(RackResolver.resolve(host).getNetworkLocation());
}
- hostArray = ta.resolveHosts(ta.locationHint.getDataLocalHosts());
+ hostArray = ta.resolveHosts(
+ ta.locationHint.getDataLocalHosts().toArray(
+ new String[ta.locationHint.getDataLocalHosts().size()]));
}
}
rackArray = racks.toArray(new String[racks.size()]);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 3a7818b..9785de7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -964,14 +964,14 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
boolean useNullLocationHint = true;
if (vertex.vertexLocationHint != null
&& vertex.vertexLocationHint.getTaskLocationHints() != null
- && vertex.vertexLocationHint.getTaskLocationHints().length ==
+ && vertex.vertexLocationHint.getTaskLocationHints().size() ==
vertex.numTasks) {
useNullLocationHint = false;
}
for (int i=0; i < vertex.numTasks; ++i) {
TaskLocationHint locHint = null;
if (!useNullLocationHint) {
- locHint = vertex.vertexLocationHint.getTaskLocationHints()[i];
+ locHint = vertex.vertexLocationHint.getTaskLocationHints().get(i);
}
TaskImpl task =
new TaskImpl(vertex.getVertexId(), i,
@@ -1333,6 +1333,27 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
}
@Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Vertex other = (Vertex) obj;
+ return this.vertexId.equals(other.getVertexId());
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 11239;
+ return prime + prime * this.vertexId.hashCode();
+ }
+
+ @Override
public Map<Vertex, EdgeProperty> getInputVertices() {
return Collections.unmodifiableMap(this.sourceVertices);
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 50f93a9..72d2cc8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -30,9 +30,12 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -121,10 +124,10 @@ public class TestTaskAttempt {
new TaskAttemptImpl.ScheduleTaskattemptTransition();
EventHandler eventHandler = mock(EventHandler.class);
- String[] hosts = new String[3];
- hosts[0] = "host1";
- hosts[1] = "host2";
- hosts[2] = "host3";
+ Set<String> hosts = new HashSet<String>(3);
+ hosts.add("host1");
+ hosts.add("host2");
+ hosts.add("host3");
TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
TezTaskID taskID = new TezTaskID(
@@ -168,8 +171,10 @@ public class TestTaskAttempt {
EventHandler eventHandler = mock(EventHandler.class);
String hosts[] = new String[] { "192.168.1.1", "host2", "host3" };
- String resolved[] = new String[] { "host1", "host2", "host3" };
- TaskLocationHint locationHint = new TaskLocationHint(hosts, null);
+ Set<String> resolved = new HashSet<String>(
+ Arrays.asList(new String[]{ "host1", "host2", "host3" }));
+ TaskLocationHint locationHint = new TaskLocationHint(
+ new HashSet<String>(Arrays.asList(hosts)), null);
TezTaskID taskID = new TezTaskID(
new TezVertexID(new TezDAGID("1", 1, 1), 1), 1);
@@ -181,7 +186,8 @@ public class TestTaskAttempt {
new HashMap<String, LocalResource>(), new HashMap<String, String>(),
"", false);
TaskAttemptImpl spyTa = spy(taImpl);
- when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
+ when(spyTa.resolveHosts(hosts)).thenReturn(
+ resolved.toArray(new String[3]));
TaskAttemptEventSchedule mockTAEvent = mock(TaskAttemptEventSchedule.class);
@@ -314,7 +320,7 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
@@ -368,7 +374,7 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
@@ -463,7 +469,7 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
@@ -528,7 +534,7 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
@@ -595,7 +601,7 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
@@ -688,9 +694,10 @@ public class TestTaskAttempt {
tezConf.setBoolean("fs.file.impl.disable.cache", true);
TaskLocationHint locationHint = new TaskLocationHint(
- new String[] { "127.0.0.1" }, null);
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
Resource resource = Resource.newInstance(1024, 1);
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ Map<String, LocalResource> localResources =
+ new HashMap<String, LocalResource>();
Map<String, String> environment = new HashMap<String, String>();
String javaOpts = "";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 54a62c8..8a48c6c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -102,7 +102,7 @@ public class TestTaskImpl {
jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
credentials = null;
clock = new SystemClock();
- locationHint = new TaskLocationHint(new String[1], new String[1]);
+ locationHint = new TaskLocationHint(null, null);
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
dagId = new TezDAGID(appId, 1);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-engine-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/findbugs-exclude.xml b/tez-engine-api/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-engine-api/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-engine/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-engine/findbugs-exclude.xml b/tez-engine/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-engine/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce-examples/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/findbugs-exclude.xml b/tez-mapreduce-examples/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-mapreduce-examples/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 71cab97..43a447d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -94,27 +94,28 @@ import com.google.common.annotations.VisibleForTesting;
*/
public class MRRSleepJob extends Configured implements Tool {
- public static Log LOG = LogFactory.getLog(MRRSleepJob.class);
+ private static final Log LOG = LogFactory.getLog(MRRSleepJob.class);
- public static String MAP_SLEEP_COUNT = "mrr.sleepjob.map.sleep.count";
- public static String REDUCE_SLEEP_COUNT =
+ public static final String MAP_SLEEP_COUNT = "mrr.sleepjob.map.sleep.count";
+ public static final String REDUCE_SLEEP_COUNT =
"mrr.sleepjob.reduce.sleep.count";
- public static String MAP_SLEEP_TIME = "mrr.sleepjob.map.sleep.time";
- public static String REDUCE_SLEEP_TIME =
+ public static final String MAP_SLEEP_TIME = "mrr.sleepjob.map.sleep.time";
+ public static final String REDUCE_SLEEP_TIME =
"mrr.sleepjob.reduce.sleep.time";
- public static String IREDUCE_SLEEP_COUNT =
+ public static final String IREDUCE_SLEEP_COUNT =
"mrr.sleepjob.ireduce.sleep.count";
- public static String IREDUCE_SLEEP_TIME =
+ public static final String IREDUCE_SLEEP_TIME =
"mrr.sleepjob.ireduce.sleep.time";
- public static String IREDUCE_STAGES_COUNT =
+ public static final String IREDUCE_STAGES_COUNT =
"mrr.sleepjob.ireduces.stages.count";
- public static String IREDUCE_TASKS_COUNT =
+ public static final String IREDUCE_TASKS_COUNT =
"mrr.sleepjob.ireduces.tasks.count";
// Flags to inject failures
- public static String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
- public static String MAP_FATAL_ERROR = "mrr.sleepjob.map.fatal.error";
- public static String MAP_ERROR_TASK_IDS = "mrr.sleepjob.map.error.task.ids";
+ public static final String MAP_THROW_ERROR = "mrr.sleepjob.map.throw.error";
+ public static final String MAP_FATAL_ERROR = "mrr.sleepjob.map.fatal.error";
+ public static final String MAP_ERROR_TASK_IDS =
+ "mrr.sleepjob.map.error.task.ids";
public static class MRRSleepJobPartitioner extends
Partitioner<IntWritable, IntWritable> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/findbugs-exclude.xml b/tez-mapreduce/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-mapreduce/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
index 44013c6..3eef766 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -18,6 +18,8 @@
package org.apache.tez.mapreduce.hadoop;
+import java.util.List;
+
import org.apache.hadoop.fs.Path;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -39,12 +41,12 @@ public class InputSplitInfo {
/// Meta info file for all the splits information
private final Path splitsMetaInfoFile;
/// Location hints to determine where to run the tasks
- private final TaskLocationHint[] taskLocationHints;
+ private final List<TaskLocationHint> taskLocationHints;
/// The num of tasks - same as number of splits generated.
private final int numTasks;
public InputSplitInfo(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
- TaskLocationHint[] taskLocationHints) {
+ List<TaskLocationHint> taskLocationHints) {
this.splitsFile = splitsFile;
this.splitsMetaInfoFile = splitsMetaInfoFile;
this.taskLocationHints = taskLocationHints;
@@ -54,7 +56,7 @@ public class InputSplitInfo {
/**
* Get the TaskLocationHints for each task
*/
- public TaskLocationHint[] getTaskLocationHints() {
+ public List<TaskLocationHint> getTaskLocationHints() {
return taskLocationHints;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index ad63f7d..7d6d0b5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -21,8 +21,10 @@ package org.apache.tez.mapreduce.hadoop;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Vector;
@@ -151,10 +153,12 @@ public class MRHelpers {
JobSplitWriter.createSplitFiles(inputSplitDir, conf,
inputSplitDir.getFileSystem(conf), splits);
- TaskLocationHint[] locationHints =
- new TaskLocationHint[splits.length];
+ List<TaskLocationHint> locationHints =
+ new ArrayList<TaskLocationHint>(splits.length);
for (int i = 0; i < splits.length; ++i) {
- locationHints[i] = new TaskLocationHint(splits[i].getLocations(), null);
+ locationHints.add(
+ new TaskLocationHint(new HashSet<String>(
+ Arrays.asList(splits[i].getLocations())), null));
}
return new InputSplitInfo(
@@ -184,10 +188,12 @@ public class MRHelpers {
JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
inputSplitDir.getFileSystem(jobConf), splits);
- TaskLocationHint[] locationHints =
- new TaskLocationHint[splits.length];
+ List<TaskLocationHint> locationHints =
+ new ArrayList<TaskLocationHint>(splits.length);
for (int i = 0; i < splits.length; ++i) {
- locationHints[i] = new TaskLocationHint(splits[i].getLocations(), null);
+ locationHints.add(
+ new TaskLocationHint(new HashSet<String>(
+ Arrays.asList(splits[i].getLocations())), null));
}
return new InputSplitInfo(
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
index d20a36c..cd3ef88 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
@@ -23,7 +23,11 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -104,21 +108,21 @@ public class TestMRHelpers {
}
private void verifyLocationHints(Path inputSplitsDir,
- TaskLocationHint[] actual) throws Exception {
+ List<TaskLocationHint> actual) throws Exception {
JobID jobId = new JobID("dummy", 1);
TaskSplitMetaInfo[] splitsInfo =
SplitMetaInfoReader.readSplitMetaInfo(jobId , remoteFs,
conf, inputSplitsDir);
int splitsCount = splitsInfo.length;
- TaskLocationHint[] locationHints =
- new TaskLocationHint[splitsCount];
+ List<TaskLocationHint> locationHints =
+ new ArrayList<TaskLocationHint>(splitsCount);
for (int i = 0; i < splitsCount; ++i) {
- TaskLocationHint locationHint =
- new TaskLocationHint(splitsInfo[i].getLocations(), null);
- locationHints[i] = locationHint;
+ locationHints.add(
+ new TaskLocationHint(new HashSet<String>(
+ Arrays.asList(splitsInfo[i].getLocations())), null));
}
- Assert.assertArrayEquals(locationHints, actual);
+ Assert.assertEquals(locationHints, actual);
}
private InputSplitInfo generateNewSplits(Path inputSplitsDir)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-yarn-client/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-yarn-client/findbugs-exclude.xml b/tez-yarn-client/findbugs-exclude.xml
new file mode 100644
index 0000000..be58a39
--- /dev/null
+++ b/tez-yarn-client/findbugs-exclude.xml
@@ -0,0 +1,3 @@
+<FindBugsFilter>
+
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
index 4919a49..1b264c0 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/DAGJobStatus.java
@@ -167,7 +167,7 @@ public class DAGJobStatus extends JobStatus {
@Override
public synchronized float getReduceProgress() {
- if(dagStatus != null) {
+ if(dagStatus.getVertexProgress() != null) {
return getProgress(MultiStageMRConfigUtil.getFinalReduceVertexName());
}
if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e1f64bf5/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 736563b..75a79c3 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -19,11 +19,15 @@
package org.apache.tez.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -319,19 +323,21 @@ public class YARNRunner implements ClientProtocol {
// FIXME isn't this a nice mess of a client?
// read input, write splits, read splits again
- private TaskLocationHint[] getMapLocationHintsFromInputSplits(JobID jobId,
+ private List<TaskLocationHint> getMapLocationHintsFromInputSplits(JobID jobId,
FileSystem fs, Configuration conf,
String jobSubmitDir) throws IOException {
TaskSplitMetaInfo[] splitsInfo =
SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf,
new Path(jobSubmitDir));
int splitsCount = splitsInfo.length;
- TaskLocationHint[] locationHints =
- new TaskLocationHint[splitsCount];
+ List<TaskLocationHint> locationHints =
+ new ArrayList<TaskLocationHint>(splitsCount);
for (int i = 0; i < splitsCount; ++i) {
TaskLocationHint locationHint =
- new TaskLocationHint(splitsInfo[i].getLocations(), null);
- locationHints[i] = locationHint;
+ new TaskLocationHint(
+ new HashSet<String>(
+ Arrays.asList(splitsInfo[i].getLocations())), null);
+ locationHints.add(locationHint);
}
return locationHints;
}
@@ -368,7 +374,7 @@ public class YARNRunner implements ClientProtocol {
private Vertex createVertexForStage(Configuration stageConf,
Map<String, LocalResource> jobLocalResources,
- TaskLocationHint[] locations, int stageNum, int totalStages)
+ List<TaskLocationHint> locations, int stageNum, int totalStages)
throws IOException {
// stageNum starts from 0, goes till numStages - 1
boolean isMap = false;
@@ -438,9 +444,10 @@ public class YARNRunner implements ClientProtocol {
LOG.info("Number of stages: " + stageConfs.length);
- TaskLocationHint[] mapInputLocations = getMapLocationHintsFromInputSplits(
- jobId, fs, stageConfs[0], jobSubmitDir);
- TaskLocationHint[] reduceInputLocations = null;
+ List<TaskLocationHint> mapInputLocations =
+ getMapLocationHintsFromInputSplits(
+ jobId, fs, stageConfs[0], jobSubmitDir);
+ List<TaskLocationHint> reduceInputLocations = null;
Vertex[] vertices = new Vertex[stageConfs.length];
for (int i = 0; i < stageConfs.length; i++) {