You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/12/20 18:18:48 UTC

svn commit: r1552746 - in /mahout/trunk: ./ core/src/main/java/org/apache/mahout/common/ integration/src/main/java/org/apache/mahout/text/ integration/src/test/java/org/apache/mahout/text/

Author: smarthi
Date: Fri Dec 20 17:18:47 2013
New Revision: 1552746

URL: http://svn.apache.org/r1552746
Log:
MAHOUT-1319: seqdirectory -filter argument silently ignored when run as MR

Added:
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Fri Dec 20 17:18:47 2013
@@ -42,6 +42,8 @@ Release 0.9 - unreleased
 
   MAHOUT-1333: Fixed examples bin directory permissions in distribution archives (Mike Percy via sslavic)
 
+  MAHOUT-1319: seqdirectory -filter argument silently ignored when run as MR (smarthi)
+
   MAHOUT-1317: Clarify some of the messages in Preconditions.checkArgument (Nikolai Grinko, smarthi)
 
   MAHOUT-1314: StreamingKMeansReducer throws NullPointerException when REDUCE_STREAMING_KMEANS is set to true (smarthi)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Fri Dec 20 17:18:47 2013
@@ -352,18 +352,44 @@ public final class HadoopUtil {
    * @throws IOException - IO Exception
    */
   public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
-    boolean bContainsFiles = false;
+    boolean containsFiles = false;
     List<String> directoriesList = Lists.newArrayList();
     for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
       if (childFileStatus.isDir()) {
         String subDirectoryList = buildDirList(fs, childFileStatus);
         directoriesList.add(subDirectoryList);
       } else {
-        bContainsFiles = true;
+        containsFiles = true;
       }
     }
 
-    if (bContainsFiles) {
+    if (containsFiles) {
+      directoriesList.add(fileStatus.getPath().toUri().getPath());
+    }
+    return Joiner.on(',').skipNulls().join(directoriesList.iterator());
+  }
+
+  /**
+   * Builds a comma-separated list of input splits
+   * @param fs - File System
+   * @param fileStatus - File Status
+   * @param pathFilter - path filter
+   * @return list of directories as a comma-separated String
+   * @throws IOException - IO Exception
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus, PathFilter pathFilter) throws IOException {
+    boolean containsFiles = false;
+    List<String> directoriesList = Lists.newArrayList();
+    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath(), pathFilter)) {
+      if (childFileStatus.isDir()) {
+        String subDirectoryList = buildDirList(fs, childFileStatus);
+        directoriesList.add(subDirectoryList);
+      } else {
+        containsFiles = true;
+      }
+    }
+
+    if (containsFiles) {
       directoriesList.add(fileStatus.getPath().toUri().getPath());
     }
     return Joiner.on(',').skipNulls().join(directoriesList.iterator());

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Fri Dec 20 17:18:47 2013
@@ -23,10 +23,12 @@ import java.util.Map;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -51,7 +53,7 @@ public class SequenceFilesFromDirectory 
   private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
 
   private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
-  private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
+  public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
   private static final String[] CHARSET_OPTION = {"charset", "c"};
 
   private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
@@ -107,8 +109,8 @@ public class SequenceFilesFromDirectory 
         pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
       } else {
         pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
-          new Class[]{Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
-          new Object[]{conf, keyPrefix, options, writer, charset, fs});
+          new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+          new Object[] {conf, keyPrefix, options, writer, charset, fs});
       }
       fs.listStatus(input, pathFilter);
     } finally {
@@ -129,6 +131,24 @@ public class SequenceFilesFromDirectory 
       keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
     }
 
+    String fileFilterClassName = null;
+    if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
+      fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+    }
+
+    PathFilter pathFilter = null;
+    // Prefix Addition is presently handled in the Mapper and unlike runsequential()
+    // need not be done via a pathFilter
+    if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+      try {
+        pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalStateException(e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException(e);
+      }
+    }
+
     // Prepare Job for submission.
     Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
       SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
@@ -136,9 +156,18 @@ public class SequenceFilesFromDirectory 
 
     Configuration jobConfig = job.getConfiguration();
     jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
+    jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
+
     FileSystem fs = FileSystem.get(jobConfig);
     FileStatus fsFileStatus = fs.getFileStatus(input);
-    String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+
+    String inputDirList;
+    if (pathFilter != null) {
+      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
+    } else {
+      inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
+    }
+
     jobConfig.set(BASE_INPUT_PATH, input.toString());
 
     long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java Fri Dec 20 17:18:47 2013
@@ -20,10 +20,13 @@ package org.apache.mahout.text;
 import java.io.IOException;
 
 import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IntWritable;
@@ -33,6 +36,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 
+import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
+
 /**
  * RecordReader used with the MultipleTextFileInputFormat class to read full files as
  * k/v pairs and groups of files as single input splits.
@@ -44,13 +49,16 @@ public class WholeFileRecordReader exten
   private Configuration configuration;
   private BytesWritable value = new BytesWritable();
   private IntWritable index;
+  private String fileFilterClassName = null;
+  private PathFilter pathFilter = null;
 
   public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
-    throws IOException {
+      throws IOException {
     this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
-       fileSplit.getLength(idx), fileSplit.getLocations());
+      fileSplit.getLength(idx), fileSplit.getLocations());
     this.configuration = taskAttemptContext.getConfiguration();
     this.index = new IntWritable(idx);
+    this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
   }
 
   @Override
@@ -71,7 +79,17 @@ public class WholeFileRecordReader exten
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
     throws IOException, InterruptedException {
-
+    if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+      try {
+        pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+      } catch (ClassNotFoundException e) {
+        throw new IllegalStateException(e);
+      } catch (InstantiationException e) {
+        throw new IllegalStateException(e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalStateException(e);
+      }
+    }
   }
 
   @Override
@@ -80,20 +98,31 @@ public class WholeFileRecordReader exten
       byte[] contents = new byte[(int) fileSplit.getLength()];
       Path file = fileSplit.getPath();
       FileSystem fs = file.getFileSystem(this.configuration);
+
+      if (!fs.isFile(file)) {
+        return false;
+      }
+
+      FileStatus[] fileStatuses;
+      if (pathFilter != null) {
+        fileStatuses = fs.listStatus(file, pathFilter);
+      } else {
+        fileStatuses = fs.listStatus(file);
+      }
+
       FSDataInputStream in = null;
-      try {
-        if (!fs.isFile(file)) {
-          return false;
+      if (fileStatuses.length == 1) {
+        try {
+          in = fs.open(fileStatuses[0].getPath());
+          IOUtils.readFully(in, contents, 0, contents.length);
+          value.setCapacity(contents.length);
+          value.set(contents, 0, contents.length);
+        } finally {
+          Closeables.close(in, false);
         }
-        in = fs.open(file);
-        IOUtils.readFully(in, contents, 0, contents.length);
-        value.setCapacity(contents.length);
-        value.set(contents, 0, contents.length);
-      } finally {
-        Closeables.close(in, false);
+        processed = true;
+        return true;
       }
-      processed = true;
-      return true;
     }
     return false;
   }

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java?rev=1552746&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestPathFilter.java Fri Dec 20 17:18:47 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Dummy Path Filter for testing the MapReduce version of
+ * SequenceFilesFromDirectory
+ */
+public class TestPathFilter implements PathFilter {
+
+  @Override
+  public boolean accept(Path path) {
+    return path.getName().startsWith("t") || path.getName().startsWith("r") || path.getName().startsWith("f");
+  }
+}

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1552746&r1=1552745&r2=1552746&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Fri Dec 20 17:18:47 2013
@@ -30,11 +30,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.MahoutTestCase;
 import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -133,7 +133,7 @@ public final class TestSequenceFilesFrom
       "--charset", Charsets.UTF_8.name(),
       "--method", "mapreduce",
       "--keyPrefix", "UID",
