You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/06/24 07:35:28 UTC
svn commit: r1495927 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/common/
core/src/main/java/org/apache/mahout/common/iterator/
integration/src/main/java/org/apache/mahout/text/
integration/src/test/java/org/apache/mahout/text/
Author: smarthi
Date: Mon Jun 24 05:35:27 2013
New Revision: 1495927
URL: http://svn.apache.org/r1495927
Log:
MAHOUT-833: Make conversion to sequence files map-reduce — first round of code cleanup based on feedback from code review
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jun 24 05:35:27 2013
@@ -36,7 +36,6 @@ import org.apache.commons.cli2.builder.G
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -117,34 +116,6 @@ public abstract class AbstractJob extend
options = Lists.newLinkedList();
}
- /**
- * Builds a comma-separated list of input splits
- */
- public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
- StringBuilder dirList = new StringBuilder();
- boolean bContainsFiles = false;
-
- for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
- if (childFileStatus.isDir()) {
- String subDirectoryList = buildDirList(fs, childFileStatus);
- if (subDirectoryList.length() > 0 && dirList.length() > 0) {
- dirList.append(",");
- }
- dirList.append(subDirectoryList);
- } else {
- bContainsFiles = true;
- }
- }
-
- if (bContainsFiles) {
- if (dirList.length() > 0) {
- dirList.append(",");
- }
- dirList.append(fileStatus.getPath().toUri().getPath());
- }
- return dirList.toString();
- }
-
/** Returns the input path established by a call to {@link #parseArguments(String[])}.
* The source of the path may be an input option added using {@link #addInputOption()}
* or it may be the value of the {@code mapred.input.dir} configuration
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Mon Jun 24 05:35:27 2013
@@ -340,4 +340,59 @@ public final class HadoopUtil {
Closeables.close(in, true);
}
}
+
+ /**
+ * Builds a comma-separated list of input splits
+ */
+ public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
+ StringBuilder dirList = new StringBuilder();
+ boolean bContainsFiles = false;
+
+ for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
+ if (childFileStatus.isDir()) {
+ String subDirectoryList = buildDirList(fs, childFileStatus);
+ if (subDirectoryList.length() > 0 && dirList.length() > 0) {
+ dirList.append(",");
+ }
+ dirList.append(subDirectoryList);
+ } else {
+ bContainsFiles = true;
+ }
+ }
+
+ if (bContainsFiles) {
+ if (dirList.length() > 0) {
+ dirList.append(",");
+ }
+ dirList.append(fileStatus.getPath().toUri().getPath());
+ }
+ return dirList.toString();
+ }
+
+ /**
+ *
+ * @param conf - configuration
+ * @param filePath - Input File Path
+ * @return relative file Path
+ * @throws IOException
+ */
+ public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
+ FileSystem fs = filePath.getFileSystem(conf);
+ FileStatus fst = fs.getFileStatus(filePath);
+ String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+ String basePath = conf.get("baseinputpath");
+ if (!basePath.endsWith("/")) {
+ basePath += "/";
+ }
+ basePath = basePath.replaceFirst("file:", "");
+ String[] parts = currentPath.split(basePath);
+
+ if (parts.length == 2) {
+ return parts[1];
+ } else if (parts.length == 1) {
+ return parts[0];
+ }
+ return currentPath;
+ }
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java Mon Jun 24 05:35:27 2013
@@ -31,10 +31,7 @@ import com.google.common.base.Charsets;
* defines lines.
*
* This class will uncompress files that end in .zip or .gz accordingly, too.
- *
- * In case that you pass the class a stream of compressed bytes, simply pass the
- * original file name as well in the constructor and the system will decompress the bytes.
- *
+ *
*/
public final class FileLineIterable implements Iterable<String> {
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Mon Jun 24 05:35:27 2013
@@ -133,8 +133,8 @@ public class SequenceFilesFromDirectory
Configuration jobConfig = job.getConfiguration();
jobConfig.set("keyPrefix", keyPrefix);
FileSystem fs = FileSystem.get(jobConfig);
- FileStatus fsFileStatus = fs.getFileStatus(input);
- String inputDirList = buildDirList(fs, fsFileStatus);
+ FileStatus fsFileStatus = HadoopUtil.listStatus(fs, input)[0];
+ String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
jobConfig.set("baseinputpath", input.toString());
long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Mon Jun 24 05:35:27 2013
@@ -17,6 +17,8 @@
package org.apache.mahout.text;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -24,8 +26,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import java.io.IOException;
+import org.apache.mahout.common.HadoopUtil;
/**
* Map class for SequenceFilesFromDirectory MR job
@@ -46,7 +47,7 @@ public class SequenceFilesFromDirectoryM
Configuration configuration = context.getConfiguration();
Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
- String relativeFilePath = SequenceFilesFromMailArchivesMapper.calcRelativeFilePath(configuration, filePath);
+ String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
String filename = this.keyPrefix.length() > 0 ?
this.keyPrefix + Path.SEPARATOR + relativeFilePath :
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Mon Jun 24 05:35:27 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.utils.email.MailOptions;
import org.apache.mahout.utils.email.MailProcessor;
@@ -322,10 +323,10 @@ public final class SequenceFilesFromMail
}
FileSystem fs = FileSystem.get(jobConfig);
- FileStatus fsFileStatus = fs.getFileStatus(inputPath);
+ FileStatus fsFileStatus = HadoopUtil.listStatus(fs, inputPath)[0];
jobConfig.set("baseinputpath", inputPath.toString());
- String inputDirList = buildDirList(fs, fsFileStatus);
+ String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
FileInputFormat.setInputPaths(job, inputDirList);
long chunkSizeInBytes = chunkSize * 1024 * 1024;
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Mon Jun 24 05:35:27 2013
@@ -34,11 +34,10 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.utils.email.MailOptions;
import org.apache.mahout.utils.email.MailProcessor;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -230,33 +229,12 @@ public class SequenceFilesFromMailArchiv
}
contents.append(body);
}
-
- public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
- FileSystem fs = filePath.getFileSystem(conf);
- FileStatus fst = fs.getFileStatus(filePath);
- String currentPath = fst.getPath().toString().replaceFirst("file:", "");
-
- String basePath = conf.get("baseinputpath");
- if (!basePath.endsWith("/")) {
- basePath += "/";
- }
- basePath = basePath.replaceFirst("file:", "");
- String[] parts = currentPath.split(basePath);
-
- String hdfsStuffRemoved = currentPath; // default value
- if (parts.length == 2) {
- hdfsStuffRemoved = parts[1];
- } else if (parts.length == 1) {
- hdfsStuffRemoved = parts[0];
- }
- return hdfsStuffRemoved;
- }
public void map(IntWritable key, BytesWritable value, Context context)
throws IOException, InterruptedException {
Configuration configuration = context.getConfiguration();
Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
- String relativeFilePath = calcRelativeFilePath(configuration, filePath);
+ String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
parseMboxLineByLine(relativeFilePath, is, context);
}
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Mon Jun 24 05:35:27 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
-import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
import org.apache.mahout.utils.MahoutTestCase;
@@ -90,7 +90,7 @@ public final class TestSequenceFilesFrom
createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2);
FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
- String dirs = AbstractJob.buildDirList(fs, fstInputPath);
+ String dirs = HadoopUtil.buildDirList(fs, fstInputPath);
System.out.println("\n\n ----- recursive dirs: " + dirs);
SequenceFilesFromDirectory.main(new String[]{
@@ -137,7 +137,7 @@ public final class TestSequenceFilesFrom
createRecursiveDirFilesFromArrays(conf, inputDirRecur, DATA2);
FileStatus fst_input_path = fs.getFileStatus(inputDirRecur);
- String dirs = AbstractJob.buildDirList(fs, fst_input_path);
+ String dirs = HadoopUtil.buildDirList(fs, fst_input_path);
logger.info("\n\n ---- recursive dirs: {}", dirs);