You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/12/20 18:18:48 UTC
svn commit: r1552746 - in /mahout/trunk: ./
core/src/main/java/org/apache/mahout/common/
integration/src/main/java/org/apache/mahout/text/
integration/src/test/java/org/apache/mahout/text/
Author: smarthi
Date: Fri Dec 20 17:18:47 2013
New Revision: 1552746
URL: http://svn.apache.org/r1552746
Log:
MAHOUT-1319: seqdirectory -filter argument silently ignored when run as MR
Added:
mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Fri Dec 20 17:18:47 2013
@@ -42,6 +42,8 @@ Release 0.9 - unreleased
MAHOUT-1333: Fixed examples bin directory permissions in distribution archives (Mike Percy via sslavic)
+ MAHOUT-1319: seqdirectory -filter argument silently ignored when run as MR (smarthi)
+
MAHOUT-1317: Clarify some of the messages in Preconditions.checkArgument (Nikolai Grinko, smarthi)
MAHOUT-1314: StreamingKMeansReducer throws NullPointerException when REDUCE_STREAMING_KMEANS is set to true (smarthi)
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Fri Dec 20 17:18:47 2013
@@ -352,18 +352,44 @@ public final class HadoopUtil {
* @throws IOException - IO Exception
*/
public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
- boolean bContainsFiles = false;
+ boolean containsFiles = false;
List<String> directoriesList = Lists.newArrayList();
for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
if (childFileStatus.isDir()) {
String subDirectoryList = buildDirList(fs, childFileStatus);
directoriesList.add(subDirectoryList);
} else {
- bContainsFiles = true;
+ containsFiles = true;
}
}
- if (bContainsFiles) {
+ if (containsFiles) {
+ directoriesList.add(fileStatus.getPath().toUri().getPath());
+ }
+ return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+ }
+
+ /**
+ * Recursively builds a comma-separated list of input directories, applying the
+ * supplied path filter at every level of the directory tree.
+ * @param fs - File System
+ * @param fileStatus - File Status of the directory to scan
+ * @param pathFilter - path filter applied to each child entry (files and subdirectories)
+ * @return list of directories as a comma-separated String
+ * @throws IOException - IO Exception
+ */
+ public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException {
+ boolean containsFiles = false;
+ List<String> directoriesList = Lists.newArrayList();
+ for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) {
+ if (childFileStatus.isDir()) {
+ // Propagate the filter into subdirectories; calling the filter-less overload
+ // here would silently ignore the filter below the top level.
+ String subDirectoryList = buildDirList(fs, childFileStatus, pathFilter);
+ directoriesList.add(subDirectoryList);
+ } else {
+ containsFiles = true;
+ }
+ }
+
+ if (containsFiles) {
+ directoriesList.add(fileStatus.getPath().toUri().getPath());
+ }
+ return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+ }
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Fri Dec 20 17:18:47 2013
@@ -23,10 +23,12 @@ import java.util.Map;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -51,7 +53,7 @@ public class SequenceFilesFromDirectory
private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
- private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
+ public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
private static final String[] CHARSET_OPTION = {"charset", "c"};
private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
@@ -107,8 +109,8 @@ public class SequenceFilesFromDirectory
pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
} else {
pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
- new Class[]{Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
- new Object[]{conf, keyPrefix, options, writer, charset, fs});
+ new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+ new Object[] {conf, keyPrefix, options, writer, charset, fs});
}
fs.listStatus(input, pathFilter);
} finally {
@@ -129,6 +131,24 @@ public class SequenceFilesFromDirectory
keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
}
+ String fileFilterClassName = null;
+ if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
+ fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+ }
+
+ PathFilter pathFilter = null;
+ // Prefix addition is presently handled in the Mapper and, unlike in runSequential(),
+ // need not be done via a pathFilter
+ if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ try {
+ pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
// Prepare Job for submission.
Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
@@ -136,9 +156,18 @@ public class SequenceFilesFromDirectory
Configuration jobConfig = job.getConfiguration();
jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
+ jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
+
FileSystem fs = FileSystem.get(jobConfig);
FileStatus fsFileStatus = fs.getFileStatus(input);
- String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+
+ String inputDirList;
+ if (pathFilter != null) {
+ inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
+ } else {
+ inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+ }
+
jobConfig.set(BASE_INPUT_PATH, input.toString());
long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java Fri Dec 20 17:18:47 2013
@@ -20,10 +20,13 @@ package org.apache.mahout.text;
import java.io.IOException;
import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
@@ -33,6 +36,8 @@ import org.apache.hadoop.mapreduce.TaskA
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
+
/**
* RecordReader used with the MultipleTextFileInputFormat class to read full files as
* k/v pairs and groups of files as single input splits.
@@ -44,13 +49,16 @@ public class WholeFileRecordReader exten
private Configuration configuration;
private BytesWritable value = new BytesWritable();
private IntWritable index;
+ private String fileFilterClassName = null;
+ private PathFilter pathFilter = null;
public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
- throws IOException {
+ throws IOException {
this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
- fileSplit.getLength(idx), fileSplit.getLocations());
+ fileSplit.getLength(idx), fileSplit.getLocations());
this.configuration = taskAttemptContext.getConfiguration();
this.index = new IntWritable(idx);
+ this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
}
@Override
@@ -71,7 +79,17 @@ public class WholeFileRecordReader exten
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
-
+ if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ try {
+ pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ } catch (InstantiationException e) {
+ throw new IllegalStateException(e);
+ } catch (IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
}
@Override
@@ -80,20 +98,31 @@ public class WholeFileRecordReader exten
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(this.configuration);
+
+ if (!fs.isFile(file)) {
+ return false;
+ }
+
+ FileStatus[] fileStatuses;
+ if (pathFilter != null) {
+ fileStatuses = fs.listStatus(file, pathFilter);
+ } else {
+ fileStatuses = fs.listStatus(file);
+ }
+
FSDataInputStream in = null;
- try {
- if (!fs.isFile(file)) {
- return false;
+ if (fileStatuses.length == 1) {
+ try {
+ in = fs.open(fileStatuses[0].getPath());
+ IOUtils.readFully(in, contents, 0, contents.length);
+ value.setCapacity(contents.length);
+ value.set(contents, 0, contents.length);
+ } finally {
+ Closeables.close(in, false);
}
- in = fs.open(file);
- IOUtils.readFully(in, contents, 0, contents.length);
- value.setCapacity(contents.length);
- value.set(contents, 0, contents.length);
- } finally {
- Closeables.close(in, false);
+ processed = true;
+ return true;
}
- processed = true;
- return true;
}
return false;
}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java?rev=1552746&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java Fri Dec 20 17:18:47 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Dummy Path Filter for testing the MapReduce version of
+ * SequenceFilesFromDirectory
+ */
+public class TestPathFilter implements PathFilter {
+
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("t") || path.getName().startsWith("r") || path.getName().startsWith("f");
+ }
+}
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Fri Dec 20 17:18:47 2013
@@ -30,11 +30,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
import org.junit.Test;
import org.slf4j.Logger;
@@ -133,7 +133,7 @@ public final class TestSequenceFilesFrom
"--charset", Charsets.UTF_8.name(),
"--method", "mapreduce",
"--keyPrefix", "UID",
- "--fileFilterClass", ""
+ "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
});
checkMRResultFiles(conf, mrOutputDir, DATA1, "UID");
@@ -152,7 +152,9 @@ public final class TestSequenceFilesFrom
"--chunkSize", "64",
"--charset", Charsets.UTF_8.name(),
"--method", "mapreduce",
- "--keyPrefix", "UID"});
+ "--keyPrefix", "UID",
+ "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
+ });
checkMRResultFilesRecursive(conf, mrOutputDirRecur, DATA2, "UID");
}
@@ -206,7 +208,7 @@ public final class TestSequenceFilesFrom
FileSystem fs = FileSystem.get(configuration);
// output exists?
- FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+ FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
assertEquals(1, fileStatuses.length); // only one
assertEquals("chunk-0", fileStatuses[0].getPath().getName());
@@ -230,16 +232,6 @@ public final class TestSequenceFilesFrom
}
}
- /**
- * exclude hidden (starting with dot) files
- */
- private static class ExcludeDotFiles implements PathFilter {
- @Override
- public boolean accept(Path file) {
- return !file.getName().startsWith(".") && !file.getName().startsWith("_");
- }
- }
-
private static void checkRecursiveChunkFiles(Configuration configuration,
Path outputDir,
String[][] data,
@@ -249,7 +241,7 @@ public final class TestSequenceFilesFrom
System.out.println(" ----------- check_Recursive_ChunkFiles ------------");
// output exists?
- FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+ FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
assertEquals(1, fileStatuses.length); // only one
assertEquals("chunk-0", fileStatuses[0].getPath().getName());
@@ -283,7 +275,7 @@ public final class TestSequenceFilesFrom
FileSystem fs = FileSystem.get(conf);
// output exists?
- FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+ FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
assertEquals(1, fileStatuses.length); // only one
assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
Map<String, String> fileToData = Maps.newHashMap();
@@ -314,7 +306,7 @@ public final class TestSequenceFilesFrom
FileSystem fs = FileSystem.get(configuration);
// output exists?
- FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+ FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
assertEquals(1, fileStatuses.length); // only one
assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
Map<String, String> fileToData = Maps.newHashMap();