You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/26 22:50:31 UTC
svn commit: r1476398 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/test/java/org/apache/hama/bsp/
Author: edwardyoon
Date: Fri Apr 26 20:50:30 2013
New Revision: 1476398
URL: http://svn.apache.org/r1476398
Log:
Fix bug of comma separated input paths
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Apr 26 20:50:30 2013
@@ -8,6 +8,7 @@ Release 0.7 (unreleased changes)
BUG FIXES
+ HAMA-750: Fix bug of comma separated input paths (edwardyoon)
IMPROVEMENTS
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Fri Apr 26 20:50:30 2013
@@ -235,6 +235,7 @@ public class BSPJob extends BSPJobContex
if (state == JobState.DEFINE) {
submit();
}
+
if (verbose) {
jobClient.monitorAndPrintJob(this, info);
} else {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Apr 26 20:50:30 2013
@@ -325,10 +325,21 @@ public class BSPJobClient extends Config
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
- job = partition(job, maxTasks);
+ InputSplit[] splits = job.getInputFormat().getSplits(
+ job,
+ (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+ : maxTasks);
+
+ job = partition(job, splits, maxTasks);
maxTasks = job.getInt("hama.partition.count", maxTasks);
- job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
+ if (job.getBoolean("input.has.partitioned", false)) {
+ splits = job.getInputFormat().getSplits(
+ job,
+ (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+ : maxTasks);
+ }
+ job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
job.set("bsp.job.split.file", submitSplitFile.toString());
}
@@ -369,15 +380,11 @@ public class BSPJobClient extends Config
return launchJob(jobId, job, submitJobFile, fs);
}
- protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+ protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
+ throws IOException {
String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
- Path inputDir = new Path(inputPath);
- if (fs.isFile(inputDir)) {
- inputDir = inputDir.getParent();
- }
-
- Path partitionDir = new Path(inputDir + "/partitions");
+ Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
if (fs.exists(partitionDir)) {
fs.delete(partitionDir, true);
}
@@ -386,11 +393,6 @@ public class BSPJobClient extends Config
return job;
}// Early exit for the partitioner job.
- InputSplit[] splits = job.getInputFormat().getSplits(
- job,
- (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
- : maxTasks);
-
if (inputPath != null) {
int numSplits = splits.length;
int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
@@ -427,8 +429,7 @@ public class BSPJobClient extends Config
job.get(Constants.RUNTIME_PARTITIONING_CLASS));
}
BSPJob partitioningJob = new BSPJob(conf);
- partitioningJob.setInputPath(new Path(job.getConfiguration().get(
- Constants.JOB_INPUT_DIR)));
+ LOG.debug("partitioningJob input: " + partitioningJob.get(Constants.JOB_INPUT_DIR));
partitioningJob.setInputFormat(job.getInputFormat().getClass());
partitioningJob.setInputKeyClass(job.getInputKeyClass());
partitioningJob.setInputValueClass(job.getInputValueClass());
@@ -439,6 +440,7 @@ public class BSPJobClient extends Config
partitioningJob.set("bsp.partitioning.runner.job", "true");
partitioningJob.getConfiguration().setBoolean(
Constants.ENABLE_RUNTIME_PARTITIONING, false);
+ partitioningJob.setOutputPath(partitionDir);
boolean isPartitioned = false;
try {
@@ -448,13 +450,15 @@ public class BSPJobClient extends Config
} catch (ClassNotFoundException e) {
LOG.error("Class not found error partitioning run-time.", e);
}
+
if (isPartitioned) {
if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
job.setInputPath(new Path(conf
.get(Constants.RUNTIME_PARTITIONING_DIR)));
} else {
- job.setInputPath(new Path(inputDir + "/partitions"));
+ job.setInputPath(partitionDir);
}
+ job.setBoolean("input.has.partitioned", true);
job.setInputFormat(SequenceFileInputFormat.class);
} else {
LOG.error("Error partitioning the input path.");
@@ -543,13 +547,8 @@ public class BSPJobClient extends Config
return codecClass;
}
- private static int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
- throws IOException {
- InputSplit[] splits = job.getInputFormat().getSplits(
- job,
- (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
- : maxTasks);
-
+ private static int writeSplits(BSPJob job, InputSplit[] splits,
+ Path submitSplitFile, int maxTasks) throws IOException {
final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
submitSplitFile, splits.length);
try {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Fri Apr 26 20:50:30 2013
@@ -135,7 +135,14 @@ public abstract class FileInputFormat<K,
for (Path p : dirs) {
FileSystem fs = p.getFileSystem(job.getConfiguration());
- FileStatus[] matches = fs.globStatus(p, inputFilter);
+
+ FileStatus[] matches = null;
+ try {
+ matches = fs.globStatus(p, inputFilter);
+ } catch (Exception e) {
+ LOG.info(p + "\n" + e.toString());
+ }
+
if (matches == null) {
errors.add(new IOException("Input path does not exist: " + p));
} else if (matches.length == 0) {
@@ -167,7 +174,7 @@ public abstract class FileInputFormat<K,
public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
-
+
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
FileStatus[] files = listStatus(job);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Fri Apr 26 20:50:30 2013
@@ -57,18 +57,13 @@ public class PartitioningRunner extends
this.fs = FileSystem.get(conf);
- Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR));
- if (fs.isFile(inputDir)) {
- inputDir = inputDir.getParent();
- }
-
converter = ReflectionUtils.newInstance(conf.getClass(
Constants.RUNTIME_PARTITION_RECORDCONVERTER,
DefaultRecordConverter.class, RecordConverter.class), conf);
converter.setup(conf);
if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
- this.partitionDir = new Path(inputDir + "/partitions");
+ this.partitionDir = new Path(conf.get("bsp.output.dir"));
} else {
this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java Fri Apr 26 20:50:30 2013
@@ -17,8 +17,13 @@
*/
package org.apache.hama.bsp;
+import java.io.IOException;
+
import junit.framework.TestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
public class TestFileInputFormat extends TestCase {
public void testComputeGoalSize() throws Exception {
@@ -28,4 +33,24 @@ public class TestFileInputFormat extends
&& 1200 > input.computeGoalSize(10, 10000));
}
+
+ public void testSetInputPaths() throws IOException {
+ HamaConfiguration conf = new HamaConfiguration();
+ BSPJob job = new BSPJob(conf);
+
+ String[] files = new String[2];
+ files[0] = "hdfs://hadoop.uta.edu/user/hadoop/employee.txt";
+ files[1] = "hdfs://hadoop.uta.edu/user/hadoop/department.txt";
+
+ FileInputFormat.setInputPaths(job, files[0] + "," + files[1]);
+ Path[] paths = FileInputFormat.getInputPaths(job);
+
+ System.out.println(job.getConfiguration().get("bsp.input.dir"));
+ assertEquals(2, FileInputFormat.getInputPaths(job).length);
+
+ for (int i = 0; i < paths.length; i++) {
+ System.out.println(paths[i]);
+ assertEquals(paths[i].toString(), files[i]);
+ }
+ }
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Fri Apr 26 20:50:30 2013
@@ -19,8 +19,6 @@ package org.apache.hama.bsp;
import java.io.IOException;
-import junit.framework.TestCase;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -30,19 +28,54 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.util.KeyValuePair;
-public class TestPartitioning extends TestCase {
+public class TestPartitioning extends HamaCluster {
public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
+ public static String TMP_OUTPUT = "/tmp/test-example/";
+ public static final String TMP_OUTPUT_PATH = "/tmp/test-example/output.txt";
+ public static Path OUTPUT_PATH = new Path(TMP_OUTPUT);
+
+ protected HamaConfiguration configuration;
+
+ // These variables prevent everything from being rebooted again, since
+ // setUp and tearDown are called per method.
+
+ public TestPartitioning() {
+ configuration = new HamaConfiguration();
+ configuration.set("bsp.master.address", "localhost");
+ configuration.set("hama.child.redirect.log.console", "true");
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ configuration.get("bsp.master.address"));
+ configuration.set("bsp.local.dir", "/tmp/hama-test");
+ configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+ configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+ configuration.set("hama.sync.client.class",
+ org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+ .getCanonicalName());
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
public void testPartitioner() throws Exception {
Configuration conf = new Configuration();
conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
- conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
conf.setBoolean("bsp.input.runtime.partitioning", true);
BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
bsp.setJobName("Test partitioning with input");
@@ -51,12 +84,12 @@ public class TestPartitioning extends Te
conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
bsp.setInputFormat(TextInputFormat.class);
bsp.setOutputFormat(NullOutputFormat.class);
- bsp.setInputPath(new Path("../CHANGES.txt"));
+ FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
bsp.setPartitioner(HashPartitioner.class);
assertTrue(bsp.waitForCompletion(true));
FileSystem fs = FileSystem.get(conf);
- fs.delete(new Path("/tmp/hama-test/partitioning/localtest"), true);
+ fs.delete(OUTPUT_PATH, true);
}
public static class PartionedBSP extends