You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/06 17:51:06 UTC

svn commit: r1490329 - in /mahout/trunk: ./ integration/ integration/src/main/java/org/apache/mahout/text/ integration/src/test/java/org/apache/mahout/text/ src/conf/

Author: gsingers
Date: Thu Jun  6 15:51:06 2013
New Revision: 1490329

URL: http://svn.apache.org/r1490329
Log:
MAHOUT-944: progress up to main compiling except for the file name filter.  haven't run tests

Added:
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java
Modified:
    mahout/trunk/integration/pom.xml
    mahout/trunk/pom.xml
    mahout/trunk/src/conf/driver.classes.default.props

Modified: mahout/trunk/integration/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/pom.xml?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/integration/pom.xml (original)
+++ mahout/trunk/integration/pom.xml Thu Jun  6 15:51:06 2013
@@ -217,7 +217,7 @@
       <groupId>org.easymock</groupId>
       <artifactId>easymock</artifactId>
       <scope>test</scope>
-    </dependency>
+    </dependency><dependency><groupId>org.apache.lucene</groupId><artifactId>lucene-core</artifactId><version>4.3.0</version></dependency>
 
   </dependencies>
 </project>

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.index.IndexFileNameFilter;
+import org.apache.lucene.index.IndexFileNames;
+
+/**
+ * A wrapper class to convert an IndexFileNameFilter which implements
+ * java.io.FilenameFilter to an org.apache.hadoop.fs.PathFilter.
+ */
+class LuceneIndexFileNameFilter implements PathFilter {
+
+  private static final LuceneIndexFileNameFilter singleton =
+          new LuceneIndexFileNameFilter();
+
+  /**
+   * Get a static instance.
+   *
+   * @return the static instance
+   */
+  public static LuceneIndexFileNameFilter getFilter() {
+    return singleton;
+  }
+
+  //nocommit Not sure what the alternative is here
+  private final IndexFileNameFilter luceneFilter;
+
+  private LuceneIndexFileNameFilter() {
+    luceneFilter = IndexFileNames.getFilter();
+  }
+
+  /* (non-Javadoc)
+  * @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path)
+  */
+  public boolean accept(Path path) {
+    return luceneFilter.accept(null, path.getName());
+  }
+
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,59 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link InputFormat} that produces one input split per segment of each configured Lucene index.
+ */
+public class LuceneSegmentInputFormat extends InputFormat {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LuceneSegmentInputFormat.class);
+
+  /**
+   * Reads the {@link LuceneStorageConfiguration} from the job configuration, opens every index path it
+   * lists and creates a {@link LuceneSegmentInputSplit} for each segment found there.
+   *
+   * @param context job context supplying the Hadoop configuration
+   * @return the splits, grouped by index path in configuration order
+   * @throws IOException propagated from reading the configuration or the index directories
+   */
+  @Override
+  public List<LuceneSegmentInputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    LuceneStorageConfiguration storageConf = new LuceneStorageConfiguration(conf);
+
+    List<LuceneSegmentInputSplit> splits = new ArrayList<LuceneSegmentInputSplit>();
+
+    for (Path indexPath : storageConf.getIndexPaths()) {
+      ReadOnlyFileSystemDirectory indexDir =
+          new ReadOnlyFileSystemDirectory(FileSystem.get(conf), indexPath, false, conf);
+      SegmentInfos infos = new SegmentInfos();
+      infos.read(indexDir);
+
+      for (SegmentInfoPerCommit segment : infos) {
+        String segmentName = segment.info.name;
+        splits.add(new LuceneSegmentInputSplit(indexPath, segmentName, segment.sizeInBytes()));
+        LOG.info("Created {} byte input split for index '{}' segment {}",
+            new Object[]{segment.sizeInBytes(), indexPath.toUri(), segmentName});
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Creates a {@link LuceneSegmentRecordReader} and initializes it with the given split and context
+   * before returning it.
+   *
+   * @param inputSplit the split to read
+   * @param context    the task attempt context
+   * @return the initialized record reader
+   */
+  @Override
+  public RecordReader<Text, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+    LuceneSegmentRecordReader reader = new LuceneSegmentRecordReader();
+    reader.initialize(inputSplit, context);
+    return reader;
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,89 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An {@link InputSplit} describing exactly one segment of a Lucene index: the index path,
+ * the segment name and the split length in bytes.
+ */
+public class LuceneSegmentInputSplit extends InputSplit implements Writable {
+
+  private Path indexPath;
+  private String segmentInfoName;
+  private long length;
+
+  /** No-argument constructor; the fields are filled in by {@link #readFields(DataInput)}. */
+  public LuceneSegmentInputSplit() {
+    // For deserialization
+  }
+
+  /**
+   * @param indexPath       path of the index the segment belongs to
+   * @param segmentInfoName name of the segment
+   * @param length          value reported by {@link #getLength()}
+   */
+  public LuceneSegmentInputSplit(Path indexPath, String segmentInfoName, long length) {
+    this.length = length;
+    this.segmentInfoName = segmentInfoName;
+    this.indexPath = indexPath;
+  }
+
+  public Path getIndexPath() {
+    return indexPath;
+  }
+
+  public String getSegmentInfoName() {
+    return segmentInfoName;
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    return length;
+  }
+
+  /** Always returns an empty array: no location hints are provided. */
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    return new String[0];
+  }
+
+  /** Writes the index path, the segment name and the length, in that order. */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    String serializedPath = indexPath.toString();
+    out.writeUTF(serializedPath);
+    out.writeUTF(segmentInfoName);
+    out.writeLong(length);
+  }
+
+  /** Reads the fields in the order used by {@link #write(DataOutput)}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    String serializedPath = in.readUTF();
+    this.indexPath = new Path(serializedPath);
+    this.segmentInfoName = in.readUTF();
+    this.length = in.readLong();
+  }
+
+  /**
+   * Looks up the segment this split refers to by reading the segment infos of the index directory.
+   *
+   * @param configuration the configuration used to locate the index
+   * @return the segment whose name equals {@link #getSegmentInfoName()}
+   * @throws IOException              if an error occurs when accessing the directory
+   * @throws IllegalArgumentException if the directory holds no segment with that name
+   */
+  public SegmentInfoPerCommit getSegment(Configuration configuration) throws IOException {
+    FileSystem fileSystem = FileSystem.get(configuration);
+    ReadOnlyFileSystemDirectory directory =
+        new ReadOnlyFileSystemDirectory(fileSystem, indexPath, false, configuration);
+
+    SegmentInfos infos = new SegmentInfos();
+    infos.read(directory);
+
+    for (SegmentInfoPerCommit candidate : infos) {
+      if (!candidate.info.name.equals(segmentInfoName)) {
+        continue;
+      }
+      return candidate;
+    }
+
+    throw new IllegalArgumentException("No such segment: '" + segmentInfoName + "' in directory " + directory.toString());
+  }
+}
\ No newline at end of file

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,78 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+
+/**
+ * {@link RecordReader} implementation for Lucene segments. Each {@link InputSplit} contains a separate Lucene segment.
+ * Emits records consisting of a {@link Text} key holding the ID of a matching document and a {@link NullWritable} value.
+ */
+public class LuceneSegmentRecordReader extends RecordReader<Text, NullWritable> {
+
+  public static final boolean READ_ONLY = true;
+  public static final int USE_TERM_INFOS = 1;
+
+  private SegmentReader segmentReader;
+  private IndexSearcher searcher;
+  private Scorer scorer;
+
+  private int nextDocId;
+  private Text key = new Text();
+
+  /**
+   * Opens the segment described by the split and prepares a scorer for the configured query.
+   * Calling this method again releases the reader opened by the previous call.
+   *
+   * @param split   must be a {@link LuceneSegmentInputSplit}
+   * @param context task context whose configuration holds the serialized {@link LuceneStorageConfiguration}
+   * @throws IOException if the segment cannot be located or opened
+   */
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    // LuceneSegmentInputFormat#createRecordReader already calls initialize(), and the MapReduce
+    // framework is expected to call it as well; close any earlier reader so it is not leaked.
+    close();
+
+    LuceneSegmentInputSplit inputSplit = (LuceneSegmentInputSplit) split;
+
+    Configuration configuration = context.getConfiguration();
+    LuceneStorageConfiguration lucene2SeqConfiguration = new LuceneStorageConfiguration(configuration);
+
+    SegmentInfoPerCommit segmentInfo = inputSplit.getSegment(configuration);
+    segmentReader = new SegmentReader(segmentInfo, USE_TERM_INFOS, IOContext.READ);//nocommit: Should we use IOContext.READONCE?
+
+    searcher = new IndexSearcher(segmentReader);
+    // NOTE(review): the query is not rewritten before createWeight; confirm multi-term queries work here.
+    Weight weight = lucene2SeqConfiguration.getQuery().createWeight(searcher);
+    // Pass the live docs so that deleted documents are not emitted (null would accept every document).
+    scorer = weight.scorer(segmentReader.getContext(), true, false, segmentReader.getLiveDocs());
+  }
+
+  /**
+   * Advances to the next matching document.
+   *
+   * @return {@code true} if another document is available, {@code false} once the segment is exhausted
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    if (scorer == null) {
+      // Weight#scorer may return null when nothing in this segment can match the query
+      nextDocId = Scorer.NO_MORE_DOCS;
+      return false;
+    }
+    nextDocId = scorer.nextDoc();
+
+    return nextDocId != Scorer.NO_MORE_DOCS;
+  }
+
+  /**
+   * @return the current document ID as text; the same {@link Text} instance is reused for every record
+   */
+  @Override
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    key.set(String.valueOf(nextDocId));
+    return key;
+  }
+
+  @Override
+  public NullWritable getCurrentValue() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  /** Progress is not tracked; this always reports 0. */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    return 0;
+  }
+
+  /**
+   * Closes the segment reader if one was opened. Safe to call when {@code initialize} never ran or failed,
+   * and safe to call more than once.
+   */
+  @Override
+  public void close() throws IOException {
+    if (segmentReader != null) {
+      segmentReader.close();
+      segmentReader = null;
+    }
+    // the searcher only wraps segmentReader, so closing the reader is all that is done here
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.lucene.util.Version.LUCENE_43;
+
+/**
+ * Holds all the configuration for {@link SequenceFilesFromLuceneStorage}, which generates a sequence file
+ * with id as the key and a content field as value.
+ */
+public class LuceneStorageConfiguration implements Writable {
+
+  private static final Query DEFAULT_QUERY = new MatchAllDocsQuery();
+  private static final int DEFAULT_MAX_HITS = Integer.MAX_VALUE;
+
+  static final String KEY = "org.apache.mahout.text.LuceneIndexToSequenceFiles";
+
+  static final String SEPARATOR_FIELDS = ",";
+  static final String SEPARATOR_PATHS = ",";
+
+  private Configuration configuration;
+  private List<Path> indexPaths;
+  private Path sequenceFilesOutputPath;
+  private String idField;
+  private List<String> fields;
+  private Query query;
+  private int maxHits;
+
+  /**
+   * Create a configuration bean with all mandatory parameters.
+   *
+   * @param configuration           Hadoop configuration for writing sequencefiles
+   * @param indexPaths              paths to the index
+   * @param sequenceFilesOutputPath path to output the sequence file
+   * @param idField                 field used for the key of the sequence file
+   * @param fields                  field(s) used for the value of the sequence file
+   */
+  public LuceneStorageConfiguration(Configuration configuration, List<Path> indexPaths, Path sequenceFilesOutputPath, String idField, List<String> fields) {
+    Preconditions.checkArgument(configuration != null, "Parameter 'configuration' cannot be null");
+    Preconditions.checkArgument(indexPaths != null, "Parameter 'indexPaths' cannot be null");
+    Preconditions.checkArgument(indexPaths != null && !indexPaths.isEmpty(), "Parameter 'indexPaths' cannot be empty");
+    Preconditions.checkArgument(sequenceFilesOutputPath != null, "Parameter 'sequenceFilesOutputPath' cannot be null");
+    Preconditions.checkArgument(idField != null, "Parameter 'idField' cannot be null");
+    Preconditions.checkArgument(fields != null, "Parameter 'fields' cannot be null");
+    Preconditions.checkArgument(fields != null && !fields.isEmpty(), "Parameter 'fields' cannot be empty");
+
+    this.configuration = configuration;
+    this.indexPaths = indexPaths;
+    this.sequenceFilesOutputPath = sequenceFilesOutputPath;
+    this.idField = idField;
+    this.fields = fields;
+
+    setQuery(DEFAULT_QUERY);
+    setMaxHits(DEFAULT_MAX_HITS);
+  }
+
+  public LuceneStorageConfiguration() {
+    // Used during serialization. Do not use.
+  }
+
+  /**
+   * Deserializes a {@link LuceneStorageConfiguration} from a {@link Configuration}.
+   *
+   * @param conf the {@link Configuration} object with a serialized {@link LuceneStorageConfiguration}
+   * @throws IOException if deserialization fails
+   */
+  public LuceneStorageConfiguration(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf, "Parameter 'configuration' cannot be null");
+
+    String serializedConfigString = conf.get(KEY);
+
+    if (serializedConfigString == null) {
+      throw new IllegalArgumentException("Parameter 'configuration' does not contain a serialized " + this.getClass());
+    }
+
+    LuceneStorageConfiguration luceneStorageConf = DefaultStringifier.load(conf, KEY, LuceneStorageConfiguration.class);
+
+    this.configuration = conf;
+    this.indexPaths = luceneStorageConf.getIndexPaths();
+    this.sequenceFilesOutputPath = luceneStorageConf.getSequenceFilesOutputPath();
+    this.idField = luceneStorageConf.getIdField();
+    this.fields = luceneStorageConf.getFields();
+    this.query = luceneStorageConf.getQuery();
+    this.maxHits = luceneStorageConf.getMaxHits();
+  }
+
+  /**
+   * Serializes this object in a Hadoop {@link Configuration}
+   *
+   * @return a {@link Configuration} object with a String serialization
+   * @throws IOException if serialization fails
+   */
+  public Configuration serialize() throws IOException {
+    DefaultStringifier.store(configuration, this, KEY);
+
+    return new Configuration(configuration);
+  }
+
+  /**
+   * Returns an {@link Iterator} which returns (Text, Text) {@link Pair}s of the produced sequence files.
+   *
+   * @return iterator
+   */
+  public Iterator<Pair<Text, Text>> getSequenceFileIterator() {
+    return new SequenceFileDirIterable<Text, Text>(sequenceFilesOutputPath, PathType.LIST, configuration).iterator();
+  }
+
+  public Configuration getConfiguration() {
+    return configuration;
+  }
+
+  public Path getSequenceFilesOutputPath() {
+    return sequenceFilesOutputPath;
+  }
+
+  public List<Path> getIndexPaths() {
+    return indexPaths;
+  }
+
+  public String getIdField() {
+    return idField;
+  }
+
+  public List<String> getFields() {
+    return fields;
+  }
+
+  public void setQuery(Query query) {
+    this.query = query;
+  }
+
+  public Query getQuery() {
+    return query;
+  }
+
+  public void setMaxHits(int maxHits) {
+    this.maxHits = maxHits;
+  }
+
+  public int getMaxHits() {
+    return maxHits;
+  }
+
+  public DocumentStoredFieldVisitor getStoredFieldVisitor() {
+    Set<String> fieldSet = Sets.newHashSet(idField);
+    fieldSet.addAll(fields);
+    return new DocumentStoredFieldVisitor(fieldSet);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeUTF(sequenceFilesOutputPath.toString());
+    out.writeUTF(StringUtils.join(indexPaths, SEPARATOR_PATHS));
+    out.writeUTF(idField);
+    out.writeUTF(StringUtils.join(fields, SEPARATOR_FIELDS));
+    out.writeUTF(query.toString());
+    out.writeInt(maxHits);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    try {
+      this.sequenceFilesOutputPath = new Path(in.readUTF());
+      this.indexPaths = new ArrayList<Path>();
+      String[] indexPaths = in.readUTF().split(SEPARATOR_PATHS);
+      for (String indexPath : indexPaths) {
+        this.indexPaths.add(new Path(indexPath));
+      }
+      this.idField = in.readUTF();
+      this.fields = Arrays.asList(in.readUTF().split(SEPARATOR_FIELDS));
+      this.query = new QueryParser(LUCENE_43, "query", new StandardAnalyzer(LUCENE_43)).parse(in.readUTF());
+      this.maxHits = in.readInt();
+    } catch (ParseException e) {
+      throw new RuntimeException("Could not deserialize " + this.getClass().getName(), e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    LuceneStorageConfiguration that = (LuceneStorageConfiguration) o;
+
+    if (maxHits != that.maxHits) return false;
+    if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false;
+    if (idField != null ? !idField.equals(that.idField) : that.idField != null) return false;
+    if (indexPaths != null ? !indexPaths.equals(that.indexPaths) : that.indexPaths != null) return false;
+    if (query != null ? !query.equals(that.query) : that.query != null) return false;
+    if (sequenceFilesOutputPath != null ? !sequenceFilesOutputPath.equals(that.sequenceFilesOutputPath) : that.sequenceFilesOutputPath != null)
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = indexPaths != null ? indexPaths.hashCode() : 0;
+    result = 31 * result + (sequenceFilesOutputPath != null ? sequenceFilesOutputPath.hashCode() : 0);
+    result = 31 * result + (idField != null ? idField.hashCode() : 0);
+    result = 31 * result + (fields != null ? fields.hashCode() : 0);
+    result = 31 * result + (query != null ? query.hashCode() : 0);
+    result = 31 * result + maxHits;
+    return result;
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.BufferedIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+
+import java.io.IOException;
+import java.util.Collection;
+
+//NOCOMMIT: Not sure if there isn't a better way of doing this in 4.x.  Don't we have a Hadoop Directory impl somewhere?
+
+/**
+ * This class implements a Lucene Directory on top of a general FileSystem.
+ * Currently it does not support locking.
+ * <p/>
+ * // TODO: Rename to FileSystemReadOnlyDirectory
+ */
+public class ReadOnlyFileSystemDirectory extends Directory {
+
+  private final FileSystem fs;
+  private final Path directory;
+  private final int ioFileBufferSize;
+
+  /**
+   * Wraps the given path of a Hadoop file system as a Lucene directory.
+   *
+   * @param fs        file system that holds the index
+   * @param directory path of the index directory inside {@code fs}
+   * @param create    if {@code true}, {@code create()} is run before the directory check
+   * @param conf      configuration from which {@code io.file.buffer.size} is read (default 4096)
+   * @throws IOException if {@code directory} is not a directory, or its status cannot be obtained
+   */
+  public ReadOnlyFileSystemDirectory(FileSystem fs, Path directory, boolean create,
+                                     Configuration conf) throws IOException {
+
+    this.fs = fs;
+    this.directory = directory;
+    this.ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+    if (create) {
+      create();
+    }
+
+    FileStatus status;
+    try {
+      status = fs.getFileStatus(directory);
+    } catch (IOException e) {
+      // status lookup failed (file does not exist): handled like "not a directory" below
+      status = null;
+    }
+    if (status == null || !status.isDir()) {
+      throw new IOException(directory + " is not a directory");
+    }
+  }
+
+
+  /**
+   * Makes sure the directory exists and deletes every file in it that the
+   * {@link LuceneIndexFileNameFilter} accepts.
+   *
+   * @throws IOException if the path is not a directory or an index file cannot be deleted
+   */
+  private void create() throws IOException {
+    if (!fs.exists(directory)) {
+      fs.mkdirs(directory);
+    }
+
+    FileStatus dirStatus;
+    try {
+      dirStatus = fs.getFileStatus(directory);
+    } catch (IOException e) {
+      // status lookup failed (file does not exist): handled like "not a directory" below
+      dirStatus = null;
+    }
+    if (dirStatus == null || !dirStatus.isDir()) {
+      throw new IOException(directory + " is not a directory");
+    }
+
+    // clear old index files
+    FileStatus[] oldIndexFiles = fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
+    for (FileStatus oldIndexFile : oldIndexFiles) {
+      if (!fs.delete(oldIndexFile.getPath(), true)) {
+        throw new IOException("Cannot delete index file "
+                + oldIndexFile.getPath());
+      }
+    }
+  }
+
+  /**
+   * Lists the names of the files in this directory that the {@link LuceneIndexFileNameFilter} accepts.
+   *
+   * @return the file names (last path component only)
+   * @throws IOException propagated from the file system listing
+   * @see org.apache.lucene.store.Directory#list()
+   */
+  public String[] list() throws IOException {
+    FileStatus[] statuses = fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
+    String[] names = new String[statuses.length];
+    int next = 0;
+    for (FileStatus status : statuses) {
+      names[next++] = status.getPath().getName();
+    }
+    return names;
+  }
+
+  @Override
+  public String[] listAll() throws IOException {
+    return list(); // delegates to list(); no separate listing logic exists here
+  }
+
+  /**
+   * Reports whether a file with the given name exists under this directory's path.
+   *
+   * @param name file name, resolved against the directory path
+   * @return the result of {@code fs.exists} for the resolved path
+   * @throws IOException propagated from the file system
+   * @see org.apache.lucene.store.Directory#fileExists(java.lang.String)
+   */
+  public boolean fileExists(String name) throws IOException {
+    return fs.exists(new Path(directory, name));
+  }
+
+  /**
+   * Not supported by this directory.
+   *
+   * @param name ignored
+   * @return never returns normally
+   * @throws UnsupportedOperationException always
+   * @see org.apache.lucene.store.Directory#fileModified(java.lang.String)
+   */
+  public long fileModified(String name) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Not supported by this directory.
+   *
+   * @param name ignored
+   * @throws UnsupportedOperationException always
+   * @see org.apache.lucene.store.Directory#touchFile(java.lang.String)
+   */
+  public void touchFile(String name) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the length of the named file as reported by the file status of the file system.
+   *
+   * @param name file name, resolved against the directory path
+   * @return the length taken from {@code FileStatus.getLen()}
+   * @throws IOException propagated from the file status lookup
+   * @see org.apache.lucene.store.Directory#fileLength(java.lang.String)
+   */
+  public long fileLength(String name) throws IOException {
+    return fs.getFileStatus(new Path(directory, name)).getLen();
+  }
+
+  /**
+   * Deletes the named file from this directory; the recursive flag is passed as {@code true}.
+   *
+   * @param name file name, resolved against the directory path
+   * @throws IOException if {@code fs.delete} returns {@code false}, or propagated from the file system
+   * @see org.apache.lucene.store.Directory#deleteFile(java.lang.String)
+   */
+  public void deleteFile(String name) throws IOException {
+    if (!fs.delete(new Path(directory, name), true)) {
+      throw new IOException("Cannot delete index file " + name);
+    }
+  }
+
+  /**
+   * Renames a file inside this directory.
+   *
+   * @param from current file name, resolved against the directory path
+   * @param to   new file name, resolved against the directory path
+   * @throws IOException if the file system reports that the rename did not succeed,
+   *                     or propagated from the file system
+   * @see org.apache.lucene.store.Directory#renameFile(java.lang.String, java.lang.String)
+   */
+  public void renameFile(String from, String to) throws IOException {
+    // FileSystem#rename reports failure through its boolean result, so check it
+    // instead of silently ignoring a failed rename (same policy as deleteFile).
+    if (!fs.rename(new Path(directory, from), new Path(directory, to))) {
+      throw new IOException("Cannot rename index file " + from + " to " + to);
+    }
+  }
+
+  @Override
+  public IndexOutput createOutput(String name, IOContext context) throws IOException {
+    //nocommit What should we be doing with the IOContext here?
+    final Path target = new Path(directory, name);
+
+    // Remove a previous file of the same name; fail if it exists but cannot be removed.
+    if (fs.exists(target)) {
+      final boolean removed = fs.delete(target, true);
+      if (!removed) {
+        throw new IOException("Cannot overwrite index file " + target);
+      }
+    }
+
+    return new FileSystemIndexOutput(target, ioFileBufferSize);
+  }
+
+  @Override
+  public void sync(Collection<String> names) throws IOException {
+    // No-op: the given file names are ignored and nothing is flushed here.
+  }
+
+  @Override
+  public IndexInput openInput(String name, IOContext context) throws IOException {
+    // The IOContext is not consulted; the directory-wide buffer size is used instead.
+    final Path source = new Path(directory, name);
+    return new FileSystemIndexInput(source, ioFileBufferSize);
+  }
+
+
+  /* (non-Javadoc)
+  * @see org.apache.lucene.store.Directory#makeLock(java.lang.String)
+  */
+  public Lock makeLock(final String name) {
+    // Returns a lock object that performs no actual locking.
+    return new Lock() {
+      // Always reports success without acquiring anything.
+      public boolean obtain() {
+        return true;
+      }
+
+      // Nothing was acquired, so there is nothing to release.
+      public void release() {
+      }
+
+      // Lock state is not tracked, so it cannot be queried.
+      public boolean isLocked() {
+        throw new UnsupportedOperationException();
+      }
+
+      // Identifies the lock by the path <directory>/<name>.
+      public String toString() {
+        return "Lock@" + new Path(directory, name);
+      }
+    };
+  }
+
+  /* (non-Javadoc)
+  * @see org.apache.lucene.store.Directory#close()
+  */
+  public void close() throws IOException {
+    // No-op by design of this method body:
+    // do not close the file system
+  }
+
+  /* (non-Javadoc)
+  * @see java.lang.Object#toString()
+  */
+  public String toString() {
+    // Rendered as <class name>@<index directory>.
+    final StringBuilder text = new StringBuilder(this.getClass().getName());
+    text.append("@").append(directory);
+    return text.toString();
+  }
+
+  /**
+   * Buffered index input that reads a single file through the enclosing directory's file system.
+   * Clones share the underlying stream via a common {@code Descriptor}; only the instance that
+   * is not a clone closes the stream.
+   */
+  private class FileSystemIndexInput extends BufferedIndexInput {
+
+    // shared by clones
+    private class Descriptor {
+      public final FSDataInputStream in;
+      public long position; // cache of in.getPos()
+
+      public Descriptor(Path file, int ioFileBufferSize) throws IOException {
+        this.in = fs.open(file, ioFileBufferSize);
+      }
+    }
+
+    private final Path filePath; // for debugging
+    private final Descriptor descriptor;
+    private final long length;
+    private boolean isOpen;
+    private boolean isClone;
+
+    /**
+     * Opens {@code path} for reading.
+     *
+     * @param path             file to read
+     * @param ioFileBufferSize buffer size used both for the Lucene buffer and the file system stream
+     * @throws IOException if the file status cannot be read or the file cannot be opened
+     */
+    public FileSystemIndexInput(Path path, int ioFileBufferSize)
+            throws IOException {
+      super("FSII_" + path.getName(), ioFileBufferSize);
+      filePath = path;
+      // Look the length up before opening the stream, so a failed lookup cannot leak an open stream.
+      length = fs.getFileStatus(path).getLen();
+      descriptor = new Descriptor(path, ioFileBufferSize);
+      isOpen = true;
+    }
+
+    /**
+     * Fills {@code b[offset, offset + len)} from the file, starting at the current file pointer.
+     *
+     * @throws IOException if the stream ends before {@code len} bytes were read, or on read failure
+     */
+    protected void readInternal(byte[] b, int offset, int len)
+            throws IOException {
+      // The stream and its cached position are shared by all clones, and clones may be read
+      // from different threads; the seek + read sequence therefore has to be atomic.
+      synchronized (descriptor) {
+        long position = getFilePointer();
+        if (position != descriptor.position) {
+          // Another reader moved the shared stream; reposition it for this instance.
+          descriptor.in.seek(position);
+          descriptor.position = position;
+        }
+        int total = 0;
+        do {
+          // A single read may return fewer bytes than requested, so loop until done.
+          int i = descriptor.in.read(b, offset + total, len - total);
+          if (i == -1) {
+            throw new IOException("Read past EOF");
+          }
+          descriptor.position += i;
+          total += i;
+        } while (total < len);
+      }
+    }
+
+    /**
+     * Closes the underlying stream unless this instance is a clone.
+     *
+     * @throws IOException if a non-clone instance is closed twice, or the stream fails to close
+     */
+    public void close() throws IOException {
+      if (!isClone) {
+        if (isOpen) {
+          descriptor.in.close();
+          isOpen = false;
+        } else {
+          throw new IOException("Index file " + filePath + " already closed");
+        }
+      }
+    }
+
+    protected void seekInternal(long position) {
+      // handled in readInternal()
+    }
+
+    /** Returns the file length captured when this input was opened. */
+    public long length() {
+      return length;
+    }
+
+    // Safety net: closes the stream if the owning (non-clone) instance was never closed.
+    protected void finalize() throws IOException {
+      if (!isClone && isOpen) {
+        close(); // close the file
+      }
+    }
+
+    /** Returns a copy flagged as a clone, so that closing it leaves the shared stream open. */
+    public BufferedIndexInput clone() {
+      FileSystemIndexInput clone = (FileSystemIndexInput) super.clone();
+      clone.isClone = true;
+      return clone;
+    }
+  }
+
+  /**
+   * Buffered index output that writes a single file through the enclosing directory's file system.
+   * Seeking is not supported.
+   */
+  private class FileSystemIndexOutput extends BufferedIndexOutput {
+
+    private final Path filePath; // for debugging
+    private final FSDataOutputStream out;
+    private boolean isOpen;
+
+    /**
+     * Creates (or overwrites) {@code path} and opens it for writing.
+     *
+     * @param path             file to write
+     * @param ioFileBufferSize buffer size handed to the file system stream
+     * @throws IOException if the file cannot be created
+     */
+    public FileSystemIndexOutput(Path path, int ioFileBufferSize)
+            throws IOException {
+      filePath = path;
+      // overwrite is true by default
+      out = fs.create(path, true, ioFileBufferSize);
+      isOpen = true;
+    }
+
+    /** Writes {@code size} bytes of {@code b}, starting at {@code offset}, to the underlying stream. */
+    public void flushBuffer(byte[] b, int offset, int size) throws IOException {
+      out.write(b, offset, size);
+    }
+
+    /**
+     * Closes this output and the underlying stream.
+     *
+     * @throws IOException if this output was already closed, or if closing fails
+     */
+    public void close() throws IOException {
+      if (!isOpen) {
+        throw new IOException("Index file " + filePath + " already closed");
+      }
+      // Mark as closed up front so that finalize() does not retry a close that failed.
+      isOpen = false;
+      try {
+        super.close();
+      } finally {
+        // Release the file system stream even if super.close() threw.
+        out.close();
+      }
+    }
+
+    public void seek(long pos) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    /** Returns the current position of the underlying stream. */
+    public long length() throws IOException {
+      return out.getPos();
+    }
+
+    // Safety net: closes the stream if this output was never closed explicitly.
+    protected void finalize() throws IOException {
+      if (isOpen) {
+        close(); // close the file
+      }
+    }
+  }
+
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+/**
+ * Generates a sequence file from a Lucene index with a specified id field as the key and a content field as the value.
+ * Configure this class with a {@link LuceneStorageConfiguration} bean.
+ */
+public class SequenceFilesFromLuceneStorage {
+
+  public static final String SEPARATOR_FIELDS = " ";
+
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromLuceneStorage.class);
+
+  /**
+   * Generates sequence files from Lucene indexes via the given {@link LuceneStorageConfiguration}.
+   * One sequence file is written per configured index path.
+   *
+   * @param lucene2seqConf configuration bean
+   * @throws java.io.IOException if index cannot be opened or sequence file could not be written
+   * @throws RuntimeException    if no scorer can be created for the configured query
+   */
+  public void run(LuceneStorageConfiguration lucene2seqConf) throws IOException {
+    for (Path indexPath : lucene2seqConf.getIndexPaths()) {
+      Directory directory = FSDirectory.open(new File(indexPath.toString()));
+      try {
+        IndexReader reader = DirectoryReader.open(directory);
+        try {
+          writeSequenceFile(lucene2seqConf, indexPath, reader);
+        } finally {
+          // Always release the reader, also when reading or writing failed half way.
+          reader.close();
+        }
+      } finally {
+        directory.close();
+      }
+    }
+  }
+
+  /**
+   * Writes the documents of one index that match the configured query into a single sequence file.
+   */
+  private void writeSequenceFile(LuceneStorageConfiguration lucene2seqConf, Path indexPath, IndexReader reader)
+          throws IOException {
+    IndexSearcher searcher = new IndexSearcher(reader);
+    Configuration configuration = lucene2seqConf.getConfiguration();
+    FileSystem fileSystem = FileSystem.get(configuration);
+    // NOTE(review): an absolute indexPath is presumably resolved to itself rather than below the
+    // output path -- confirm whether indexPath.getName() is what is wanted here.
+    Path sequenceFilePath = new Path(lucene2seqConf.getSequenceFilesOutputPath(), indexPath);
+    SequenceFile.Writer sequenceFileWriter = new SequenceFile.Writer(fileSystem, configuration, sequenceFilePath, Text.class, Text.class);
+
+    try {
+      Weight weight = lucene2seqConf.getQuery().createWeight(searcher);
+      //TODO: as the name implies, this is slow, but this is sequential anyway, so not a big deal.  Better perf. would be by looping on the segments
+      AtomicReaderContext context = SlowCompositeReaderWrapper.wrap(reader).getContext();
+
+      // Restrict matches to live documents so that deleted documents are not exported.
+      Scorer scorer = weight.scorer(context, true, false, context.reader().getLiveDocs());
+      if (scorer == null) {
+        throw new RuntimeException("Could not write sequence files. Could not create scorer");
+      }
+
+      int maxHits = lucene2seqConf.getMaxHits();
+      String idField = lucene2seqConf.getIdField();
+      List<String> fields = lucene2seqConf.getFields();
+
+      Text key = new Text();
+      Text value = new Text();
+
+      int processedDocs = 0;
+      int docId;
+      // Check the hit limit first so the scorer is not advanced once the limit is reached.
+      while (processedDocs < maxHits && (docId = scorer.nextDoc()) != NO_MORE_DOCS) {
+        DocumentStoredFieldVisitor storedFieldVisitor = lucene2seqConf.getStoredFieldVisitor();
+        reader.document(docId, storedFieldVisitor);
+        Document doc = storedFieldVisitor.getDocument();
+
+        String idValue = doc.get(idField);
+        String fieldValues = concatenateFields(doc, fields);
+
+        // Documents without an id or without any text are skipped and do not count as hits.
+        if (isBlank(idValue) || isBlank(fieldValues)) {
+          continue;
+        }
+
+        key.set(idValue);
+        value.set(fieldValues);
+
+        sequenceFileWriter.append(key, value);
+
+        processedDocs++;
+      }
+
+      log.info("Wrote {} documents in {}", processedDocs, sequenceFilePath.toUri());
+    } finally {
+      Closeables.close(sequenceFileWriter, true);
+    }
+  }
+
+  /**
+   * Joins the non-blank values of the given stored fields with {@link #SEPARATOR_FIELDS}. The separator is
+   * only placed between two values, so a blank trailing field does not leave a dangling separator.
+   */
+  private static String concatenateFields(Document doc, List<String> fields) {
+    StringBuilder fieldValueBuilder = new StringBuilder();
+    for (String field : fields) {
+      String fieldValue = doc.get(field);
+      if (isNotBlank(fieldValue)) {
+        if (fieldValueBuilder.length() > 0) {
+          fieldValueBuilder.append(SEPARATOR_FIELDS);
+        }
+        fieldValueBuilder.append(fieldValue);
+      }
+    }
+    return fieldValueBuilder.toString();
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Command-line entry point of the lucene2seq program. Turns the text of stored fields of one or more Lucene
+ * indexes into a Hadoop SequenceFile whose key is the document ID and whose value is the concatenated text of
+ * the requested stored field(s). The work is delegated to either the sequential or the MapReduce implementation.
+ */
+public class SequenceFilesFromLuceneStorageDriver extends AbstractJob {
+
+  static final String OPTION_LUCENE_DIRECTORY = "dir";
+  static final String OPTION_ID_FIELD = "idField";
+  static final String OPTION_FIELD = "fields";
+  static final String OPTION_QUERY = "query";
+  static final String OPTION_MAX_HITS = "maxHits";
+
+  static final Query DEFAULT_QUERY = new MatchAllDocsQuery();
+  static final int DEFAULT_MAX_HITS = Integer.MAX_VALUE;
+
+  static final String SEPARATOR_FIELDS = ",";
+  static final String QUERY_DELIMITER = "'";
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SequenceFilesFromLuceneStorageDriver(), args);
+  }
+
+  /**
+   * Parses the command line, assembles a {@link LuceneStorageConfiguration} and runs the conversion.
+   *
+   * @param args command-line arguments
+   * @return -1 if the arguments could not be parsed, 0 otherwise
+   * @throws IllegalArgumentException if the query option cannot be parsed
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addOutputOption();
+
+    addOption(OPTION_LUCENE_DIRECTORY, "d", "Lucene directory / directories. Comma separated.", true);
+    addOption(OPTION_ID_FIELD, "i", "The field in the index containing the id", true);
+    addOption(OPTION_FIELD, "f", "The stored field(s) in the index containing text", true);
+
+    addOption(OPTION_QUERY, "q", "(Optional) Lucene query. Defaults to " + DEFAULT_QUERY.getClass().getSimpleName());
+    addOption(OPTION_MAX_HITS, "n", "(Optional) Max hits. Defaults to " + DEFAULT_MAX_HITS);
+    addOption(DefaultOptionCreator.methodOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    // Fall back to a fresh configuration when none was injected.
+    Configuration jobConf = getConf();
+    if (jobConf == null) {
+      jobConf = new Configuration();
+    }
+
+    // One index path per comma-separated entry of the directory option.
+    List<Path> indexPaths = new ArrayList<Path>();
+    for (String rawPath : getOption(OPTION_LUCENE_DIRECTORY).split(",")) {
+      indexPaths.add(new Path(rawPath));
+    }
+
+    Path outputPath = new Path(getOption(DefaultOptionCreator.OUTPUT_OPTION));
+    String idField = getOption(OPTION_ID_FIELD);
+    String fieldList = getOption(OPTION_FIELD);
+    List<String> fieldNames = asList(fieldList.split(SEPARATOR_FIELDS));
+
+    LuceneStorageConfiguration lucene2SeqConf =
+            newLucene2SeqConfiguration(jobConf, indexPaths, outputPath, idField, fieldNames);
+
+    Query query;
+    if (hasOption(OPTION_QUERY)) {
+      // Strip the quote characters that delimit the query on the command line.
+      String queryString = getOption(OPTION_QUERY).replace(QUERY_DELIMITER, "");
+      // NOTE(review): the query string is also handed to QueryParser as its second argument -- confirm
+      // that this is the intended value for that parameter.
+      QueryParser queryParser = new QueryParser(Version.LUCENE_35, queryString, new StandardAnalyzer(Version.LUCENE_35));
+      try {
+        query = queryParser.parse(queryString);
+      } catch (ParseException e) {
+        throw new IllegalArgumentException(e.getMessage(), e);
+      }
+    } else {
+      query = DEFAULT_QUERY;
+    }
+    lucene2SeqConf.setQuery(query);
+
+    int maxHits = hasOption(OPTION_MAX_HITS)
+            ? Integer.parseInt(getOption(OPTION_MAX_HITS))
+            : DEFAULT_MAX_HITS;
+    lucene2SeqConf.setMaxHits(maxHits);
+
+    // Anything other than an explicit "sequential" method runs as a MapReduce job.
+    boolean sequential = hasOption(DefaultOptionCreator.METHOD_OPTION)
+            && getOption(DefaultOptionCreator.METHOD_OPTION).equals("sequential");
+    if (sequential) {
+      new SequenceFilesFromLuceneStorage().run(lucene2SeqConf);
+    } else {
+      new SequenceFilesFromLuceneStorageMRJob().run(lucene2SeqConf);
+    }
+    return 0;
+  }
+
+  /**
+   * Factory for the configuration bean; simply forwards all arguments to the
+   * {@link LuceneStorageConfiguration} constructor.
+   */
+  public LuceneStorageConfiguration newLucene2SeqConfiguration(Configuration configuration,
+                                                               List<Path> indexPaths,
+                                                               Path sequenceFilesOutputPath,
+                                                               String idField,
+                                                               List<String> fields) {
+    LuceneStorageConfiguration storageConfiguration =
+            new LuceneStorageConfiguration(configuration, indexPaths, sequenceFilesOutputPath, idField, fields);
+    return storageConfiguration;
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,55 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Generates a sequence file from a Lucene index via MapReduce. Uses a specified id field as the key and a content field as the value.
+ * Configure this class with a {@link LuceneStorageConfiguration} bean.
+ */
+public class SequenceFilesFromLuceneStorageMRJob {
+
+  public void run(LuceneStorageConfiguration lucene2seqConf) {
+    try {
+      Configuration configuration = lucene2seqConf.serialize();
+
+      Job job = new Job(configuration, "LuceneIndexToSequenceFiles: " + lucene2seqConf.getIndexPaths() + " -> M/R -> " + lucene2seqConf.getSequenceFilesOutputPath());
+
+      job.setMapOutputKeyClass(Text.class);
+      job.setMapOutputValueClass(Text.class);
+
+      job.setOutputKeyClass(Text.class);
+      job.setOutputValueClass(Text.class);
+
+      job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+      job.setMapperClass(SequenceFilesFromLuceneStorageMapper.class);
+
+      job.setInputFormatClass(LuceneSegmentInputFormat.class);
+
+      for (Path indexPath : lucene2seqConf.getIndexPaths()) {
+        FileInputFormat.addInputPath(job, indexPath);
+      }
+
+      FileOutputFormat.setOutputPath(job, lucene2seqConf.getSequenceFilesOutputPath());
+
+      job.setJarByClass(SequenceFilesFromLuceneStorageMRJob.class);
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,85 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+/**
+ * Maps document IDs to key value pairs with ID field as the key and the concatenated stored field(s)
+ * as value.
+ */
+public class SequenceFilesFromLuceneStorageMapper extends Mapper<Text, NullWritable, Text, Text> {
+
+  public static final String SEPARATOR_FIELDS = " ";
+  public static final int USE_TERM_INFOS = 1;
+
+  private LuceneStorageConfiguration lucene2SeqConfiguration;
+  private SegmentReader segmentReader;
+
+  // Reused for every record to avoid allocating new Text objects per document.
+  private Text idKey;
+  private Text fieldValue;
+
+  /**
+   * Rebuilds the storage configuration from the job configuration and opens a reader on the
+   * segment described by this task's {@link LuceneSegmentInputSplit}.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+
+    lucene2SeqConfiguration = new LuceneStorageConfiguration(configuration);
+
+    LuceneSegmentInputSplit inputSplit = (LuceneSegmentInputSplit) context.getInputSplit();
+
+    SegmentInfoPerCommit segmentInfo = inputSplit.getSegment(configuration);
+    segmentReader = new SegmentReader(segmentInfo, USE_TERM_INFOS, IOContext.READ);//nocommit: Should we use IOContext.READONCE?
+
+    idKey = new Text();
+    fieldValue = new Text();
+  }
+
+  /**
+   * Loads the document whose number is given as text in {@code key} and emits its id field together
+   * with the non-blank values of the configured fields, joined by {@link #SEPARATOR_FIELDS}.
+   *
+   * @throws NumberFormatException if {@code key} does not hold an integer
+   */
+  @Override
+  protected void map(Text key, NullWritable text, Context context) throws IOException, InterruptedException {
+    int docId = Integer.parseInt(key.toString());
+    Document document = segmentReader.document(docId);
+
+    String idString = document.get(lucene2SeqConfiguration.getIdField());
+
+    StringBuilder valueBuilder = new StringBuilder();
+    List<String> fields = lucene2SeqConfiguration.getFields();
+    for (String field : fields) {
+      String storedValue = document.get(field);
+      if (isNotBlank(storedValue)) {
+        // Separate values only from a preceding value, so a blank trailing field
+        // does not leave a dangling separator at the end.
+        if (valueBuilder.length() > 0) {
+          valueBuilder.append(SEPARATOR_FIELDS);
+        }
+        valueBuilder.append(storedValue);
+      }
+    }
+
+    // A document without the id field is written with an empty key.
+    idKey.set(nullSafe(idString));
+    fieldValue.set(valueBuilder.toString());
+
+    context.write(idKey, fieldValue);
+  }
+
+  /** Closes the segment reader opened in {@code setup}. */
+  @Override
+  protected void cleanup(Context context) throws IOException, InterruptedException {
+    segmentReader.close();
+  }
+
+  /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
+  private String nullSafe(String value) {
+    if (value == null) {
+      return "";
+    } else {
+      return value;
+    }
+  }
+}
\ No newline at end of file

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,60 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.MultipleFieldsDocument;
+import org.apache.mahout.text.doc.NumericFieldDocument;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Abstract test for working with Lucene storage. Provides helpers to index test documents under a
+ * fixed index path and to compare them with the key/value pairs produced by the code under test.
+ */
+public abstract class AbstractLuceneStorageTest {
+
+  private Path indexPath = new Path("index");
+
+  /**
+   * Adds the given documents to the test index and commits them.
+   *
+   * @throws IOException if the index cannot be opened or written
+   */
+  protected void commitDocuments(SingleFieldDocument... documents) throws IOException {
+    IndexWriter indexWriter = new IndexWriter(getDirectory(), new IndexWriterConfig(Version.LUCENE_35, new DefaultAnalyzer()));
+    try {
+      for (SingleFieldDocument singleFieldDocument : documents) {
+        indexWriter.addDocument(singleFieldDocument.asLuceneDocument());
+      }
+
+      indexWriter.commit();
+    } finally {
+      // Close even when indexing failed, so the writer is not left open for later tests.
+      indexWriter.close();
+    }
+  }
+
+  /** Asserts that key and value equal the document's id and field. */
+  protected void assertSimpleDocumentEquals(SingleFieldDocument expected, Pair<Text, Text> actual) {
+    assertEquals(expected.getId(), actual.getFirst().toString());
+    assertEquals(expected.getField(), actual.getSecond().toString());
+  }
+
+  /** Asserts that the value equals the document's three fields joined by single spaces. */
+  protected void assertMultipleFieldsDocumentEquals(MultipleFieldsDocument expected, Pair<Text, Text> actual) {
+    assertEquals(expected.getId(), actual.getFirst().toString());
+    assertEquals(expected.getField() + " " + expected.getField1() + " " + expected.getField2(), actual.getSecond().toString());
+  }
+
+  /** Asserts that the value equals the document's text field and numeric field joined by a space. */
+  protected void assertNumericFieldEquals(NumericFieldDocument expected, Pair<Text, Text> actual) {
+    assertEquals(expected.getId(), actual.getFirst().toString());
+    assertEquals(expected.getField() + " " + expected.getNumericField(), actual.getSecond().toString());
+  }
+
+  /** Opens a new directory handle on the test index path; every call opens a fresh handle. */
+  protected FSDirectory getDirectory() throws IOException {
+    return FSDirectory.open(new File(indexPath.toString()));
+  }
+
+  protected Path getIndexPath() {
+    return indexPath;
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,72 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * Checks that {@link LuceneSegmentInputFormat} produces one split per index segment.
+ */
+public class LuceneSegmentInputFormatTest {
+
+  private LuceneSegmentInputFormat inputFormat;
+  private JobContext jobContext;
+  private Path indexPath;
+  private Configuration conf;
+  private FSDirectory directory;
+
+  @Before
+  public void before() throws IOException {
+    inputFormat = new LuceneSegmentInputFormat();
+    indexPath = new Path("index");
+
+    LuceneStorageConfiguration lucene2SeqConf = new LuceneStorageConfiguration(new Configuration(), asList(indexPath), new Path("output"), "id", asList("field"));
+    conf = lucene2SeqConf.serialize();
+
+    jobContext = new JobContext(conf, new JobID());
+    directory = FSDirectory.open(new File(indexPath.toString()));
+  }
+  
+  @After
+  public void after() throws IOException {
+    try {
+      // Release the directory handle opened in before().
+      directory.close();
+    } finally {
+      // Remove the index even if closing the directory failed.
+      HadoopUtil.delete(conf, indexPath);
+    }
+  }
+
+  @Test
+  public void testGetSplits() throws IOException, InterruptedException {
+    SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+    SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+    SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+    List<SingleFieldDocument> documents = asList(doc1, doc2, doc3);
+
+    // Each document is committed by its own writer; the test expects one split per commit.
+    for (SingleFieldDocument singleFieldDocument : documents) {
+      commitDocument(singleFieldDocument);
+    }
+
+    List<LuceneSegmentInputSplit> splits = inputFormat.getSplits(jobContext);
+    assertEquals(3, splits.size());
+  }
+
+  /** Indexes and commits a single document with a writer of its own. */
+  private void commitDocument(SingleFieldDocument doc) throws IOException {
+    // Named writerConfig so it does not shadow the Hadoop configuration field "conf".
+    IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_35, new DefaultAnalyzer());
+    IndexWriter indexWriter = new IndexWriter(directory, writerConfig);
+    try {
+      indexWriter.addDocument(doc.asLuceneDocument());
+      indexWriter.commit();
+    } finally {
+      indexWriter.close();
+    }
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,89 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static junit.framework.Assert.assertEquals;
+
+/**
+ * Checks that {@link LuceneSegmentInputSplit} resolves segments of an index by name.
+ */
+public class LuceneSegmentInputSplitTest {
+
+  private FSDirectory directory;
+  private Path indexPath;
+  private Configuration conf;
+
+  @Before
+  public void before() throws IOException {
+    indexPath = new Path("index");
+    directory = FSDirectory.open(new File(indexPath.toString()));
+    conf = new Configuration();
+  }
+
+  @After
+  public void after() throws IOException {
+    try {
+      // Release the directory handle opened in before().
+      directory.close();
+    } finally {
+      // Remove the index even if closing the directory failed.
+      HadoopUtil.delete(conf, indexPath);
+    }
+  }
+
+  @Test
+  public void testGetSegment() throws Exception {
+    indexThreeDocuments();
+
+    assertSegmentContainsOneDoc("_0");
+    assertSegmentContainsOneDoc("_1");
+    assertSegmentContainsOneDoc("_2");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetSegment_nonExistingSegment() throws Exception {
+    indexThreeDocuments();
+
+    // Only three documents were committed, so the test expects segment "_3" to be unknown.
+    LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(indexPath, "_3", 1000);
+    inputSplit.getSegment(conf);
+  }
+
+  /** Commits three single-field documents, each with a writer of its own. */
+  private void indexThreeDocuments() throws IOException {
+    SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+    SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+    SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+
+    List<SingleFieldDocument> docs = asList(doc1, doc2, doc3);
+    for (SingleFieldDocument doc : docs) {
+      addDocument(doc);
+    }
+  }
+
+  /** Asserts that the named segment can be resolved and holds exactly one document. */
+  private void assertSegmentContainsOneDoc(String segmentName) throws IOException {
+    LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(indexPath, segmentName, 1000);
+    // NOTE(review): SequenceFilesFromLuceneStorageMapper assigns getSegment(...) to a
+    // SegmentInfoPerCommit and constructs SegmentReader directly; the SegmentInfo type and
+    // SegmentReader.get(...) used here may belong to an older Lucene API -- confirm this compiles.
+    SegmentInfo segment = inputSplit.getSegment(conf);
+    SegmentReader segmentReader = SegmentReader.get(true, segment, 1);
+    try {
+      assertEquals(segmentName, segment.name);
+      assertEquals(1, segmentReader.numDocs());
+    } finally {
+      // Do not leave the reader open once the assertions have run (or failed).
+      segmentReader.close();
+    }
+  }
+
+  /** Indexes and commits a single document with a writer of its own. */
+  private void addDocument(SingleFieldDocument doc) throws IOException {
+    // Named writerConfig so it does not shadow the Hadoop configuration field "conf".
+    IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_35, new DefaultAnalyzer());
+    IndexWriter indexWriter = new IndexWriter(directory, writerConfig);
+    try {
+      indexWriter.addDocument(doc.asLuceneDocument());
+      indexWriter.commit();
+    } finally {
+      indexWriter.close();
+    }
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,72 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.*;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link LuceneSegmentRecordReader}, run against the first segment listed in an
+ * index to which three documents have been committed.
+ */
+public class LuceneSegmentRecordReaderTest extends AbstractLuceneStorageTest {
+
+  /** Reader under test; assigned only once {@code initialize} has succeeded. */
+  private LuceneSegmentRecordReader recordReader;
+  /** Serialized storage configuration handed to the task attempt context. */
+  private Configuration configuration;
+
+  /**
+   * Commits three documents and opens a record reader on the first segment of the index.
+   *
+   * @throws IOException if the index cannot be written or read
+   * @throws InterruptedException if initializing the reader is interrupted
+   */
+  @Before
+  public void before() throws IOException, InterruptedException {
+    LuceneStorageConfiguration lucene2SeqConf = new LuceneStorageConfiguration(new Configuration(), asList(getIndexPath()), new Path("output"), "id", asList("field"));
+    configuration = lucene2SeqConf.serialize();
+
+    commitDocuments(
+      new SingleFieldDocument("1", "This is simple document 1"),
+      new SingleFieldDocument("2", "This is simple document 2"),
+      new SingleFieldDocument("3", "This is simple document 3"));
+
+    SegmentInfos segmentInfos = new SegmentInfos();
+    segmentInfos.read(getDirectory());
+
+    SegmentInfo segmentInfo = segmentInfos.asList().get(0);
+    LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(getIndexPath(), segmentInfo.name, segmentInfo.sizeInBytes(true));
+
+    TaskAttemptContext context = new TaskAttemptContext(configuration, new TaskAttemptID());
+
+    LuceneSegmentRecordReader reader = new LuceneSegmentRecordReader();
+    reader.initialize(inputSplit, context);
+    // Publish the reader only after initialize() succeeded, so after() never closes a half-built one.
+    recordReader = reader;
+  }
+
+  /**
+   * Closes the record reader (if one was opened) and removes the test index.
+   *
+   * @throws IOException if closing the reader or deleting the index fails
+   */
+  @After
+  public void after() throws IOException {
+    try {
+      if (recordReader != null) {
+        recordReader.close();
+      }
+    } finally {
+      // Remove the index even when closing the reader fails.
+      HadoopUtil.delete(configuration, getIndexPath());
+    }
+  }
+
+  /** The keys of the first three records are expected to be "0", "1" and "2", in that order. */
+  @Test
+  public void testKey() throws Exception {
+    for (int expectedKey = 0; expectedKey < 3; expectedKey++) {
+      recordReader.nextKeyValue();
+      assertEquals(String.valueOf(expectedKey), recordReader.getCurrentKey().toString());
+    }
+  }
+
+  /** The reader's current value is expected to be the {@link NullWritable} singleton. */
+  @Test
+  public void testGetCurrentValue() throws Exception {
+    assertEquals(NullWritable.get(), recordReader.getCurrentValue());
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,32 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that a {@link LuceneStorageConfiguration} can be rebuilt from its serialized
+ * Hadoop {@link Configuration}.
+ */
+public class LuceneStorageConfigurationTest {
+
+  /** A configuration rebuilt from its serialized form must equal the one it came from. */
+  @Test
+  public void testSerialization() throws Exception {
+    LuceneStorageConfiguration original = new LuceneStorageConfiguration(
+      new Configuration(),
+      asList(new Path("indexPath")),
+      new Path("outputPath"),
+      "id",
+      asList("field"));
+
+    LuceneStorageConfiguration restored = new LuceneStorageConfiguration(original.serialize());
+
+    assertEquals(original, restored);
+  }
+
+  /** Building from a Configuration that holds no serialized state is expected to be rejected. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testSerialization_notSerialized() throws IOException {
+    Configuration emptyConf = new Configuration();
+    new LuceneStorageConfiguration(emptyConf);
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.search.TermQuery;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the command-line handling of {@link SequenceFilesFromLuceneStorageDriver}. The driver
+ * under test is subclassed so that the {@link LuceneStorageConfiguration} it builds from the
+ * arguments can be captured and inspected.
+ */
+public class SequenceFilesFromLuceneStorageDriverTest extends AbstractLuceneStorageTest {
+
+  private SequenceFilesFromLuceneStorageDriver driver;
+  /** Configuration most recently created through the driver's factory method. */
+  private LuceneStorageConfiguration lucene2SeqConf;
+  private String idField;
+  private List<String> fields;
+  private Path seqFilesOutputPath;
+  private Configuration conf;
+
+  /**
+   * Prepares the Hadoop configuration, the capturing driver and a small two-commit index.
+   */
+  @Before
+  public void before() throws Exception {
+    conf = new Configuration();
+    conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+      + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    seqFilesOutputPath = new Path("seqfiles");
+    idField = "id";
+    fields = asList("field");
+
+    driver = new SequenceFilesFromLuceneStorageDriver() {
+      @Override
+      public LuceneStorageConfiguration newLucene2SeqConfiguration(Configuration configuration, List<Path> indexPaths, Path seqPath, String idField, List<String> fields) {
+        // Remember what the driver built so the tests can assert on it.
+        lucene2SeqConf = new LuceneStorageConfiguration(configuration, indexPaths, seqPath, idField, fields);
+        return lucene2SeqConf;
+      }
+    };
+
+    commitDocuments(new SingleFieldDocument("1", "Mahout is cool"));
+    commitDocuments(new SingleFieldDocument("2", "Mahout is cool"));
+  }
+
+  /**
+   * Removes the sequence file output and the test index.
+   *
+   * @throws IOException if either path cannot be deleted
+   */
+  @After
+  public void after() throws IOException {
+    try {
+      HadoopUtil.delete(conf, seqFilesOutputPath);
+    } finally {
+      // Still remove the index when deleting the output path fails.
+      HadoopUtil.delete(conf, getIndexPath());
+    }
+  }
+
+  /** The factory method must hand every argument through to the created configuration. */
+  @Test
+  public void testNewLucene2SeqConfiguration() {
+    lucene2SeqConf = driver.newLucene2SeqConfiguration(conf,
+      asList(new Path(getIndexPath().toString())),
+      seqFilesOutputPath,
+      idField,
+      fields);
+
+    assertEquals(conf, lucene2SeqConf.getConfiguration());
+    assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+    assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+    assertEquals(idField, lucene2SeqConf.getIdField());
+    assertEquals(fields, lucene2SeqConf.getFields());
+  }
+
+  /** Every supported option, including query and max hits, must end up in the configuration. */
+  @Test
+  public void testRun() throws Exception {
+    String queryField = "queryfield";
+    String queryTerm = "queryterm";
+    String maxHits = "500";
+    String field1 = "field1";
+    String field2 = "field2";
+
+    String[] args = new String[]{
+      "-d", getIndexPath().toString(),
+      "-o", seqFilesOutputPath.toString(),
+      "-i", idField,
+      "-f", field1 + "," + field2,
+      "-q", queryField + ":" + queryTerm,
+      "-n", maxHits,
+      "-xm", "sequential"
+    };
+
+    driver.setConf(conf);
+    driver.run(args);
+
+    assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+    assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+    assertEquals(idField, lucene2SeqConf.getIdField());
+    assertEquals(asList(field1, field2), lucene2SeqConf.getFields());
+
+    assertTrue(lucene2SeqConf.getQuery() instanceof TermQuery);
+    TermQuery termQuery = (TermQuery) lucene2SeqConf.getQuery();
+    assertEquals(queryField, termQuery.getTerm().field());
+    assertEquals(queryTerm, termQuery.getTerm().text());
+    // Integer.valueOf replaces the boxing constructor; both sides are compared as Integer objects.
+    assertEquals(Integer.valueOf(maxHits), Integer.valueOf(lucene2SeqConf.getMaxHits()));
+  }
+
+  /** When query and max hits are omitted, the driver's defaults must be applied. */
+  @Test
+  public void testRun_optionalArguments() throws Exception {
+    String[] args = new String[]{
+      "-d", getIndexPath().toString(),
+      "-o", seqFilesOutputPath.toString(),
+      "-i", idField,
+      "-f", StringUtils.join(fields, SequenceFilesFromLuceneStorageDriver.SEPARATOR_FIELDS)
+    };
+
+    driver.setConf(conf);
+    driver.run(args);
+
+    assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+    assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+    assertEquals(idField, lucene2SeqConf.getIdField());
+    assertEquals(fields, lucene2SeqConf.getFields());
+    assertEquals(conf, lucene2SeqConf.getConfiguration());
+
+    assertEquals(SequenceFilesFromLuceneStorageDriver.DEFAULT_QUERY, lucene2SeqConf.getQuery());
+    assertEquals(SequenceFilesFromLuceneStorageDriver.DEFAULT_MAX_HITS, lucene2SeqConf.getMaxHits());
+  }
+
+  /** A query string that cannot be parsed is expected to surface as an IllegalArgumentException. */
+  @Test(expected = IllegalArgumentException.class)
+  public void testRun_invalidQuery() throws Exception {
+    String[] args = new String[]{
+      "-d", getIndexPath().toString(),
+      "-o", seqFilesOutputPath.toString(),
+      "-i", idField,
+      "-f", StringUtils.join(fields, SequenceFilesFromLuceneStorageDriver.SEPARATOR_FIELDS),
+      "-q", "inva:lid:query"
+    };
+
+    driver.setConf(conf);
+    driver.run(args);
+  }
+
+  /** Smoke test: running a plain driver with --help must not throw. */
+  @Test
+  public void testHelp() throws Exception {
+    driver = new SequenceFilesFromLuceneStorageDriver();
+    driver.run(new String[]{"--help"});
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,64 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests {@link SequenceFilesFromLuceneStorageMRJob} by running it over a small index and
+ * reading back the sequence files it produces.
+ */
+public class SequenceFilesFromLuceneStorageMRJobTest extends AbstractLuceneStorageTest {
+
+  private SequenceFilesFromLuceneStorageMRJob lucene2seq;
+  private LuceneStorageConfiguration lucene2SeqConf;
+  private SingleFieldDocument document1;
+  private SingleFieldDocument document2;
+  private SingleFieldDocument document3;
+  private SingleFieldDocument document4;
+
+  /** Creates the job, its storage configuration and the four documents used by the test. */
+  @Before
+  public void before() {
+    lucene2seq = new SequenceFilesFromLuceneStorageMRJob();
+
+    Configuration configuration = new Configuration();
+    Path seqOutputPath = new Path("seqOutputPath");
+
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration, asList(getIndexPath()), seqOutputPath, SingleFieldDocument.ID_FIELD, asList(SingleFieldDocument.FIELD));
+
+    document1 = new SingleFieldDocument("1", "This is test document 1");
+    document2 = new SingleFieldDocument("2", "This is test document 2");
+    document3 = new SingleFieldDocument("3", "This is test document 3");
+    document4 = new SingleFieldDocument("4", "This is test document 4");
+  }
+
+  /**
+   * Removes the sequence file output and the test index.
+   *
+   * @throws IOException if either location cannot be deleted
+   */
+  @After
+  public void after() throws IOException {
+    try {
+      HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getSequenceFilesOutputPath());
+    } finally {
+      // Still remove the index when deleting the output path fails.
+      HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getIndexPaths());
+    }
+  }
+
+  /** The job must emit the four committed documents, in order, and nothing else. */
+  @Test
+  public void testRun() throws IOException {
+    commitDocuments(document1, document2, document3, document4);
+
+    lucene2seq.run(lucene2SeqConf);
+
+    Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+    assertSimpleDocumentEquals(document1, iterator.next());
+    assertSimpleDocumentEquals(document2, iterator.next());
+    assertSimpleDocumentEquals(document3, iterator.next());
+    assertSimpleDocumentEquals(document4, iterator.next());
+    // Guard against extra or duplicated records in the output.
+    assertFalse(iterator.hasNext());
+  }
+}

Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java Thu Jun  6 15:51:06 2013
@@ -0,0 +1,197 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.MultipleFieldsDocument;
+import org.apache.mahout.text.doc.NumericFieldDocument;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.text.doc.UnstoredFieldsDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests {@link SequenceFilesFromLuceneStorage}: each test commits documents to the test index,
+ * runs the conversion and checks the key/value pairs read back from the sequence files.
+ */
+public class SequenceFilesFromLuceneStorageTest extends AbstractLuceneStorageTest {
+
+  private SequenceFilesFromLuceneStorage lucene2Seq;
+  /** Configuration used by the next conversion run; some tests replace it. */
+  private LuceneStorageConfiguration lucene2SeqConf;
+
+  private SingleFieldDocument document1;
+  private SingleFieldDocument document2;
+  private SingleFieldDocument document3;
+  private Path seqFilesOutputPath;
+  private Configuration configuration;
+
+  /** Creates the converter, a single-field default configuration and three sample documents. */
+  @SuppressWarnings("unchecked")
+  @Before
+  public void before() throws IOException {
+    configuration = new Configuration();
+    seqFilesOutputPath = new Path("seqfiles");
+
+    lucene2Seq = new SequenceFilesFromLuceneStorage();
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+      asList(getIndexPath()),
+      seqFilesOutputPath,
+      SingleFieldDocument.ID_FIELD,
+      asList(SingleFieldDocument.FIELD));
+
+    document1 = new SingleFieldDocument("1", "This is test document 1");
+    document2 = new SingleFieldDocument("2", "This is test document 2");
+    document3 = new SingleFieldDocument("3", "This is test document 3");
+  }
+
+  /**
+   * Removes the sequence file output and the test index.
+   *
+   * @throws IOException if either location cannot be deleted
+   */
+  @After
+  public void after() throws IOException {
+    try {
+      HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getSequenceFilesOutputPath());
+    } finally {
+      // Still remove the index when deleting the output path fails.
+      HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getIndexPaths());
+    }
+  }
+
+  /**
+   * Runs the conversion with the current {@code lucene2SeqConf} and opens its output.
+   *
+   * @return iterator over the key/value pairs of the written sequence files
+   * @throws IOException if the conversion or opening the output fails
+   */
+  private Iterator<Pair<Text, Text>> runAndReadSequenceFiles() throws IOException {
+    lucene2Seq.run(lucene2SeqConf);
+    return lucene2SeqConf.getSequenceFileIterator();
+  }
+
+  /** All committed documents must be written, in order, and nothing else. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun() throws Exception {
+    commitDocuments(document1, document2, document3);
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertSimpleDocumentEquals(document1, iterator.next());
+    assertSimpleDocumentEquals(document2, iterator.next());
+    assertSimpleDocumentEquals(document3, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** A document whose id field is empty must not appear in the output. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun_skipEmptyIdFieldDocs() throws IOException {
+    commitDocuments(document1, new SingleFieldDocument("", "This is a test document with no id"), document2);
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertSimpleDocumentEquals(document1, iterator.next());
+    assertSimpleDocumentEquals(document2, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** A document whose content field is empty must not appear in the output. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun_skipEmptyFieldDocs() throws IOException {
+    commitDocuments(document1, new SingleFieldDocument("4", ""), document2);
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertSimpleDocumentEquals(document1, iterator.next());
+    assertSimpleDocumentEquals(document2, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** Requesting an unstored field must not put the text "null" into the written value. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun_skipUnstoredFields() throws IOException {
+    commitDocuments(new UnstoredFieldsDocument("5", "This is test document 5"));
+
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+      asList(getIndexPath()),
+      seqFilesOutputPath,
+      SingleFieldDocument.ID_FIELD,
+      asList(UnstoredFieldsDocument.FIELD, UnstoredFieldsDocument.UNSTORED_FIELD));
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    String writtenValue = iterator.next().getSecond().toString();
+    assertFalse(writtenValue.contains("null"));
+    assertFalse(iterator.hasNext());
+  }
+
+  /** With max hits set to 3, only the first three of four documents are written. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun_maxHits() throws IOException {
+    commitDocuments(document1, document2, document3, new SingleFieldDocument("4", "This is test document 4"));
+
+    lucene2SeqConf.setMaxHits(3);
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertSimpleDocumentEquals(document1, iterator.next());
+    assertSimpleDocumentEquals(document2, iterator.next());
+    assertSimpleDocumentEquals(document3, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** Only the document matching the configured term query is written. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testRun_query() throws IOException {
+    SingleFieldDocument mahoutDocument = new SingleFieldDocument("4", "Mahout is cool");
+    commitDocuments(document1, document2, document3, mahoutDocument);
+
+    Query query = new TermQuery(new Term(lucene2SeqConf.getFields().get(0), "mahout"));
+
+    lucene2SeqConf.setQuery(query);
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertSimpleDocumentEquals(mahoutDocument, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** Documents with several configured fields are written in order, and nothing else. */
+  @Test
+  public void testRun_multipleFields() throws IOException {
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+      asList(getIndexPath()),
+      seqFilesOutputPath,
+      SingleFieldDocument.ID_FIELD,
+      asList(MultipleFieldsDocument.FIELD, MultipleFieldsDocument.FIELD1, MultipleFieldsDocument.FIELD2));
+
+    MultipleFieldsDocument multipleFieldsDocument1 = new MultipleFieldsDocument("1", "This is field 1-1", "This is field 1-2", "This is field 1-3");
+    MultipleFieldsDocument multipleFieldsDocument2 = new MultipleFieldsDocument("2", "This is field 2-1", "This is field 2-2", "This is field 2-3");
+    MultipleFieldsDocument multipleFieldsDocument3 = new MultipleFieldsDocument("3", "This is field 3-1", "This is field 3-2", "This is field 3-3");
+    commitDocuments(multipleFieldsDocument1, multipleFieldsDocument2, multipleFieldsDocument3);
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertMultipleFieldsDocumentEquals(multipleFieldsDocument1, iterator.next());
+    assertMultipleFieldsDocumentEquals(multipleFieldsDocument2, iterator.next());
+    assertMultipleFieldsDocumentEquals(multipleFieldsDocument3, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+
+  /** Documents carrying a numeric field are written in order, and nothing else. */
+  @Test
+  public void testRun_numericField() throws IOException {
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+      asList(getIndexPath()),
+      seqFilesOutputPath,
+      SingleFieldDocument.ID_FIELD,
+      asList(NumericFieldDocument.FIELD, NumericFieldDocument.NUMERIC_FIELD));
+
+    NumericFieldDocument doc1 = new NumericFieldDocument("1", "This is field 1", 100);
+    NumericFieldDocument doc2 = new NumericFieldDocument("2", "This is field 2", 200);
+    NumericFieldDocument doc3 = new NumericFieldDocument("3", "This is field 3", 300);
+
+    commitDocuments(doc1, doc2, doc3);
+
+    Iterator<Pair<Text, Text>> iterator = runAndReadSequenceFiles();
+
+    assertNumericFieldEquals(doc1, iterator.next());
+    assertNumericFieldEquals(doc2, iterator.next());
+    assertNumericFieldEquals(doc3, iterator.next());
+    assertFalse(iterator.hasNext());
+  }
+}

Modified: mahout/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/pom.xml?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/pom.xml (original)
+++ mahout/trunk/pom.xml Thu Jun  6 15:51:06 2013
@@ -100,7 +100,7 @@
     <maven.clover.multiproject>true</maven.clover.multiproject>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <hadoop.version>1.1.2</hadoop.version>
-    <lucene.version>4.2.1</lucene.version>
+    <lucene.version>4.3.0</lucene.version>
   </properties>
   <issueManagement>
     <system>Jira</system>

Modified: mahout/trunk/src/conf/driver.classes.default.props
URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/driver.classes.default.props?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/src/conf/driver.classes.default.props (original)
+++ mahout/trunk/src/conf/driver.classes.default.props Thu Jun  6 15:51:06 2013
@@ -13,6 +13,7 @@ org.apache.mahout.vectorizer.SparseVecto
 org.apache.mahout.vectorizer.EncodedVectorsFromSequenceFiles = seq2encoded: Encoded Sparse Vector generation from Text sequence files
 org.apache.mahout.text.WikipediaToSequenceFile = seqwiki : Wikipedia xml dump to sequence file
 org.apache.mahout.text.SequenceFilesFromMailArchives = seqmailarchives : Creates SequenceFile from a directory containing gzipped mail archives
+org.apache.mahout.text.SequenceFilesFromLuceneStorageDriver = lucene2seq : Generate Text SequenceFiles from a Lucene index
 
 #Math
 org.apache.mahout.math.hadoop.TransposeJob = transpose : Take the transpose of a matrix