-      "--fileFilterClass", ""
+      "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
     });
 
     checkMRResultFiles(conf, mrOutputDir, DATA1, "UID");
@@ -152,7 +152,9 @@ public final class TestSequenceFilesFrom
       "--chunkSize", "64",
       "--charset", Charsets.UTF_8.name(),
       "--method", "mapreduce",
-      "--keyPrefix", "UID"});
+      "--keyPrefix", "UID",
+      "--fileFilterClass", "org.apache.mahout.text.TestPathFilter"
+    });
 
     checkMRResultFilesRecursive(conf, mrOutputDirRecur, DATA2, "UID");
   }
@@ -206,7 +208,7 @@ public final class TestSequenceFilesFrom
     FileSystem fs = FileSystem.get(configuration);
 
     // output exists?
-    FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
     assertEquals(1, fileStatuses.length); // only one
     assertEquals("chunk-0", fileStatuses[0].getPath().getName());
 
@@ -230,16 +232,6 @@ public final class TestSequenceFilesFrom
     }
   }
 
-  /**
-   * exclude hidden (starting with dot) files
-   */
-  private static class ExcludeDotFiles implements PathFilter {
-    @Override
-    public boolean accept(Path file) {
-      return !file.getName().startsWith(".") && !file.getName().startsWith("_");
-    }
-  }
-
   private static void checkRecursiveChunkFiles(Configuration configuration,
                                                Path outputDir,
                                                String[][] data,
@@ -249,7 +241,7 @@ public final class TestSequenceFilesFrom
     System.out.println(" ----------- check_Recursive_ChunkFiles ------------");
 
     // output exists?
-    FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, PathFilters.logsCRCFilter());
     assertEquals(1, fileStatuses.length); // only one
     assertEquals("chunk-0", fileStatuses[0].getPath().getName());
 
@@ -283,7 +275,7 @@ public final class TestSequenceFilesFrom
     FileSystem fs = FileSystem.get(conf);
 
     // output exists?
-    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
     assertEquals(1, fileStatuses.length); // only one
     assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
     Map<String, String> fileToData = Maps.newHashMap();
@@ -314,7 +306,7 @@ public final class TestSequenceFilesFrom
     FileSystem fs = FileSystem.get(configuration);
 
     // output exists?
-    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), PathFilters.logsCRCFilter());
     assertEquals(1, fileStatuses.length); // only one
     assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
     Map<String, String> fileToData = Maps.newHashMap();