You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2013/04/26 22:50:31 UTC

svn commit: r1476398 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/

Author: edwardyoon
Date: Fri Apr 26 20:50:30 2013
New Revision: 1476398

URL: http://svn.apache.org/r1476398
Log:
Fix bug with comma-separated input paths

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Apr 26 20:50:30 2013
@@ -8,6 +8,7 @@ Release 0.7 (unreleased changes)
 
   BUG FIXES
 
+   HAMA-750: Fix bug of comma separated input paths (edwardyoon)
 
   IMPROVEMENTS
    

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Fri Apr 26 20:50:30 2013
@@ -235,6 +235,7 @@ public class BSPJob extends BSPJobContex
     if (state == JobState.DEFINE) {
       submit();
     }
+
     if (verbose) {
       jobClient.monitorAndPrintJob(this, info);
     } else {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Apr 26 20:50:30 2013
@@ -325,10 +325,21 @@ public class BSPJobClient extends Config
       // Create the splits for the job
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
 
-      job = partition(job, maxTasks);
+      InputSplit[] splits = job.getInputFormat().getSplits(
+          job,
+          (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+              : maxTasks);
+
+      job = partition(job, splits, maxTasks);
       maxTasks = job.getInt("hama.partition.count", maxTasks);
 
-      job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
+      if (job.getBoolean("input.has.partitioned", false)) {
+        splits = job.getInputFormat().getSplits(
+            job,
+            (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
+                : maxTasks);
+      }
+      job.setNumBspTask(writeSplits(job, splits, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
     }
 
@@ -369,15 +380,11 @@ public class BSPJobClient extends Config
     return launchJob(jobId, job, submitJobFile, fs);
   }
 
-  protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
+  protected BSPJob partition(BSPJob job, InputSplit[] splits, int maxTasks)
+      throws IOException {
     String inputPath = job.getConfiguration().get(Constants.JOB_INPUT_DIR);
-    Path inputDir = new Path(inputPath);
-    if (fs.isFile(inputDir)) {
-      inputDir = inputDir.getParent();
-    }
-
-    Path partitionDir = new Path(inputDir + "/partitions");
 
+    Path partitionDir = new Path("/tmp/hama-parts/" + job.getJobID() + "/");
     if (fs.exists(partitionDir)) {
       fs.delete(partitionDir, true);
     }
@@ -386,11 +393,6 @@ public class BSPJobClient extends Config
       return job;
     }// Early exit for the partitioner job.
 
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
     if (inputPath != null) {
       int numSplits = splits.length;
       int numTasks = job.getConfiguration().getInt("bsp.peers.num", 0);
@@ -427,8 +429,7 @@ public class BSPJobClient extends Config
               job.get(Constants.RUNTIME_PARTITIONING_CLASS));
         }
         BSPJob partitioningJob = new BSPJob(conf);
-        partitioningJob.setInputPath(new Path(job.getConfiguration().get(
-            Constants.JOB_INPUT_DIR)));
+        LOG.debug("partitioningJob input: " + partitioningJob.get(Constants.JOB_INPUT_DIR));
         partitioningJob.setInputFormat(job.getInputFormat().getClass());
         partitioningJob.setInputKeyClass(job.getInputKeyClass());
         partitioningJob.setInputValueClass(job.getInputValueClass());
@@ -439,6 +440,7 @@ public class BSPJobClient extends Config
         partitioningJob.set("bsp.partitioning.runner.job", "true");
         partitioningJob.getConfiguration().setBoolean(
             Constants.ENABLE_RUNTIME_PARTITIONING, false);
+        partitioningJob.setOutputPath(partitionDir);
 
         boolean isPartitioned = false;
         try {
@@ -448,13 +450,15 @@ public class BSPJobClient extends Config
         } catch (ClassNotFoundException e) {
           LOG.error("Class not found error partitioning run-time.", e);
         }
+
         if (isPartitioned) {
           if (job.getConfiguration().get(Constants.RUNTIME_PARTITIONING_DIR) != null) {
             job.setInputPath(new Path(conf
                 .get(Constants.RUNTIME_PARTITIONING_DIR)));
           } else {
-            job.setInputPath(new Path(inputDir + "/partitions"));
+            job.setInputPath(partitionDir);
           }
+          job.setBoolean("input.has.partitioned", true);
           job.setInputFormat(SequenceFileInputFormat.class);
         } else {
           LOG.error("Error partitioning the input path.");
@@ -543,13 +547,8 @@ public class BSPJobClient extends Config
     return codecClass;
   }
 
-  private static int writeSplits(BSPJob job, Path submitSplitFile, int maxTasks)
-      throws IOException {
-    InputSplit[] splits = job.getInputFormat().getSplits(
-        job,
-        (isProperSize(job.getNumBspTask(), maxTasks)) ? job.getNumBspTask()
-            : maxTasks);
-
+  private static int writeSplits(BSPJob job, InputSplit[] splits,
+      Path submitSplitFile, int maxTasks) throws IOException {
     final DataOutputStream out = writeSplitsFileHeader(job.getConfiguration(),
         submitSplitFile, splits.length);
     try {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/FileInputFormat.java Fri Apr 26 20:50:30 2013
@@ -135,7 +135,14 @@ public abstract class FileInputFormat<K,
 
     for (Path p : dirs) {
       FileSystem fs = p.getFileSystem(job.getConfiguration());
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
+
+      FileStatus[] matches = null;
+      try {
+        matches = fs.globStatus(p, inputFilter);
+      } catch (Exception e) {
+        LOG.info(p + "\n" + e.toString());
+      }
+
       if (matches == null) {
         errors.add(new IOException("Input path does not exist: " + p));
       } else if (matches.length == 0) {
@@ -167,7 +174,7 @@ public abstract class FileInputFormat<K,
   public InputSplit[] getSplits(BSPJob job, int numSplits) throws IOException {
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
-
+    
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
     FileStatus[] files = listStatus(job);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/PartitioningRunner.java Fri Apr 26 20:50:30 2013
@@ -57,18 +57,13 @@ public class PartitioningRunner extends
 
     this.fs = FileSystem.get(conf);
 
-    Path inputDir = new Path(conf.get(Constants.JOB_INPUT_DIR));
-    if (fs.isFile(inputDir)) {
-      inputDir = inputDir.getParent();
-    }
-
     converter = ReflectionUtils.newInstance(conf.getClass(
         Constants.RUNTIME_PARTITION_RECORDCONVERTER,
         DefaultRecordConverter.class, RecordConverter.class), conf);
     converter.setup(conf);
 
     if (conf.get(Constants.RUNTIME_PARTITIONING_DIR) == null) {
-      this.partitionDir = new Path(inputDir + "/partitions");
+      this.partitionDir = new Path(conf.get("bsp.output.dir"));
     } else {
       this.partitionDir = new Path(conf.get(Constants.RUNTIME_PARTITIONING_DIR));
     }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestFileInputFormat.java Fri Apr 26 20:50:30 2013
@@ -17,8 +17,13 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.IOException;
+
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+
 public class TestFileInputFormat extends TestCase {
 
   public void testComputeGoalSize() throws Exception {
@@ -28,4 +33,24 @@ public class TestFileInputFormat extends
         && 1200 > input.computeGoalSize(10, 10000));
 
   }
+
+  public void testSetInputPaths() throws IOException {
+    HamaConfiguration conf = new HamaConfiguration();
+    BSPJob job = new BSPJob(conf);
+
+    String[] files = new String[2];
+    files[0] = "hdfs://hadoop.uta.edu/user/hadoop/employee.txt";
+    files[1] = "hdfs://hadoop.uta.edu/user/hadoop/department.txt";
+
+    FileInputFormat.setInputPaths(job, files[0] + "," + files[1]);
+    Path[] paths = FileInputFormat.getInputPaths(job);
+
+    System.out.println(job.getConfiguration().get("bsp.input.dir"));
+    assertEquals(2, FileInputFormat.getInputPaths(job).length);
+
+    for (int i = 0; i < paths.length; i++) {
+      System.out.println(paths[i]);
+      assertEquals(paths[i].toString(), files[i]);
+    }
+  }
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1476398&r1=1476397&r2=1476398&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Fri Apr 26 20:50:30 2013
@@ -19,8 +19,6 @@ package org.apache.hama.bsp;
 
 import java.io.IOException;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -30,19 +28,54 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
-public class TestPartitioning extends TestCase {
+public class TestPartitioning extends HamaCluster {
 
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
 
+  public static String TMP_OUTPUT = "/tmp/test-example/";
+  public static final String TMP_OUTPUT_PATH = "/tmp/test-example/output.txt";
+  public static Path OUTPUT_PATH = new Path(TMP_OUTPUT);
+
+  protected HamaConfiguration configuration;
+
+  // These variables prevent the whole cluster from being rebooted again, since
+  // setUp and tearDown are called once per test method.
+
+  public TestPartitioning() {
+    configuration = new HamaConfiguration();
+    configuration.set("bsp.master.address", "localhost");
+    configuration.set("hama.child.redirect.log.console", "true");
+    assertEquals("Make sure master addr is set to localhost:", "localhost",
+        configuration.get("bsp.master.address"));
+    configuration.set("bsp.local.dir", "/tmp/hama-test");
+    configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+    configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+    configuration.set("hama.sync.client.class",
+        org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+            .getCanonicalName());
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+  }
+
   public void testPartitioner() throws Exception {
 
     Configuration conf = new Configuration();
     conf.set("bsp.local.dir", "/tmp/hama-test/partitioning");
-    conf.set("bsp.partitioning.dir", "/tmp/hama-test/partitioning/localtest");
     conf.setBoolean("bsp.input.runtime.partitioning", true);
     BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
     bsp.setJobName("Test partitioning with input");
@@ -51,12 +84,12 @@ public class TestPartitioning extends Te
     conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
     bsp.setInputFormat(TextInputFormat.class);
     bsp.setOutputFormat(NullOutputFormat.class);
-    bsp.setInputPath(new Path("../CHANGES.txt"));
+    FileInputFormat.setInputPaths(bsp, "../CHANGES.txt,../README.txt");
     bsp.setPartitioner(HashPartitioner.class);
     assertTrue(bsp.waitForCompletion(true));
 
     FileSystem fs = FileSystem.get(conf);
-    fs.delete(new Path("/tmp/hama-test/partitioning/localtest"), true);
+    fs.delete(OUTPUT_PATH, true);
   }
 
   public static class PartionedBSP extends