You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/11/19 18:35:57 UTC
svn commit: r1411326 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
graph/src/main/java/org/apache/hama/graph/
ml/src/test/java/org/apache/hama/ml/kmeans/
yarn/src/main/java/org/apache/hama/bsp/
Author: tjungblut
Date: Mon Nov 19 17:35:55 2012
New Revision: 1411326
URL: http://svn.apache.org/viewvc?rev=1411326&view=rev
Log:
[HAMA-678]: fix input splitting
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Nov 19 17:35:55 2012
@@ -10,6 +10,7 @@ Release 0.6 (unreleased changes)
BUG FIXES
+ HAMA-678: Fix input splitting (tjungblut)
HAMA-613: Scheduler kills job too silently when out of slots (Yuesheng Hu via edwardyoon)
HAMA-647: Make the input spliter robustly (Yuesheng Hu via edwardyoon)
HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSP.java Mon Nov 19 17:35:55 2012
@@ -31,12 +31,14 @@ public abstract class BSP<K1, V1, K2, V2
/**
* {@inheritDoc}
*/
+ @Override
public abstract void bsp(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
SyncException, InterruptedException;
/**
* {@inheritDoc}
*/
+ @Override
public void setup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException,
SyncException, InterruptedException {
@@ -45,6 +47,7 @@ public abstract class BSP<K1, V1, K2, V2
/**
* {@inheritDoc}
*/
+ @Override
public void cleanup(BSPPeer<K1, V1, K2, V2, M> peer) throws IOException {
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -301,20 +301,8 @@ public class BSPJobClient extends Config
throws IOException {
BSPJob job = pJob;
job.setJobID(jobId);
- int maxTasks = 0;
- int limitTasks = job.getConf().getInt(Constants.MAX_TASKS_PER_JOB, 0);
-
- ClusterStatus clusterStatus = getClusterStatus(true);
-
- if(limitTasks > 0) {
- maxTasks = limitTasks;
- } else {
- maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
- }
-
- if (maxTasks < job.getNumBspTask()) {
- throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
- }
+ int limitTasks = job.getConfiguration().getInt(Constants.MAX_TASKS_PER_JOB, 0);
+ int maxTasks = checkTaskLimits(job, limitTasks);
Path submitJobDir = new Path(getSystemDir(), "submit_"
+ Integer.toString(Math.abs(r.nextInt()), 36));
@@ -337,8 +325,8 @@ public class BSPJobClient extends Config
if (job.get("bsp.input.dir") != null) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- if (job.getConf().get("bsp.input.partitioner.class") != null
- && !job.getConf()
+ if (job.getConfiguration().get("bsp.input.partitioner.class") != null
+ && !job.getConfiguration()
.getBoolean("hama.graph.runtime.partitioning", false)) {
job = partition(job, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
@@ -384,6 +372,22 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
+ protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+ int maxTasks;
+ ClusterStatus clusterStatus = getClusterStatus(true);
+
+ if(limitTasks > 0) {
+ maxTasks = limitTasks;
+ } else {
+ maxTasks = clusterStatus.getMaxTasks() - clusterStatus.getTasks();
+ }
+
+ if (maxTasks < job.getNumBspTask()) {
+ throw new IOException("Job failed! The number of tasks has exceeded the maximum allowed.");
+ }
+ return maxTasks;
+ }
+
protected RunningJob launchJob(BSPJobID jobId, BSPJob job,
Path submitJobFile, FileSystem fs) throws IOException {
//
@@ -405,7 +409,7 @@ public class BSPJobClient extends Config
(isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
: maxTasks);
- String input = job.getConf().get("bsp.input.dir");
+ String input = job.getConfiguration().get("bsp.input.dir");
if (input != null) {
InputFormat<?, ?> inputFormat = job.getInputFormat();
@@ -438,18 +442,18 @@ public class BSPJobClient extends Config
CompressionCodec codec = null;
if (outputCompressorClass != null) {
codec = ReflectionUtils.newInstance(outputCompressorClass,
- job.getConf());
+ job.getConfiguration());
}
try {
for (int i = 0; i < splits.length; i++) {
Path p = new Path(partitionedPath, getPartitionName(i));
if (codec == null) {
- writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+ writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
sampleReader.createKey().getClass(), sampleReader.createValue()
.getClass(), CompressionType.NONE));
} else {
- writers.add(SequenceFile.createWriter(fs, job.getConf(), p,
+ writers.add(SequenceFile.createWriter(fs, job.getConfiguration(), p,
sampleReader.createKey().getClass(), sampleReader.createValue()
.getClass(), compressionType, codec));
}
@@ -516,7 +520,7 @@ public class BSPJobClient extends Config
static Class<? extends CompressionCodec> getOutputCompressorClass(BSPJob job,
Class<? extends CompressionCodec> defaultValue) {
Class<? extends CompressionCodec> codecClass = defaultValue;
- Configuration conf = job.getConf();
+ Configuration conf = job.getConfiguration();
String name = conf.get("bsp.partitioning.compression.codec");
if (name != null) {
try {
@@ -537,7 +541,7 @@ public class BSPJobClient extends Config
(isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
: maxTasks);
- final DataOutputStream out = writeSplitsFileHeader(job.getConf(),
+ final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
submitSplitFile, splits.length);
try {
DataOutputBuffer buffer = new DataOutputBuffer();
@@ -706,7 +710,7 @@ public class BSPJobClient extends Config
public static void runJob(BSPJob job) throws FileNotFoundException,
IOException {
- BSPJobClient jc = new BSPJobClient(job.getConf());
+ BSPJobClient jc = new BSPJobClient(job.getConfiguration());
if (job.getNumBspTask() == 0
|| job.getNumBspTask() > jc.getClusterStatus(false).getMaxTasks()) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobContext.java Mon Nov 19 17:35:55 2012
@@ -102,10 +102,18 @@ public class BSPJobContext {
conf.writeXml(out);
}
+ /**
+ * @deprecated use {@link #getConfiguration()} instead.
+ */
+ @Deprecated
public Configuration getConf() {
return this.conf;
}
+ public Configuration getConfiguration() {
+ return this.conf;
+ }
+
public String get(String name) {
return conf.get(name);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPTask.java Mon Nov 19 17:35:55 2012
@@ -114,7 +114,7 @@ public final class BSPTask extends Task
private void startPingingGroom(BSPJob job, BSPPeerProtocol umbilical) {
- long pingPeriod = job.getConf().getLong(Constants.GROOM_PING_PERIOD,
+ long pingPeriod = job.getConfiguration().getLong(Constants.GROOM_PING_PERIOD,
Constants.DEFAULT_GROOM_PING_PERIOD) / 2;
try {
@@ -156,8 +156,8 @@ public final class BSPTask extends Task
throws Exception {
BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M> bsp = (BSP<KEYIN, VALUEIN, KEYOUT, VALUEOUT, M>) ReflectionUtils
- .newInstance(job.getConf().getClass("bsp.work.class", BSP.class),
- job.getConf());
+ .newInstance(job.getConfiguration().getClass("bsp.work.class", BSP.class),
+ job.getConfiguration());
// The policy is to throw the first exception and log the remaining.
Exception firstException = null;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -140,7 +140,7 @@ public abstract class CombineFileInputFo
public InputSplit[] getSplits(BSPJob bspJob, int numSplits)
throws IOException {
- Configuration job = bspJob.getConf();
+ Configuration job = bspJob.getConfiguration();
long minSizeNode = 0;
long minSizeRack = 0;
@@ -446,7 +446,7 @@ public abstract class CombineFileInputFo
this.fileSize = 0;
// get block locations from file system
- FileSystem fs = path.getFileSystem(job.getConf());
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
FileStatus stat = fs.getFileStatus(path);
BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
stat.getLen());
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/CombineFileSplit.java Mon Nov 19 17:35:55 2012
@@ -86,7 +86,7 @@ public class CombineFileSplit implements
}
public Configuration getJob() {
- return job.getConf();
+ return job.getConfiguration();
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.InvalidInputException;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
@@ -47,7 +48,6 @@ public abstract class FileInputFormat<K,
private static final double SPLIT_SLOP = 1.1; // 10% slop
- private long minSplitSize = 1;
private static final PathFilter hiddenFileFilter = new PathFilter() {
@Override
public boolean accept(Path p) {
@@ -56,10 +56,6 @@ public abstract class FileInputFormat<K,
}
};
- protected void setMinSplitSize(long minSplitSize) {
- this.minSplitSize = minSplitSize;
- }
-
/**
* Proxy PathFilter that accepts a path only if all filters given in the
* constructor do. Used by the listPaths() to apply the built-in
@@ -83,15 +79,6 @@ public abstract class FileInputFormat<K,
}
}
- /**
- * @param fs the file system that the file is on
- * @param filename the file name to check
- * @return is this file splitable?
- */
- protected boolean isSplitable(FileSystem fs, Path filename) {
- return true;
- }
-
@Override
public abstract RecordReader<K, V> getRecordReader(InputSplit split,
BSPJob job) throws IOException;
@@ -103,7 +90,7 @@ public abstract class FileInputFormat<K,
*/
public static void setInputPathFilter(BSPJob conf,
Class<? extends PathFilter> filter) {
- conf.getConf().setClass("bsp.input.pathFilter.class", filter,
+ conf.getConfiguration().setClass("bsp.input.pathFilter.class", filter,
PathFilter.class);
}
@@ -113,10 +100,10 @@ public abstract class FileInputFormat<K,
* @return the PathFilter instance set for the job, NULL if none has been set.
*/
public static PathFilter getInputPathFilter(BSPJob conf) {
- Class<? extends PathFilter> filterClass = conf.getConf().getClass(
+ Class<? extends PathFilter> filterClass = conf.getConfiguration().getClass(
"bsp.input.pathFilter.class", null, PathFilter.class);
return (filterClass != null) ? ReflectionUtils.newInstance(filterClass,
- conf.getConf()) : null;
+ conf.getConfiguration()) : null;
}
/**
@@ -147,7 +134,7 @@ public abstract class FileInputFormat<K,
PathFilter inputFilter = new MultiPathFilter(filters);
for (Path p : dirs) {
- FileSystem fs = p.getFileSystem(job.getConf());
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
FileStatus[] matches = fs.globStatus(p, inputFilter);
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
@@ -175,73 +162,109 @@ public abstract class FileInputFormat<K,
}
/**
- * Splits files returned by {@link #listStatus(BSPJob)} when they're too big.
+ * Splits files returned by {@link #listStatus(BSPJob)} when they're too big. <br/>
+ * The numSplits argument is ignored: split sizes are derived from each file's block size and the configured minimum and maximum split sizes.
*/
@Override
public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
- FileStatus[] files = listStatus(job);
-
- long totalSize = computeTotalSize(job, files);
- long goalSize = computeGoalSize(numSplits, totalSize);
-
- ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
+ long maxSize = getMaxSplitSize(job);
- // take the short circuit path if we have already partitioned
- if (numSplits == files.length) {
- for (FileStatus file : files) {
- if (file != null) {
- splits.add(new FileSplit(file.getPath(), 0, file.getLen(),
- new String[0]));
+ // generate splits
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ FileStatus[] files = listStatus(job);
+ for (FileStatus file : files) {
+ Path path = file.getPath();
+ FileSystem fs = path.getFileSystem(job.getConfiguration());
+ long length = file.getLen();
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ if ((length != 0) && isSplitable(job, path)) {
+ long blockSize = file.getBlockSize();
+ long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+ long bytesRemaining = length;
+ while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ blkLocations[blkIndex].getHosts()));
+ bytesRemaining -= splitSize;
+ }
+
+ if (bytesRemaining != 0) {
+ splits
+ .add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+ blkLocations[blkLocations.length - 1].getHosts()));
}
+ } else if (length != 0) {
+ splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+ } else {
+ // Create empty hosts array for zero length files
+ splits.add(new FileSplit(path, 0, length, new String[0]));
}
- return splits.toArray(new FileSplit[splits.size()]);
}
- LOG.info("numSplits: " + numSplits);
- long minSize = Math.max(job.getConf().getLong("bsp.min.split.size", 1),
- minSplitSize);
+ // Save the number of input files in the job-conf
+ job.getConfiguration().setLong("bsp.input.files", files.length);
- // generate splits
- NetworkTopology clusterMap = new NetworkTopology();
- for (FileStatus file : files) {
- if (file != null) {
- Path path = file.getPath();
- FileSystem fs = path.getFileSystem(job.getConf());
- long length = file.getLen();
- BlockLocation[] blkLocations = fs
- .getFileBlockLocations(file, 0, length);
- if ((length != 0) && isSplitable(fs, path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(goalSize, minSize, blockSize);
- LOG.info("computeSplitSize: " + splitSize + " (" + goalSize + ", "
- + minSize + ", " + blockSize + ")");
-
- long bytesRemaining = length;
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- String[] splitHosts = getSplitHosts(blkLocations, length
- - bytesRemaining, splitSize, clusterMap);
- splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
- splitHosts));
- bytesRemaining -= splitSize;
- }
+ LOG.debug("Total # of splits: " + splits.size());
+ return splits.toArray(new InputSplit[splits.size()]);
+ }
- if (bytesRemaining != 0) {
- splits.add(new FileSplit(path, length - bytesRemaining,
- bytesRemaining, blkLocations[blkLocations.length - 1]
- .getHosts()));
- }
- } else if (length != 0) {
- String[] splitHosts = getSplitHosts(blkLocations, 0, length,
- clusterMap);
- splits.add(new FileSplit(path, 0, length, splitHosts));
- } else {
- // Create empty hosts array for zero length files
- splits.add(new FileSplit(path, 0, length, new String[0]));
- }
- }
- }
- LOG.info("Total # of splits: " + splits.size());
- return splits.toArray(new FileSplit[splits.size()]);
+ /**
+ * @return true if the file is splittable (default), false if not.
+ */
+ protected boolean isSplitable(BSPJob job, Path path) {
+ return true;
+ }
+
+ /**
+ * Get the lower bound on split size imposed by the format.
+ *
+ * @return the number of bytes of the minimal split for this format
+ */
+ protected long getFormatMinSplitSize() {
+ return 1;
+ }
+
+ /**
+ * Set the minimum input split size
+ *
+ * @param job the job to modify
+ * @param size the minimum size
+ */
+ public static void setMinInputSplitSize(Job job, long size) {
+ job.getConfiguration().setLong("bsp.min.split.size", size);
+ }
+
+ /**
+ * Get the minimum split size
+ *
+ * @param job the job
+ * @return the minimum number of bytes that can be in a split
+ */
+ public static long getMinSplitSize(BSPJob job) {
+ return job.getConfiguration().getLong("bsp.min.split.size", 1L);
+ }
+
+ /**
+ * Set the maximum split size
+ *
+ * @param job the job to modify
+ * @param size the maximum split size
+ */
+ public static void setMaxInputSplitSize(Job job, long size) {
+ job.getConfiguration().setLong("bsp.max.split.size", size);
+ }
+
+ /**
+ * Get the maximum split size.
+ *
+ * @param context the job to look at.
+ * @return the maximum number of bytes a split can include
+ */
+ public static long getMaxSplitSize(BSPJob context) {
+ return context.getConfiguration().getLong("bsp.max.split.size",
+ Long.MAX_VALUE);
}
protected long computeTotalSize(BSPJob job, FileStatus[] files)
@@ -256,7 +279,7 @@ public abstract class FileInputFormat<K,
.equals(job.get("bsp.partitioning.dir")))) {
// if we find the partitioning dir, just remove it.
LOG.warn("Removing already existing partitioning directory " + path);
- FileSystem fileSystem = path.getFileSystem(job.getConf());
+ FileSystem fileSystem = path.getFileSystem(job.getConfiguration());
if (!fileSystem.delete(path, true)) {
LOG.error("Remove failed.");
}
@@ -403,7 +426,7 @@ public abstract class FileInputFormat<K,
* @return the list of input {@link Path}s for the BSP job.
*/
public static Path[] getInputPaths(BSPJob conf) {
- String dirs = conf.getConf().get("bsp.input.dir", "");
+ String dirs = conf.getConfiguration().get("bsp.input.dir", "");
String[] list = StringUtils.split(dirs);
Path[] result = new Path[list.length];
for (int i = 0; i < list.length; i++) {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileOutputFormat.java Mon Nov 19 17:35:55 2012
@@ -36,7 +36,7 @@ public abstract class FileOutputFormat<K
* @param compress should the output of the job be compressed?
*/
public static void setCompressOutput(BSPJob conf, boolean compress) {
- conf.getConf().setBoolean("bsp.output.compress", compress);
+ conf.getConfiguration().setBoolean("bsp.output.compress", compress);
}
/**
@@ -47,7 +47,7 @@ public abstract class FileOutputFormat<K
* <code>false</code> otherwise
*/
public static boolean getCompressOutput(BSPJob conf) {
- return conf.getConf().getBoolean("bsp.output.compress", false);
+ return conf.getConfiguration().getBoolean("bsp.output.compress", false);
}
/**
@@ -60,7 +60,7 @@ public abstract class FileOutputFormat<K
public static void setOutputCompressorClass(BSPJob conf,
Class<? extends CompressionCodec> codecClass) {
setCompressOutput(conf, true);
- conf.getConf().setClass("bsp.output.compression.codec", codecClass,
+ conf.getConfiguration().setClass("bsp.output.compression.codec", codecClass,
CompressionCodec.class);
}
@@ -79,7 +79,7 @@ public abstract class FileOutputFormat<K
String name = conf.get("bsp.output.compression.codec");
if (name != null) {
try {
- codecClass = conf.getConf().getClassByName(name)
+ codecClass = conf.getConfiguration().getClassByName(name)
.asSubclass(CompressionCodec.class);
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException("Compression codec " + name
@@ -98,7 +98,7 @@ public abstract class FileOutputFormat<K
throw new InvalidJobConfException("Output directory not set in JobConf.");
}
if (outDir != null) {
- FileSystem fs = outDir.getFileSystem(job.getConf());
+ FileSystem fs = outDir.getFileSystem(job.getConfiguration());
// normalize the output directory
outDir = fs.makeQualified(outDir);
setOutputPath(job, outDir);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon Nov 19 17:35:55 2012
@@ -1259,7 +1259,7 @@ public class GroomServer implements Runn
try {
// use job-specified working directory
- FileSystem.get(job.getConf()).setWorkingDirectory(
+ FileSystem.get(job.getConfiguration()).setWorkingDirectory(
job.getWorkingDirectory());
// instantiate and init our peer
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon Nov 19 17:35:55 2012
@@ -155,13 +155,13 @@ public class JobInProgress {
FileSystem fs = jobDir.getFileSystem(conf);
fs.copyToLocalFile(jobFile, localJobFile);
BSPJob job = new BSPJob(jobId, localJobFile.toString());
- this.jobSplit = job.getConf().get("bsp.job.split.file");
+ this.jobSplit = job.getConfiguration().get("bsp.job.split.file");
this.numBSPTasks = job.getNumBspTask();
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numBSPTasks + 10);
- this.maxTaskAttempts = job.getConf().getInt(Constants.MAX_TASK_ATTEMPTS,
+ this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
Constants.DEFAULT_MAX_TASK_ATTEMPTS);
this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Nov 19 17:35:55 2012
@@ -230,7 +230,7 @@ public class LocalBSPRunner implements J
conf.set(Constants.PEER_HOST, "local");
bsp = (BSP) ReflectionUtils.newInstance(
- job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+ job.getConfiguration().getClass("bsp.work.class", BSP.class), job.getConfiguration());
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/RecoverTaskAction.java Mon Nov 19 17:35:55 2012
@@ -50,12 +50,14 @@ public class RecoverTaskAction extends G
return superstepNumber.get();
}
+ @Override
public void write(DataOutput out) throws IOException {
task.write(out);
superstepNumber.write(out);
}
+ @Override
public void readFields(DataInput in) throws IOException {
task = new BSPTask();
task.readFields(in);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileInputFormat.java Mon Nov 19 17:35:55 2012
@@ -19,17 +19,41 @@ package org.apache.hama.bsp;
import java.io.IOException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {
- public SequenceFileInputFormat() {
- setMinSplitSize(SequenceFile.SYNC_INTERVAL);
- }
-
@Override
public RecordReader<K, V> getRecordReader(InputSplit split, BSPJob job)
throws IOException {
- return new SequenceFileRecordReader<K, V>(job.getConf(), (FileSplit) split);
+ return new SequenceFileRecordReader<K, V>(job.getConfiguration(),
+ (FileSplit) split);
+ }
+
+ @Override
+ protected long getFormatMinSplitSize() {
+ return SequenceFile.SYNC_INTERVAL;
}
+
+ @Override
+ protected FileStatus[] listStatus(BSPJob job) throws IOException {
+
+ FileStatus[] files = super.listStatus(job);
+ int len = files.length;
+ for (int i = 0; i < len; ++i) {
+ FileStatus file = files[i];
+ if (file.isDir()) { // it's a MapFile
+ Path p = file.getPath();
+ FileSystem fs = p.getFileSystem(job.getConfiguration());
+ // use the data file
+ files[i] = fs.getFileStatus(new Path(p, MapFile.DATA_FILE_NAME));
+ }
+ }
+ return files;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SequenceFileRecordWriter.java Mon Nov 19 17:35:55 2012
@@ -35,7 +35,7 @@ public class SequenceFileRecordWriter<K
public SequenceFileRecordWriter(FileSystem fs, BSPJob job, String name)
throws IOException, ClassNotFoundException {
- Configuration conf = job.getConf();
+ Configuration conf = job.getConfiguration();
writer = new SequenceFile.Writer(fs, conf, new Path(
conf.get("bsp.output.dir"), name), conf.getClassByName(conf
.get("bsp.output.key.class")), conf.getClassByName(conf
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Nov 19 17:35:55 2012
@@ -327,6 +327,7 @@ class SimpleTaskScheduler extends TaskSc
return Boolean.TRUE;
}
+ @Override
public Boolean call() {
if (jip.isRecoveryPending()) {
return scheduleRecoveryTasks();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Nov 19 17:35:55 2012
@@ -95,7 +95,7 @@ public class TaskRunner extends Thread {
@Override
public Object call() throws Exception {
- final boolean consoleRedirect = bspJob.getConf().getBoolean(
+ final boolean consoleRedirect = bspJob.getConfiguration().getBoolean(
"hama.child.redirect.log.console", false);
ProcessBuilder builder = new ProcessBuilder(commands);
builder.directory(workDir);
@@ -203,7 +203,7 @@ public class TaskRunner extends Thread {
vargs.add(jvm.toString());
// bsp.child.java.opts
- String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+ String javaOpts = jobConf.getConfiguration().get("bsp.child.java.opts", "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
String[] javaOptsSplit = javaOpts.split(" ");
@@ -309,7 +309,7 @@ public class TaskRunner extends Thread {
private void logStream(InputStream input, LogType type) {
if (type == LogType.CONSOLE) {
try {
- IOUtils.copyBytes(input, System.out, bspJob.getConf());
+ IOUtils.copyBytes(input, System.out, bspJob.getConfiguration());
} catch (IOException e) {
// gracefully ignore any occuring exceptions here
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextInputFormat.java Mon Nov 19 17:35:55 2012
@@ -19,15 +19,25 @@ package org.apache.hama.bsp;
import java.io.IOException;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
BSPJob job) throws IOException {
- return new LineRecordReader(job.getConf(), (FileSplit) split);
+ return new LineRecordReader(job.getConfiguration(), (FileSplit) split);
+ }
+
+ @Override
+ protected boolean isSplitable(BSPJob job, Path path) {
+ CompressionCodec codec = new CompressionCodecFactory(job.getConfiguration())
+ .getCodec(path);
+ return codec == null;
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/TextOutputFormat.java Mon Nov 19 17:35:55 2012
@@ -104,11 +104,11 @@ public class TextOutputFormat<K, V> exte
public RecordWriter<K, V> getRecordWriter(FileSystem ignored, BSPJob job,
String name) throws IOException {
boolean isCompressed = getCompressOutput(job);
- String keyValueSeparator = job.getConf().get(
+ String keyValueSeparator = job.getConfiguration().get(
"bsp.textoutputformat.separator", "\t");
if (!isCompressed) {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
- FileSystem fs = file.getFileSystem(job.getConf());
+ FileSystem fs = file.getFileSystem(job.getConfiguration());
FSDataOutputStream fileOut = fs.create(file);
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
@@ -116,11 +116,11 @@ public class TextOutputFormat<K, V> exte
job, GzipCodec.class);
// create the named codec
CompressionCodec codec = ReflectionUtils.newInstance(codecClass,
- job.getConf());
+ job.getConfiguration());
// build the filename including the extension
Path file = FileOutputFormat.getTaskOutputPath(job,
name + codec.getDefaultExtension());
- FileSystem fs = file.getFileSystem(job.getConf());
+ FileSystem fs = file.getFileSystem(job.getConfiguration());
FSDataOutputStream fileOut = fs.create(file);
return new LineRecordWriter<K, V>(new DataOutputStream(
codec.createOutputStream(fileOut)), keyValueSeparator);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Nov 19 17:35:55 2012
@@ -509,7 +509,7 @@ public final class GraphJobRunner<V exte
}
}
- int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+ int steps = (int) (maxSplitSize / conf.getLong( // 20 mb
"hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
for (String peerName : peer.getAllPeerNames()) {
Modified: hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java (original)
+++ hama/trunk/ml/src/test/java/org/apache/hama/ml/kmeans/TestKMeansBSP.java Mon Nov 19 17:35:55 2012
@@ -78,8 +78,8 @@ public class TestKMeansBSP extends TestC
System.out.println(centerMap);
assertEquals(1, centerMap.size());
DoubleVector doubleVector = centerMap.get(0);
- assertTrue(doubleVector.get(0) > 50 && doubleVector.get(0) < 51);
- assertTrue(doubleVector.get(1) > 50 && doubleVector.get(1) < 51);
+ assertTrue(doubleVector.get(0) >= 50 && doubleVector.get(0) < 51);
+ assertTrue(doubleVector.get(1) >= 50 && doubleVector.get(1) < 51);
} finally {
fs.delete(new Path("/tmp/clustering"), true);
}
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java?rev=1411326&r1=1411325&r2=1411326&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java Mon Nov 19 17:35:55 2012
@@ -190,6 +190,11 @@ public class YARNBSPJobClient extends BS
}
@Override
+ protected int checkTaskLimits(BSPJob job, int limitTasks) throws IOException {
+ return Math.max(1, limitTasks);
+ }
+
+ @Override
public Path getSystemDir() {
return new Path(getConf().get("bsp.local.dir", "/tmp/hama-yarn/"));
}