Posted to commits@mahout.apache.org by gs...@apache.org on 2013/06/06 17:51:06 UTC
svn commit: r1490329 - in /mahout/trunk: ./ integration/
integration/src/main/java/org/apache/mahout/text/
integration/src/test/java/org/apache/mahout/text/ src/conf/
Author: gsingers
Date: Thu Jun 6 15:51:06 2013
New Revision: 1490329
URL: http://svn.apache.org/r1490329
Log:
MAHOUT-944: progress up to main compiling, except for the file name filter; haven't run tests yet
Added:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java
Modified:
mahout/trunk/integration/pom.xml
mahout/trunk/pom.xml
mahout/trunk/src/conf/driver.classes.default.props
Modified: mahout/trunk/integration/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/pom.xml?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/integration/pom.xml (original)
+++ mahout/trunk/integration/pom.xml Thu Jun 6 15:51:06 2013
@@ -217,7 +217,7 @@
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
- </dependency>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>4.3.0</version>
+    </dependency>
</dependencies>
</project>
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneIndexFileNameFilter.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.lucene.index.IndexFileNameFilter;
+import org.apache.lucene.index.IndexFileNames;
+
+/**
+ * A wrapper class to convert an IndexFileNameFilter which implements
+ * java.io.FilenameFilter to an org.apache.hadoop.fs.PathFilter.
+ */
+class LuceneIndexFileNameFilter implements PathFilter {
+
+ private static final LuceneIndexFileNameFilter singleton =
+ new LuceneIndexFileNameFilter();
+
+ /**
+ * Get a static instance.
+ *
+ * @return the static instance
+ */
+ public static LuceneIndexFileNameFilter getFilter() {
+ return singleton;
+ }
+
+ //nocommit Not sure what the alternative is here
+ private final IndexFileNameFilter luceneFilter;
+
+ private LuceneIndexFileNameFilter() {
+ luceneFilter = IndexFileNames.getFilter();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.fs.PathFilter#accept(org.apache.hadoop.fs.Path)
+ */
+ public boolean accept(Path path) {
+ return luceneFilter.accept(null, path.getName());
+ }
+
+}
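For illustration only (not part of this commit): a minimal sketch of how the filter plugs into Hadoop's directory-listing API. The index path is hypothetical, and the sketch lives in org.apache.mahout.text because the filter class is package-private.

    package org.apache.mahout.text;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    class ListIndexFilesSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Only paths whose names match Lucene's index file naming scheme pass the filter.
        FileStatus[] indexFiles = fs.listStatus(new Path("index"), LuceneIndexFileNameFilter.getFilter());
        for (FileStatus status : indexFiles) {
          System.out.println(status.getPath().getName());
        }
      }
    }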
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputFormat.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,59 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link InputFormat} implementation which splits a Lucene index at the segment level.
+ */
+public class LuceneSegmentInputFormat extends InputFormat {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LuceneSegmentInputFormat.class);
+
+ @Override
+ public List<LuceneSegmentInputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
+ Configuration configuration = context.getConfiguration();
+
+ LuceneStorageConfiguration lucene2SeqConfiguration = new LuceneStorageConfiguration(configuration);
+
+ List<LuceneSegmentInputSplit> inputSplits = new ArrayList<LuceneSegmentInputSplit>();
+
+ List<Path> indexPaths = lucene2SeqConfiguration.getIndexPaths();
+ for (Path indexPath : indexPaths) {
+ ReadOnlyFileSystemDirectory directory = new ReadOnlyFileSystemDirectory(FileSystem.get(configuration), indexPath, false, configuration);
+ SegmentInfos segmentInfos = new SegmentInfos();
+ segmentInfos.read(directory);
+
+ for (SegmentInfoPerCommit segmentInfo : segmentInfos) {
+ LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(indexPath, segmentInfo.info.name, segmentInfo.sizeInBytes());
+ inputSplits.add(inputSplit);
+ LOG.info("Created {} byte input split for index '{}' segment {}", new Object[]{segmentInfo.sizeInBytes(), indexPath.toUri(), segmentInfo.info.name});
+ }
+ }
+
+ return inputSplits;
+ }
+
+ @Override
+ public RecordReader<Text, NullWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {
+ LuceneSegmentRecordReader luceneSegmentRecordReader = new LuceneSegmentRecordReader();
+ luceneSegmentRecordReader.initialize(inputSplit, context);
+ return luceneSegmentRecordReader;
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentInputSplit.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,89 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentInfos;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link InputSplit} implementation that represents a Lucene segment.
+ */
+public class LuceneSegmentInputSplit extends InputSplit implements Writable {
+
+ private Path indexPath;
+ private String segmentInfoName;
+ private long length;
+
+ public LuceneSegmentInputSplit() {
+ // For deserialization
+ }
+
+ public LuceneSegmentInputSplit(Path indexPath, String segmentInfoName, long length) {
+ this.indexPath = indexPath;
+ this.segmentInfoName = segmentInfoName;
+ this.length = length;
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return length;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[]{};
+ }
+
+ public String getSegmentInfoName() {
+ return segmentInfoName;
+ }
+
+ public Path getIndexPath() {
+ return indexPath;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(indexPath.toString());
+ out.writeUTF(segmentInfoName);
+ out.writeLong(length);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.indexPath = new Path(in.readUTF());
+ this.segmentInfoName = in.readUTF();
+ this.length = in.readLong();
+ }
+
+  /**
+   * Get the {@link SegmentInfoPerCommit} backing this {@link InputSplit}, located via the given
+   * {@link Configuration}.
+   *
+   * @param configuration the configuration used to locate the index
+   * @return the segment info matching this split's segment name
+   * @throws IOException if an error occurs when accessing the directory
+   * @throws IllegalArgumentException if the index contains no segment with this split's name
+   */
+ public SegmentInfoPerCommit getSegment(Configuration configuration) throws IOException {
+ ReadOnlyFileSystemDirectory directory = new ReadOnlyFileSystemDirectory(FileSystem.get(configuration), indexPath, false, configuration);
+
+ SegmentInfos segmentInfos = new SegmentInfos();
+ segmentInfos.read(directory);
+
+ for (SegmentInfoPerCommit segmentInfo : segmentInfos) {
+ if (segmentInfo.info.name.equals(segmentInfoName)) {
+ return segmentInfo;
+ }
+ }
+
+ throw new IllegalArgumentException("No such segment: '" + segmentInfoName + "' in directory " + directory.toString());
+ }
+}
\ No newline at end of file
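As a quick illustration of the Writable contract above (editorial, not committed code): a sketch that round-trips a split through Hadoop's in-memory buffers. The segment name "_0" and the length are made up.

    package org.apache.mahout.text;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    import java.io.IOException;

    class SplitRoundTripSketch {
      public static void main(String[] args) throws IOException {
        LuceneSegmentInputSplit split = new LuceneSegmentInputSplit(new Path("index"), "_0", 1024L);

        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);                      // writes index path, segment name, and length

        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());

        LuceneSegmentInputSplit copy = new LuceneSegmentInputSplit();
        copy.readFields(in);                   // rebuilds the split on the task side

        System.out.println(copy.getIndexPath() + " " + copy.getSegmentInfoName());
      }
    }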
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneSegmentRecordReader.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,78 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+
+/**
+ * {@link RecordReader} implementation for Lucene segments. Each {@link InputSplit} contains a separate Lucene segment.
+ * Emits records consisting of a {@link Text} document ID key and a {@link NullWritable} value.
+ */
+public class LuceneSegmentRecordReader extends RecordReader<Text, NullWritable> {
+
+ public static final boolean READ_ONLY = true;
+ public static final int USE_TERM_INFOS = 1;
+
+ private SegmentReader segmentReader;
+ private IndexSearcher searcher;
+ private Scorer scorer;
+
+ private int nextDocId;
+ private Text key = new Text();
+
+ @Override
+ public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ LuceneSegmentInputSplit inputSplit = (LuceneSegmentInputSplit) split;
+
+ Configuration configuration = context.getConfiguration();
+ LuceneStorageConfiguration lucene2SeqConfiguration = new LuceneStorageConfiguration(configuration);
+
+ SegmentInfoPerCommit segmentInfo = inputSplit.getSegment(configuration);
+ segmentReader = new SegmentReader(segmentInfo, USE_TERM_INFOS, IOContext.READ);//nocommit: Should we use IOContext.READONCE?
+
+ searcher = new IndexSearcher(segmentReader);
+ Weight weight = lucene2SeqConfiguration.getQuery().createWeight(searcher);
+ scorer = weight.scorer(segmentReader.getContext(), true, false, null);//nocommit
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ nextDocId = scorer.nextDoc();
+
+ return nextDocId != Scorer.NO_MORE_DOCS;
+ }
+
+ @Override
+ public Text getCurrentKey() throws IOException, InterruptedException {
+ key.set(String.valueOf(nextDocId));
+ return key;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return NullWritable.get();
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ segmentReader.close();
+ //searcher.close();
+ }
+}
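A rough sketch (editorial, not committed code) of the consumption loop the MapReduce framework runs against this reader; the split and context are assumed to come from LuceneSegmentInputFormat.

    package org.apache.mahout.text;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    import java.io.IOException;

    class RecordReaderLoopSketch {
      // split must be a LuceneSegmentInputSplit; context is supplied by the framework.
      static void drain(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        LuceneSegmentRecordReader reader = new LuceneSegmentRecordReader();
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
          Text docId = reader.getCurrentKey();           // segment-local Lucene doc id, as text
          NullWritable value = reader.getCurrentValue(); // no payload; the mapper re-reads the document
          // ... hand (docId, value) to the consumer ...
        }
        reader.close();
      }
    }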
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/LuceneStorageConfiguration.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.lucene.util.Version.LUCENE_43;
+
+/**
+ * Holds all the configuration for {@link SequenceFilesFromLuceneStorage}, which generates a sequence file
+ * with the id field as the key and the content field(s) as the value.
+ */
+public class LuceneStorageConfiguration implements Writable {
+
+ private static final Query DEFAULT_QUERY = new MatchAllDocsQuery();
+ private static final int DEFAULT_MAX_HITS = Integer.MAX_VALUE;
+
+ static final String KEY = "org.apache.mahout.text.LuceneIndexToSequenceFiles";
+
+ static final String SEPARATOR_FIELDS = ",";
+ static final String SEPARATOR_PATHS = ",";
+
+ private Configuration configuration;
+ private List<Path> indexPaths;
+ private Path sequenceFilesOutputPath;
+ private String idField;
+ private List<String> fields;
+ private Query query;
+ private int maxHits;
+
+ /**
+ * Create a configuration bean with all mandatory parameters.
+ *
+ * @param configuration Hadoop configuration for writing sequencefiles
+ * @param indexPaths paths to the index
+ * @param sequenceFilesOutputPath path to output the sequence file
+ * @param idField field used for the key of the sequence file
+ * @param fields field(s) used for the value of the sequence file
+ */
+ public LuceneStorageConfiguration(Configuration configuration, List<Path> indexPaths, Path sequenceFilesOutputPath, String idField, List<String> fields) {
+ Preconditions.checkArgument(configuration != null, "Parameter 'configuration' cannot be null");
+ Preconditions.checkArgument(indexPaths != null, "Parameter 'indexPaths' cannot be null");
+ Preconditions.checkArgument(indexPaths != null && !indexPaths.isEmpty(), "Parameter 'indexPaths' cannot be empty");
+ Preconditions.checkArgument(sequenceFilesOutputPath != null, "Parameter 'sequenceFilesOutputPath' cannot be null");
+ Preconditions.checkArgument(idField != null, "Parameter 'idField' cannot be null");
+ Preconditions.checkArgument(fields != null, "Parameter 'fields' cannot be null");
+ Preconditions.checkArgument(fields != null && !fields.isEmpty(), "Parameter 'fields' cannot be empty");
+
+ this.configuration = configuration;
+ this.indexPaths = indexPaths;
+ this.sequenceFilesOutputPath = sequenceFilesOutputPath;
+ this.idField = idField;
+ this.fields = fields;
+
+ setQuery(DEFAULT_QUERY);
+ setMaxHits(DEFAULT_MAX_HITS);
+ }
+
+ public LuceneStorageConfiguration() {
+    // No-arg constructor for Writable deserialization; not intended for direct use.
+ }
+
+ /**
+ * Deserializes a {@link LuceneStorageConfiguration} from a {@link Configuration}.
+ *
+ * @param conf the {@link Configuration} object with a serialized {@link LuceneStorageConfiguration}
+ * @throws IOException if deserialization fails
+ */
+ public LuceneStorageConfiguration(Configuration conf) throws IOException {
+ Preconditions.checkNotNull(conf, "Parameter 'configuration' cannot be null");
+
+ String serializedConfigString = conf.get(KEY);
+
+ if (serializedConfigString == null) {
+ throw new IllegalArgumentException("Parameter 'configuration' does not contain a serialized " + this.getClass());
+ }
+
+ LuceneStorageConfiguration luceneStorageConf = DefaultStringifier.load(conf, KEY, LuceneStorageConfiguration.class);
+
+ this.configuration = conf;
+ this.indexPaths = luceneStorageConf.getIndexPaths();
+ this.sequenceFilesOutputPath = luceneStorageConf.getSequenceFilesOutputPath();
+ this.idField = luceneStorageConf.getIdField();
+ this.fields = luceneStorageConf.getFields();
+ this.query = luceneStorageConf.getQuery();
+ this.maxHits = luceneStorageConf.getMaxHits();
+ }
+
+ /**
+   * Serializes this object into the backing Hadoop {@link Configuration}.
+   *
+   * @return a copy of that {@link Configuration} containing the serialized form of this object
+ * @throws IOException if serialization fails
+ */
+ public Configuration serialize() throws IOException {
+ DefaultStringifier.store(configuration, this, KEY);
+
+ return new Configuration(configuration);
+ }
+
+ /**
+   * Returns an {@link Iterator} over the (Text, Text) {@link Pair}s in the produced sequence files.
+ *
+ * @return iterator
+ */
+ public Iterator<Pair<Text, Text>> getSequenceFileIterator() {
+ return new SequenceFileDirIterable<Text, Text>(sequenceFilesOutputPath, PathType.LIST, configuration).iterator();
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public Path getSequenceFilesOutputPath() {
+ return sequenceFilesOutputPath;
+ }
+
+ public List<Path> getIndexPaths() {
+ return indexPaths;
+ }
+
+ public String getIdField() {
+ return idField;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+
+ public void setQuery(Query query) {
+ this.query = query;
+ }
+
+ public Query getQuery() {
+ return query;
+ }
+
+ public void setMaxHits(int maxHits) {
+ this.maxHits = maxHits;
+ }
+
+ public int getMaxHits() {
+ return maxHits;
+ }
+
+ public DocumentStoredFieldVisitor getStoredFieldVisitor() {
+ Set<String> fieldSet = Sets.newHashSet(idField);
+ fieldSet.addAll(fields);
+ return new DocumentStoredFieldVisitor(fieldSet);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(sequenceFilesOutputPath.toString());
+ out.writeUTF(StringUtils.join(indexPaths, SEPARATOR_PATHS));
+ out.writeUTF(idField);
+ out.writeUTF(StringUtils.join(fields, SEPARATOR_FIELDS));
+ out.writeUTF(query.toString());
+ out.writeInt(maxHits);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ try {
+ this.sequenceFilesOutputPath = new Path(in.readUTF());
+ this.indexPaths = new ArrayList<Path>();
+ String[] indexPaths = in.readUTF().split(SEPARATOR_PATHS);
+ for (String indexPath : indexPaths) {
+ this.indexPaths.add(new Path(indexPath));
+ }
+ this.idField = in.readUTF();
+ this.fields = Arrays.asList(in.readUTF().split(SEPARATOR_FIELDS));
+ this.query = new QueryParser(LUCENE_43, "query", new StandardAnalyzer(LUCENE_43)).parse(in.readUTF());
+ this.maxHits = in.readInt();
+ } catch (ParseException e) {
+ throw new RuntimeException("Could not deserialize " + this.getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ LuceneStorageConfiguration that = (LuceneStorageConfiguration) o;
+
+ if (maxHits != that.maxHits) return false;
+ if (fields != null ? !fields.equals(that.fields) : that.fields != null) return false;
+ if (idField != null ? !idField.equals(that.idField) : that.idField != null) return false;
+ if (indexPaths != null ? !indexPaths.equals(that.indexPaths) : that.indexPaths != null) return false;
+ if (query != null ? !query.equals(that.query) : that.query != null) return false;
+ if (sequenceFilesOutputPath != null ? !sequenceFilesOutputPath.equals(that.sequenceFilesOutputPath) : that.sequenceFilesOutputPath != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = indexPaths != null ? indexPaths.hashCode() : 0;
+ result = 31 * result + (sequenceFilesOutputPath != null ? sequenceFilesOutputPath.hashCode() : 0);
+ result = 31 * result + (idField != null ? idField.hashCode() : 0);
+ result = 31 * result + (fields != null ? fields.hashCode() : 0);
+ result = 31 * result + (query != null ? query.hashCode() : 0);
+ result = 31 * result + maxHits;
+ return result;
+ }
+}
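To make the serialization contract concrete, a small sketch (editorial; the paths and the field names "id", "title", "description" are hypothetical) that builds a configuration bean, stores it into a Hadoop Configuration via serialize(), and reads it back the way the mapper side would.

    package org.apache.mahout.text;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.util.Arrays;

    class StorageConfigurationSketch {
      public static void main(String[] args) throws IOException {
        LuceneStorageConfiguration storageConf = new LuceneStorageConfiguration(
            new Configuration(),
            Arrays.asList(new Path("index")),        // one or more index paths
            new Path("sequenceFiles"),               // sequence file output path
            "id",                                    // key field
            Arrays.asList("title", "description"));  // value field(s), concatenated in order

        // Stringify the bean into the Configuration under KEY...
        Configuration serialized = storageConf.serialize();

        // ...and rehydrate it, as LuceneStorageConfiguration(Configuration) does in the mapper.
        LuceneStorageConfiguration roundTripped = new LuceneStorageConfiguration(serialized);
        System.out.println(storageConf.equals(roundTripped)); // expected: true
      }
    }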
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/ReadOnlyFileSystemDirectory.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.store.BufferedIndexInput;
+import org.apache.lucene.store.BufferedIndexOutput;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.Lock;
+
+import java.io.IOException;
+import java.util.Collection;
+
+//NOCOMMIT: Not sure if there isn't a better way of doing this in 4.x. Don't we have a Hadoop Directory impl somewhere?
+
+/**
+ * This class implements a Lucene Directory on top of a general FileSystem.
+ * Currently it does not support locking.
+ * <p/>
+ * // TODO: Rename to FileSystemReadOnlyDirectory
+ */
+public class ReadOnlyFileSystemDirectory extends Directory {
+
+ private final FileSystem fs;
+ private final Path directory;
+ private final int ioFileBufferSize;
+
+  /**
+   * Constructor
+   *
+   * @param fs        the underlying Hadoop {@link FileSystem}
+   * @param directory the path of the index directory on that file system
+   * @param create    if true, (re)create the directory and clear any existing index files
+   * @param conf      Hadoop configuration, consulted for the io.file.buffer.size setting
+   * @throws IOException if the path does not exist or is not a directory
+   */
+ public ReadOnlyFileSystemDirectory(FileSystem fs, Path directory, boolean create,
+ Configuration conf) throws IOException {
+
+ this.fs = fs;
+ this.directory = directory;
+ this.ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
+
+ if (create) {
+ create();
+ }
+
+ boolean isDir = false;
+ try {
+ FileStatus status = fs.getFileStatus(directory);
+ if (status != null) {
+ isDir = status.isDir();
+ }
+ } catch (IOException e) {
+ // file does not exist, isDir already set to false
+ }
+ if (!isDir) {
+ throw new IOException(directory + " is not a directory");
+ }
+ }
+
+
+ private void create() throws IOException {
+ if (!fs.exists(directory)) {
+ fs.mkdirs(directory);
+ }
+
+ boolean isDir = false;
+ try {
+ FileStatus status = fs.getFileStatus(directory);
+ if (status != null) {
+ isDir = status.isDir();
+ }
+ } catch (IOException e) {
+ // file does not exist, isDir already set to false
+ }
+ if (!isDir) {
+ throw new IOException(directory + " is not a directory");
+ }
+
+ // clear old index files
+ FileStatus[] fileStatus =
+ fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
+ for (int i = 0; i < fileStatus.length; i++) {
+ if (!fs.delete(fileStatus[i].getPath(), true)) {
+ throw new IOException("Cannot delete index file "
+ + fileStatus[i].getPath());
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#list()
+ */
+ public String[] list() throws IOException {
+ FileStatus[] fileStatus =
+ fs.listStatus(directory, LuceneIndexFileNameFilter.getFilter());
+ String[] result = new String[fileStatus.length];
+ for (int i = 0; i < fileStatus.length; i++) {
+ result[i] = fileStatus[i].getPath().getName();
+ }
+ return result;
+ }
+
+ @Override
+ public String[] listAll() throws IOException {
+ return list();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#fileExists(java.lang.String)
+ */
+ public boolean fileExists(String name) throws IOException {
+ return fs.exists(new Path(directory, name));
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#fileModified(java.lang.String)
+ */
+ public long fileModified(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#touchFile(java.lang.String)
+ */
+ public void touchFile(String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#fileLength(java.lang.String)
+ */
+ public long fileLength(String name) throws IOException {
+ return fs.getFileStatus(new Path(directory, name)).getLen();
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#deleteFile(java.lang.String)
+ */
+ public void deleteFile(String name) throws IOException {
+ if (!fs.delete(new Path(directory, name), true)) {
+ throw new IOException("Cannot delete index file " + name);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#renameFile(java.lang.String, java.lang.String)
+ */
+ public void renameFile(String from, String to) throws IOException {
+ fs.rename(new Path(directory, from), new Path(directory, to));
+ }
+
+ @Override
+ public IndexOutput createOutput(String name, IOContext context) throws IOException {
+ //nocommit What should we be doing with the IOContext here?
+ Path file = new Path(directory, name);
+ if (fs.exists(file) && !fs.delete(file, true)) {
+ // delete the existing one if applicable
+ throw new IOException("Cannot overwrite index file " + file);
+ }
+
+ return new FileSystemIndexOutput(file, ioFileBufferSize);
+ }
+
+ @Override
+ public void sync(Collection<String> names) throws IOException {
+
+ }
+
+ @Override
+ public IndexInput openInput(String name, IOContext context) throws IOException {
+ return new FileSystemIndexInput(new Path(directory, name), ioFileBufferSize);
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#makeLock(java.lang.String)
+ */
+ public Lock makeLock(final String name) {
+ return new Lock() {
+ public boolean obtain() {
+ return true;
+ }
+
+ public void release() {
+ }
+
+ public boolean isLocked() {
+ throw new UnsupportedOperationException();
+ }
+
+ public String toString() {
+ return "Lock@" + new Path(directory, name);
+ }
+ };
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.lucene.store.Directory#close()
+ */
+ public void close() throws IOException {
+ // do not close the file system
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ return this.getClass().getName() + "@" + directory;
+ }
+
+ private class FileSystemIndexInput extends BufferedIndexInput {
+
+ // shared by clones
+ private class Descriptor {
+ public final FSDataInputStream in;
+ public long position; // cache of in.getPos()
+
+ public Descriptor(Path file, int ioFileBufferSize) throws IOException {
+ this.in = fs.open(file, ioFileBufferSize);
+ }
+ }
+
+ private final Path filePath; // for debugging
+ private final Descriptor descriptor;
+ private final long length;
+ private boolean isOpen;
+ private boolean isClone;
+
+ public FileSystemIndexInput(Path path, int ioFileBufferSize)
+ throws IOException {
+ super("FSII_" + path.getName(), ioFileBufferSize);
+ filePath = path;
+ descriptor = new Descriptor(path, ioFileBufferSize);
+ length = fs.getFileStatus(path).getLen();
+ isOpen = true;
+ }
+
+ protected void readInternal(byte[] b, int offset, int len)
+ throws IOException {
+ long position = getFilePointer();
+ if (position != descriptor.position) {
+ descriptor.in.seek(position);
+ descriptor.position = position;
+ }
+ int total = 0;
+ do {
+ int i = descriptor.in.read(b, offset + total, len - total);
+ if (i == -1) {
+ throw new IOException("Read past EOF");
+ }
+ descriptor.position += i;
+ total += i;
+ } while (total < len);
+ }
+
+ public void close() throws IOException {
+ if (!isClone) {
+ if (isOpen) {
+ descriptor.in.close();
+ isOpen = false;
+ } else {
+ throw new IOException("Index file " + filePath + " already closed");
+ }
+ }
+ }
+
+ protected void seekInternal(long position) {
+ // handled in readInternal()
+ }
+
+ public long length() {
+ return length;
+ }
+
+ protected void finalize() throws IOException {
+ if (!isClone && isOpen) {
+ close(); // close the file
+ }
+ }
+
+ public BufferedIndexInput clone() {
+ FileSystemIndexInput clone = (FileSystemIndexInput) super.clone();
+ clone.isClone = true;
+ return clone;
+ }
+ }
+
+ private class FileSystemIndexOutput extends BufferedIndexOutput {
+
+ private final Path filePath; // for debugging
+ private final FSDataOutputStream out;
+ private boolean isOpen;
+
+ public FileSystemIndexOutput(Path path, int ioFileBufferSize)
+ throws IOException {
+ filePath = path;
+ // overwrite is true by default
+ out = fs.create(path, true, ioFileBufferSize);
+ isOpen = true;
+ }
+
+ public void flushBuffer(byte[] b, int offset, int size) throws IOException {
+ out.write(b, offset, size);
+ }
+
+ public void close() throws IOException {
+ if (isOpen) {
+ super.close();
+ out.close();
+ isOpen = false;
+ } else {
+ throw new IOException("Index file " + filePath + " already closed");
+ }
+ }
+
+ public void seek(long pos) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ public long length() throws IOException {
+ return out.getPos();
+ }
+
+ protected void finalize() throws IOException {
+ if (isOpen) {
+ close(); // close the file
+ }
+ }
+ }
+
+}
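A hedged usage sketch (editorial) that mirrors what LuceneSegmentInputFormat.getSplits does with this Directory; the index path is hypothetical.

    package org.apache.mahout.text;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.lucene.index.SegmentInfos;

    import java.io.IOException;

    class DirectorySketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // create == false: open an existing index rather than clearing it.
        ReadOnlyFileSystemDirectory directory =
            new ReadOnlyFileSystemDirectory(FileSystem.get(conf), new Path("index"), false, conf);

        SegmentInfos segmentInfos = new SegmentInfos();
        segmentInfos.read(directory);  // reads the current segments_N file through the Directory
        System.out.println("segments: " + segmentInfos.size());
      }
    }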
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorage.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.DocumentStoredFieldVisitor;
+import org.apache.lucene.index.AtomicReaderContext;
+import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.SlowCompositeReaderWrapper;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Scorer;
+import org.apache.lucene.search.Weight;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FSDirectory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
+
+/**
+ * Generates a sequence file from a Lucene index with a specified id field as the key and a content field as the value.
+ * Configure this class with a {@link LuceneStorageConfiguration} bean.
+ */
+public class SequenceFilesFromLuceneStorage {
+
+ public static final String SEPARATOR_FIELDS = " ";
+
+ private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromLuceneStorage.class);
+
+ /**
+   * Generates sequence files from a Lucene index via the given {@link LuceneStorageConfiguration}
+ *
+ * @param lucene2seqConf configuration bean
+ * @throws java.io.IOException if index cannot be opened or sequence file could not be written
+ */
+ public void run(LuceneStorageConfiguration lucene2seqConf) throws IOException {
+ List<Path> indexPaths = lucene2seqConf.getIndexPaths();
+
+ for (Path indexPath : indexPaths) {
+ Directory directory = FSDirectory.open(new File(indexPath.toString()));
+ IndexReader reader = DirectoryReader.open(directory);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Configuration configuration = lucene2seqConf.getConfiguration();
+ FileSystem fileSystem = FileSystem.get(configuration);
+ Path sequenceFilePath = new Path(lucene2seqConf.getSequenceFilesOutputPath(), indexPath);
+ SequenceFile.Writer sequenceFileWriter = new SequenceFile.Writer(fileSystem, configuration, sequenceFilePath, Text.class, Text.class);
+
+ Text key = new Text();
+ Text value = new Text();
+
+ Weight weight = lucene2seqConf.getQuery().createWeight(searcher);
+ //TODO: as the name implies, this is slow, but this is sequential anyway, so not a big deal. Better perf. would be by looping on the segments
+ AtomicReaderContext context = SlowCompositeReaderWrapper.wrap(reader).getContext();
+
+ Scorer scorer = weight.scorer(context, true, false, null);
+
+ if (scorer != null) {
+ int processedDocs = 0;
+ int docId;
+
+ while ((docId = scorer.nextDoc()) != NO_MORE_DOCS && processedDocs < lucene2seqConf.getMaxHits()) {
+ DocumentStoredFieldVisitor storedFieldVisitor = lucene2seqConf.getStoredFieldVisitor();
+ reader.document(docId, storedFieldVisitor);
+ Document doc = storedFieldVisitor.getDocument();
+ String idValue = doc.get(lucene2seqConf.getIdField());
+
+ StringBuilder fieldValueBuilder = new StringBuilder();
+ List<String> fields = lucene2seqConf.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ String field = fields.get(i);
+ String fieldValue = doc.get(field);
+ if (isNotBlank(fieldValue)) {
+ fieldValueBuilder.append(fieldValue);
+ if (i != fields.size() - 1) {
+ fieldValueBuilder.append(SEPARATOR_FIELDS);
+ }
+ }
+ }
+
+ if (isBlank(idValue) || isBlank(fieldValueBuilder.toString())) {
+ continue;
+ }
+
+ key.set(idValue);
+ value.set(fieldValueBuilder.toString());
+
+ sequenceFileWriter.append(key, value);
+
+ processedDocs++;
+ }
+
+ log.info("Wrote " + processedDocs + " documents in " + sequenceFilePath.toUri());
+ } else {
+ Closeables.close(sequenceFileWriter, true);
+ directory.close();
+ //searcher.close();
+ reader.close();
+ throw new RuntimeException("Could not write sequence files. Could not create scorer");
+ }
+
+ Closeables.close(sequenceFileWriter, true);
+ directory.close();
+ //searcher.close();
+ reader.close();
+ }
+ }
+}
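End to end, the sequential path can be exercised roughly like this (editorial sketch; storageConf is assumed to be a fully populated LuceneStorageConfiguration such as the one built above).

    package org.apache.mahout.text;

    import org.apache.hadoop.io.Text;
    import org.apache.mahout.common.Pair;

    import java.io.IOException;
    import java.util.Iterator;

    class SequentialRunSketch {
      static void convert(LuceneStorageConfiguration storageConf) throws IOException {
        // Convert the index (or indexes) into sequence files in one pass.
        new SequenceFilesFromLuceneStorage().run(storageConf);

        // Inspect the output: keys are id field values, values are the concatenated fields.
        Iterator<Pair<Text, Text>> iterator = storageConf.getSequenceFileIterator();
        while (iterator.hasNext()) {
          Pair<Text, Text> record = iterator.next();
          System.out.println(record.getFirst() + " => " + record.getSecond());
        }
      }
    }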
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriver.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.lucene.queryparser.classic.QueryParser;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+/**
+ * Driver class for the lucene2seq program. Converts the text contents of stored fields of a Lucene index into a
+ * Hadoop SequenceFile. The key of the sequence file is the value of the specified id field and the value is the
+ * concatenated text of the specified stored field(s).
+ */
+public class SequenceFilesFromLuceneStorageDriver extends AbstractJob {
+
+ static final String OPTION_LUCENE_DIRECTORY = "dir";
+ static final String OPTION_ID_FIELD = "idField";
+ static final String OPTION_FIELD = "fields";
+ static final String OPTION_QUERY = "query";
+ static final String OPTION_MAX_HITS = "maxHits";
+
+ static final Query DEFAULT_QUERY = new MatchAllDocsQuery();
+ static final int DEFAULT_MAX_HITS = Integer.MAX_VALUE;
+
+ static final String SEPARATOR_FIELDS = ",";
+ static final String QUERY_DELIMITER = "'";
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new SequenceFilesFromLuceneStorageDriver(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ addOutputOption();
+
+ addOption(OPTION_LUCENE_DIRECTORY, "d", "Lucene directory / directories. Comma separated.", true);
+ addOption(OPTION_ID_FIELD, "i", "The field in the index containing the id", true);
+ addOption(OPTION_FIELD, "f", "The stored field(s) in the index containing text", true);
+
+ addOption(OPTION_QUERY, "q", "(Optional) Lucene query. Defaults to " + DEFAULT_QUERY.getClass().getSimpleName());
+ addOption(OPTION_MAX_HITS, "n", "(Optional) Max hits. Defaults to " + DEFAULT_MAX_HITS);
+ addOption(DefaultOptionCreator.methodOption().create());
+
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+
+ Configuration configuration = getConf();
+ if (configuration == null) {
+ configuration = new Configuration();
+ }
+
+ String[] paths = getOption(OPTION_LUCENE_DIRECTORY).split(",");
+ List<Path> indexPaths = new ArrayList<Path>();
+ for (String path : paths) {
+ indexPaths.add(new Path(path));
+ }
+
+ Path sequenceFilesOutputPath = new Path((getOption(DefaultOptionCreator.OUTPUT_OPTION)));
+
+ String idField = getOption(OPTION_ID_FIELD);
+ String fields = getOption(OPTION_FIELD);
+
+ LuceneStorageConfiguration lucene2SeqConf = newLucene2SeqConfiguration(configuration,
+ indexPaths,
+ sequenceFilesOutputPath,
+ idField,
+ asList(fields.split(SEPARATOR_FIELDS)));
+
+ Query query = DEFAULT_QUERY;
+ if (hasOption(OPTION_QUERY)) {
+ try {
+ String queryString = getOption(OPTION_QUERY).replaceAll(QUERY_DELIMITER, "");
+        QueryParser queryParser = new QueryParser(Version.LUCENE_43, queryString, new StandardAnalyzer(Version.LUCENE_43));
+ query = queryParser.parse(queryString);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+ lucene2SeqConf.setQuery(query);
+
+ int maxHits = DEFAULT_MAX_HITS;
+ if (hasOption(OPTION_MAX_HITS)) {
+ String maxHitsString = getOption(OPTION_MAX_HITS);
+ maxHits = Integer.valueOf(maxHitsString);
+ }
+ lucene2SeqConf.setMaxHits(maxHits);
+
+ if (hasOption(DefaultOptionCreator.METHOD_OPTION) && getOption(DefaultOptionCreator.METHOD_OPTION).equals("sequential")) {
+ new SequenceFilesFromLuceneStorage().run(lucene2SeqConf);
+ } else {
+ new SequenceFilesFromLuceneStorageMRJob().run(lucene2SeqConf);
+ }
+ return 0;
+ }
+
+ public LuceneStorageConfiguration newLucene2SeqConfiguration(Configuration configuration,
+ List<Path> indexPaths,
+ Path sequenceFilesOutputPath,
+ String idField,
+ List<String> fields) {
+ return new LuceneStorageConfiguration(
+ configuration,
+ indexPaths,
+ sequenceFilesOutputPath,
+ idField,
+ fields);
+ }
+}
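For reference, a hedged sketch of invoking the driver programmatically. The flag spellings follow the option names registered in run() above, and the paths and field names are made up.

    package org.apache.mahout.text;

    import org.apache.hadoop.util.ToolRunner;

    class DriverInvocationSketch {
      public static void main(String[] args) throws Exception {
        ToolRunner.run(new SequenceFilesFromLuceneStorageDriver(), new String[]{
            "--dir", "index",                 // Lucene directory (comma separated for several)
            "--output", "sequenceFiles",      // sequence file output path
            "--idField", "id",                // field used for the key
            "--fields", "title,description",  // stored field(s) used for the value
            "--method", "sequential"          // or "mapreduce" (the default branch)
        });
      }
    }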
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJob.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,55 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Generates a sequence file from a Lucene index via MapReduce. Uses a specified id field as the key and a content field as the value.
+ * Configure this class with a {@link LuceneStorageConfiguration} bean.
+ */
+public class SequenceFilesFromLuceneStorageMRJob {
+
+ public void run(LuceneStorageConfiguration lucene2seqConf) {
+ try {
+ Configuration configuration = lucene2seqConf.serialize();
+
+ Job job = new Job(configuration, "LuceneIndexToSequenceFiles: " + lucene2seqConf.getIndexPaths() + " -> M/R -> " + lucene2seqConf.getSequenceFilesOutputPath());
+
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ job.setMapperClass(SequenceFilesFromLuceneStorageMapper.class);
+
+ job.setInputFormatClass(LuceneSegmentInputFormat.class);
+
+ for (Path indexPath : lucene2seqConf.getIndexPaths()) {
+ FileInputFormat.addInputPath(job, indexPath);
+ }
+
+ FileOutputFormat.setOutputPath(job, lucene2seqConf.getSequenceFilesOutputPath());
+
+ job.setJarByClass(SequenceFilesFromLuceneStorageMRJob.class);
+ job.setNumReduceTasks(0);
+
+ job.waitForCompletion(true);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMapper.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,85 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.SegmentInfoPerCommit;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.store.IOContext;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+/**
+ * Maps document IDs to key-value pairs, with the value of the ID field as the key and the concatenated stored
+ * field(s) as the value.
+ */
+public class SequenceFilesFromLuceneStorageMapper extends Mapper<Text, NullWritable, Text, Text> {
+
+ public static final String SEPARATOR_FIELDS = " ";
+ public static final int USE_TERM_INFOS = 1;
+
+ private LuceneStorageConfiguration lucene2SeqConfiguration;
+ private SegmentReader segmentReader;
+
+ private Text idKey;
+ private Text fieldValue;
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ Configuration configuration = context.getConfiguration();
+
+ lucene2SeqConfiguration = new LuceneStorageConfiguration(configuration);
+
+ LuceneSegmentInputSplit inputSplit = (LuceneSegmentInputSplit) context.getInputSplit();
+
+ SegmentInfoPerCommit segmentInfo = inputSplit.getSegment(configuration);
+ segmentReader = new SegmentReader(segmentInfo, USE_TERM_INFOS, IOContext.READ);//nocommit: Should we use IOContext.READONCE?
+
+ idKey = new Text();
+ fieldValue = new Text();
+ }
+
+ @Override
+ protected void map(Text key, NullWritable text, Context context) throws IOException, InterruptedException {
+ int docId = Integer.valueOf(key.toString());
+ Document document = segmentReader.document(docId);
+
+ String idString = document.get(lucene2SeqConfiguration.getIdField());
+
+ StringBuilder valueBuilder = new StringBuilder();
+ List<String> fields = lucene2SeqConfiguration.getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ String field = fields.get(i);
+ String fieldValue = document.get(field);
+ if (isNotBlank(fieldValue)) {
+ valueBuilder.append(fieldValue);
+ if (i != fields.size() - 1) {
+ valueBuilder.append(SEPARATOR_FIELDS);
+ }
+ }
+ }
+
+ idKey.set(nullSafe(idString));
+ fieldValue.set(nullSafe(valueBuilder.toString()));
+
+ context.write(idKey, fieldValue);
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ segmentReader.close();
+ }
+
+ private String nullSafe(String value) {
+ if (value == null) {
+ return "";
+ } else {
+ return value;
+ }
+ }
+}
\ No newline at end of file
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/AbstractLuceneStorageTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,60 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.MultipleFieldsDocument;
+import org.apache.mahout.text.doc.NumericFieldDocument;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Abstract test for working with Lucene storage.
+ */
+public abstract class AbstractLuceneStorageTest {
+
+ private Path indexPath = new Path("index");
+
+ protected void commitDocuments(SingleFieldDocument... documents) throws IOException {
+ IndexWriter indexWriter = new IndexWriter(getDirectory(), new IndexWriterConfig(Version.LUCENE_35, new DefaultAnalyzer()));
+
+ for (SingleFieldDocument singleFieldDocument : documents) {
+ indexWriter.addDocument(singleFieldDocument.asLuceneDocument());
+ }
+
+ indexWriter.commit();
+ indexWriter.close();
+ }
+
+ protected void assertSimpleDocumentEquals(SingleFieldDocument expected, Pair<Text, Text> actual) {
+ assertEquals(expected.getId(), actual.getFirst().toString());
+ assertEquals(expected.getField(), actual.getSecond().toString());
+ }
+
+ protected void assertMultipleFieldsDocumentEquals(MultipleFieldsDocument expected, Pair<Text, Text> actual) {
+ assertEquals(expected.getId(), actual.getFirst().toString());
+ assertEquals(expected.getField() + " " + expected.getField1() + " " + expected.getField2(), actual.getSecond().toString());
+ }
+
+ protected void assertNumericFieldEquals(NumericFieldDocument expected, Pair<Text, Text> actual) {
+ assertEquals(expected.getId(), actual.getFirst().toString());
+ assertEquals(expected.getField() + " " + expected.getNumericField(), actual.getSecond().toString());
+ }
+
+ protected FSDirectory getDirectory() throws IOException {
+ return FSDirectory.open(new File(indexPath.toString()));
+ }
+
+ protected Path getIndexPath() {
+ return indexPath;
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputFormatTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,72 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static junit.framework.Assert.assertEquals;
+
+public class LuceneSegmentInputFormatTest {
+
+ private LuceneSegmentInputFormat inputFormat;
+ private JobContext jobContext;
+ private Path indexPath;
+ private Configuration conf;
+ private FSDirectory directory;
+
+ @Before
+ public void before() throws IOException {
+ inputFormat = new LuceneSegmentInputFormat();
+ indexPath = new Path("index");
+
+ LuceneStorageConfiguration lucene2SeqConf = new LuceneStorageConfiguration(new Configuration(), asList(indexPath), new Path("output"), "id", asList("field"));
+ conf = lucene2SeqConf.serialize();
+
+ jobContext = new JobContext(conf, new JobID());
+ directory = FSDirectory.open(new File(indexPath.toString()));
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(conf, indexPath);
+ }
+
+ @Test
+ public void testGetSplits() throws IOException, InterruptedException {
+ SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+ SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+ SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+ List<SingleFieldDocument> documents = asList(doc1, doc2, doc3);
+
+ for (SingleFieldDocument singleFieldDocument : documents) {
+ commitDocument(singleFieldDocument);
+ }
+
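+    // Each commit above produced its own segment, so the input format should yield one split per segment.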
+ List<LuceneSegmentInputSplit> splits = inputFormat.getSplits(jobContext);
+ assertEquals(3, splits.size());
+ }
+
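+  /** Commits a single document in its own IndexWriter session, creating a separate segment per call. */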
+ private void commitDocument(SingleFieldDocument doc) throws IOException {
+    IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_43, new DefaultAnalyzer());
+    IndexWriter indexWriter = new IndexWriter(directory, writerConfig);
+ indexWriter.addDocument(doc.asLuceneDocument());
+ indexWriter.commit();
+ indexWriter.close();
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentInputSplitTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,89 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentReader;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+public class LuceneSegmentInputSplitTest {
+
+ private FSDirectory directory;
+ private Path indexPath;
+ private Configuration conf;
+
+ @Before
+ public void before() throws IOException {
+ indexPath = new Path("index");
+ directory = FSDirectory.open(new File(indexPath.toString()));
+ conf = new Configuration();
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(conf, indexPath);
+ }
+
+ @Test
+ public void testGetSegment() throws Exception {
+ SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+ SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+ SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+
+ List<SingleFieldDocument> docs = asList(doc1, doc2, doc3);
+ for (SingleFieldDocument doc : docs) {
+ addDocument(doc);
+ }
+
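+    // Lucene names segments sequentially (_0, _1, _2); each addDocument() call above committed its own segment.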
+ assertSegmentContainsOneDoc("_0");
+ assertSegmentContainsOneDoc("_1");
+ assertSegmentContainsOneDoc("_2");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetSegment_nonExistingSegment() throws Exception {
+ SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+ SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+ SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+
+ List<SingleFieldDocument> docs = asList(doc1, doc2, doc3);
+ for (SingleFieldDocument doc : docs) {
+ addDocument(doc);
+ }
+
+ LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(indexPath, "_3", 1000);
+ inputSplit.getSegment(conf);
+ }
+
+ private void assertSegmentContainsOneDoc(String segmentName) throws IOException {
+ LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(indexPath, segmentName, 1000);
+ SegmentInfo segment = inputSplit.getSegment(conf);
+ SegmentReader segmentReader = SegmentReader.get(true, segment, 1);
+ assertEquals(segmentName, segment.name);
+ assertEquals(1, segmentReader.numDocs());
+ }
+
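+  /** Adds and commits one document per call, so every document lands in its own segment. */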
+ private void addDocument(SingleFieldDocument doc) throws IOException {
+    IndexWriterConfig writerConfig = new IndexWriterConfig(Version.LUCENE_43, new DefaultAnalyzer());
+    IndexWriter indexWriter = new IndexWriter(directory, writerConfig);
+ indexWriter.addDocument(doc.asLuceneDocument());
+ indexWriter.commit();
+ indexWriter.close();
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneSegmentRecordReaderTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,72 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.index.SegmentInfos;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.util.Version;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.vectorizer.DefaultAnalyzer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+public class LuceneSegmentRecordReaderTest extends AbstractLuceneStorageTest {
+
+ private LuceneSegmentRecordReader recordReader;
+ private Configuration configuration;
+
+ @Before
+ public void before() throws IOException, InterruptedException {
+    LuceneStorageConfiguration lucene2SeqConf = new LuceneStorageConfiguration(
+        new Configuration(), asList(getIndexPath()), new Path("output"), "id", asList("field"));
+ configuration = lucene2SeqConf.serialize();
+
+ SingleFieldDocument doc1 = new SingleFieldDocument("1", "This is simple document 1");
+ SingleFieldDocument doc2 = new SingleFieldDocument("2", "This is simple document 2");
+ SingleFieldDocument doc3 = new SingleFieldDocument("3", "This is simple document 3");
+
+ commitDocuments(doc1, doc2, doc3);
+
+ SegmentInfos segmentInfos = new SegmentInfos();
+ segmentInfos.read(getDirectory());
+
+ SegmentInfo segmentInfo = segmentInfos.asList().get(0);
+ LuceneSegmentInputSplit inputSplit = new LuceneSegmentInputSplit(getIndexPath(), segmentInfo.name, segmentInfo.sizeInBytes(true));
+
+ TaskAttemptContext context = new TaskAttemptContext(configuration, new TaskAttemptID());
+
+ recordReader = new LuceneSegmentRecordReader();
+ recordReader.initialize(inputSplit, context);
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(configuration, getIndexPath());
+ }
+
+ @Test
+ public void testKey() throws Exception {
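+    // Keys are the segment-local Lucene document ids (0-based), not the stored "id" field values.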
+ recordReader.nextKeyValue();
+ assertEquals("0", recordReader.getCurrentKey().toString());
+ recordReader.nextKeyValue();
+ assertEquals("1", recordReader.getCurrentKey().toString());
+ recordReader.nextKeyValue();
+ assertEquals("2", recordReader.getCurrentKey().toString());
+ }
+
+ @Test
+ public void testGetCurrentValue() throws Exception {
+ assertEquals(NullWritable.get(), recordReader.getCurrentValue());
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/LuceneStorageConfigurationTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,32 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+
+public class LuceneStorageConfigurationTest {
+
+ @Test
+ public void testSerialization() throws Exception {
+ Configuration conf = new Configuration();
+ Path indexPath = new Path("indexPath");
+ Path outputPath = new Path("outputPath");
+    LuceneStorageConfiguration luceneStorageConf = new LuceneStorageConfiguration(
+        conf, asList(indexPath), outputPath, "id", asList("field"));
+
+ Configuration serializedConf = luceneStorageConf.serialize();
+
+ LuceneStorageConfiguration deserializedConf = new LuceneStorageConfiguration(serializedConf);
+
+ assertEquals(luceneStorageConf, deserializedConf);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSerialization_notSerialized() throws IOException {
+ new LuceneStorageConfiguration(new Configuration());
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageDriverTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.lucene.search.TermQuery;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SequenceFilesFromLuceneStorageDriverTest extends AbstractLuceneStorageTest {
+
+ private SequenceFilesFromLuceneStorageDriver driver;
+ private LuceneStorageConfiguration lucene2SeqConf;
+ private String idField;
+ private List<String> fields;
+ private Path seqFilesOutputPath;
+ private Configuration conf;
+
+ @Before
+ public void before() throws Exception {
+ conf = new Configuration();
+ conf.set("io.serializations", "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+ seqFilesOutputPath = new Path("seqfiles");
+ idField = "id";
+ fields = asList("field");
+
+ driver = new SequenceFilesFromLuceneStorageDriver() {
+ @Override
+      public LuceneStorageConfiguration newLucene2SeqConfiguration(Configuration configuration,
+          List<Path> indexPaths, Path seqPath, String idField, List<String> fields) {
+ lucene2SeqConf = new LuceneStorageConfiguration(configuration, indexPaths, seqPath, idField, fields);
+ return lucene2SeqConf;
+ }
+ };
+
+ commitDocuments(new SingleFieldDocument("1", "Mahout is cool"));
+ commitDocuments(new SingleFieldDocument("2", "Mahout is cool"));
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(conf, seqFilesOutputPath);
+ HadoopUtil.delete(conf, getIndexPath());
+ }
+
+ @Test
+ public void testNewLucene2SeqConfiguration() {
+ lucene2SeqConf = driver.newLucene2SeqConfiguration(conf,
+ asList(new Path(getIndexPath().toString())),
+ seqFilesOutputPath,
+ idField,
+ fields);
+
+ assertEquals(conf, lucene2SeqConf.getConfiguration());
+ assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+ assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+ assertEquals(idField, lucene2SeqConf.getIdField());
+ assertEquals(fields, lucene2SeqConf.getFields());
+ }
+
+ @Test
+ public void testRun() throws Exception {
+ String queryField = "queryfield";
+ String queryTerm = "queryterm";
+ String maxHits = "500";
+ String field1 = "field1";
+ String field2 = "field2";
+
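+    // -d: index dir, -o: output path, -i: id field, -f: content fields, -q: Lucene query, -n: max hits, -xm: execution method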
+ String[] args = new String[]{
+ "-d", getIndexPath().toString(),
+ "-o", seqFilesOutputPath.toString(),
+ "-i", idField,
+ "-f", field1 + "," + field2,
+ "-q", queryField + ":" + queryTerm,
+ "-n", maxHits,
+ "-xm", "sequential"
+ };
+
+ driver.setConf(conf);
+ driver.run(args);
+
+ assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+ assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+ assertEquals(idField, lucene2SeqConf.getIdField());
+ assertEquals(asList(field1, field2), lucene2SeqConf.getFields());
+
+ assertTrue(lucene2SeqConf.getQuery() instanceof TermQuery);
+ assertEquals(queryField, ((TermQuery) lucene2SeqConf.getQuery()).getTerm().field());
+ assertEquals(queryTerm, ((TermQuery) lucene2SeqConf.getQuery()).getTerm().text());
+    assertEquals(Integer.valueOf(maxHits), (Integer) lucene2SeqConf.getMaxHits());
+ }
+
+ @Test
+ public void testRun_optionalArguments() throws Exception {
+ String[] args = new String[]{
+ "-d", getIndexPath().toString(),
+ "-o", seqFilesOutputPath.toString(),
+ "-i", idField,
+ "-f", StringUtils.join(fields, SequenceFilesFromLuceneStorageDriver.SEPARATOR_FIELDS)
+ };
+
+ driver.setConf(conf);
+ driver.run(args);
+
+ assertEquals(asList(getIndexPath()), lucene2SeqConf.getIndexPaths());
+ assertEquals(seqFilesOutputPath, lucene2SeqConf.getSequenceFilesOutputPath());
+ assertEquals(idField, lucene2SeqConf.getIdField());
+ assertEquals(fields, lucene2SeqConf.getFields());
+ assertEquals(conf, lucene2SeqConf.getConfiguration());
+
+ assertEquals(SequenceFilesFromLuceneStorageDriver.DEFAULT_QUERY, lucene2SeqConf.getQuery());
+ assertEquals(SequenceFilesFromLuceneStorageDriver.DEFAULT_MAX_HITS, lucene2SeqConf.getMaxHits());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testRun_invalidQuery() throws Exception {
+ String[] args = new String[]{
+ "-d", getIndexPath().toString(),
+ "-o", seqFilesOutputPath.toString(),
+ "-i", idField,
+ "-f", StringUtils.join(fields, SequenceFilesFromLuceneStorageDriver.SEPARATOR_FIELDS),
+ "-q", "inva:lid:query"
+ };
+
+ driver.setConf(conf);
+ driver.run(args);
+ }
+
+ @Test
+ public void testHelp() throws Exception {
+ driver = new SequenceFilesFromLuceneStorageDriver();
+ driver.run(new String[]{"--help"});
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageMRJobTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,64 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static java.util.Arrays.asList;
+
+public class SequenceFilesFromLuceneStorageMRJobTest extends AbstractLuceneStorageTest {
+
+ private SequenceFilesFromLuceneStorageMRJob lucene2seq;
+ private LuceneStorageConfiguration lucene2SeqConf;
+ private SingleFieldDocument document1;
+ private SingleFieldDocument document2;
+ private SingleFieldDocument document3;
+ private SingleFieldDocument document4;
+
+ @Before
+ public void before() {
+ lucene2seq = new SequenceFilesFromLuceneStorageMRJob();
+
+ Configuration configuration = new Configuration();
+ Path seqOutputPath = new Path("seqOutputPath");
+
+    lucene2SeqConf = new LuceneStorageConfiguration(configuration, asList(getIndexPath()),
+        seqOutputPath, SingleFieldDocument.ID_FIELD, asList(SingleFieldDocument.FIELD));
+
+ document1 = new SingleFieldDocument("1", "This is test document 1");
+ document2 = new SingleFieldDocument("2", "This is test document 2");
+ document3 = new SingleFieldDocument("3", "This is test document 3");
+ document4 = new SingleFieldDocument("4", "This is test document 4");
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getSequenceFilesOutputPath());
+ HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getIndexPaths());
+ }
+
+ @Test
+ public void testRun() throws IOException {
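+    // Commit four documents, run the MapReduce job, and read the results back through the sequence-file iterator.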
+ commitDocuments(document1, document2, document3, document4);
+
+ lucene2seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(document1, iterator.next());
+ assertSimpleDocumentEquals(document2, iterator.next());
+ assertSimpleDocumentEquals(document3, iterator.next());
+ assertSimpleDocumentEquals(document4, iterator.next());
+ }
+}
Added: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java?rev=1490329&view=auto
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java (added)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromLuceneStorageTest.java Thu Jun 6 15:51:06 2013
@@ -0,0 +1,197 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.text.doc.MultipleFieldsDocument;
+import org.apache.mahout.text.doc.NumericFieldDocument;
+import org.apache.mahout.text.doc.SingleFieldDocument;
+import org.apache.mahout.text.doc.UnstoredFieldsDocument;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertFalse;
+
+public class SequenceFilesFromLuceneStorageTest extends AbstractLuceneStorageTest {
+
+ private SequenceFilesFromLuceneStorage lucene2Seq;
+ private LuceneStorageConfiguration lucene2SeqConf;
+
+ private SingleFieldDocument document1;
+ private SingleFieldDocument document2;
+ private SingleFieldDocument document3;
+ private Path seqFilesOutputPath;
+ private Configuration configuration;
+
+ @SuppressWarnings("unchecked")
+ @Before
+ public void before() throws IOException {
+ configuration = new Configuration();
+ seqFilesOutputPath = new Path("seqfiles");
+
+ lucene2Seq = new SequenceFilesFromLuceneStorage();
+ lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+ asList(getIndexPath()),
+ seqFilesOutputPath,
+ SingleFieldDocument.ID_FIELD,
+ asList(SingleFieldDocument.FIELD));
+
+ document1 = new SingleFieldDocument("1", "This is test document 1");
+ document2 = new SingleFieldDocument("2", "This is test document 2");
+ document3 = new SingleFieldDocument("3", "This is test document 3");
+ }
+
+ @After
+ public void after() throws IOException {
+ HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getSequenceFilesOutputPath());
+ HadoopUtil.delete(lucene2SeqConf.getConfiguration(), lucene2SeqConf.getIndexPaths());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun() throws Exception {
+ commitDocuments(document1, document2, document3);
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(document1, iterator.next());
+ assertSimpleDocumentEquals(document2, iterator.next());
+ assertSimpleDocumentEquals(document3, iterator.next());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun_skipEmptyIdFieldDocs() throws IOException {
+ commitDocuments(document1, new SingleFieldDocument("", "This is a test document with no id"), document2);
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(document1, iterator.next());
+ assertSimpleDocumentEquals(document2, iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun_skipEmptyFieldDocs() throws IOException {
+ commitDocuments(document1, new SingleFieldDocument("4", ""), document2);
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(document1, iterator.next());
+ assertSimpleDocumentEquals(document2, iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun_skipUnstoredFields() throws IOException {
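+    // Unstored fields cannot be retrieved from the index, so their (null) values must not appear in the output.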
+ commitDocuments(new UnstoredFieldsDocument("5", "This is test document 5"));
+
+ lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+ asList(getIndexPath()),
+ seqFilesOutputPath,
+ SingleFieldDocument.ID_FIELD,
+ asList(UnstoredFieldsDocument.FIELD, UnstoredFieldsDocument.UNSTORED_FIELD));
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertFalse(iterator.next().getSecond().toString().contains("null"));
+ assertFalse(iterator.hasNext());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun_maxHits() throws IOException {
+ commitDocuments(document1, document2, document3, new SingleFieldDocument("4", "This is test document 4"));
+
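+    // Capping max hits at 3 means the fourth committed document must not be exported.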
+ lucene2SeqConf.setMaxHits(3);
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(document1, iterator.next());
+ assertSimpleDocumentEquals(document2, iterator.next());
+ assertSimpleDocumentEquals(document3, iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testRun_query() throws IOException {
+ commitDocuments(document1, document2, document3, new SingleFieldDocument("4", "Mahout is cool"));
+
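+    // Restricting the conversion to a term query should export only the single matching document.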
+ Query query = new TermQuery(new Term(lucene2SeqConf.getFields().get(0), "mahout"));
+
+ lucene2SeqConf.setQuery(query);
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertSimpleDocumentEquals(new SingleFieldDocument("4", "Mahout is cool"), iterator.next());
+ assertFalse(iterator.hasNext());
+ }
+
+ @Test
+ public void testRun_multipleFields() throws IOException {
+ lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+ asList(getIndexPath()),
+ seqFilesOutputPath,
+ SingleFieldDocument.ID_FIELD,
+ asList(MultipleFieldsDocument.FIELD, MultipleFieldsDocument.FIELD1, MultipleFieldsDocument.FIELD2));
+
+ MultipleFieldsDocument multipleFieldsDocument1 = new MultipleFieldsDocument("1", "This is field 1-1", "This is field 1-2", "This is field 1-3");
+ MultipleFieldsDocument multipleFieldsDocument2 = new MultipleFieldsDocument("2", "This is field 2-1", "This is field 2-2", "This is field 2-3");
+ MultipleFieldsDocument multipleFieldsDocument3 = new MultipleFieldsDocument("3", "This is field 3-1", "This is field 3-2", "This is field 3-3");
+ commitDocuments(multipleFieldsDocument1, multipleFieldsDocument2, multipleFieldsDocument3);
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertMultipleFieldsDocumentEquals(multipleFieldsDocument1, iterator.next());
+ assertMultipleFieldsDocumentEquals(multipleFieldsDocument2, iterator.next());
+ assertMultipleFieldsDocumentEquals(multipleFieldsDocument3, iterator.next());
+ }
+
+ @Test
+ public void testRun_numericField() throws IOException {
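+    // Numeric fields are stored alongside the text field and should round-trip as their string representation.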
+ lucene2SeqConf = new LuceneStorageConfiguration(configuration,
+ asList(getIndexPath()),
+ seqFilesOutputPath,
+ SingleFieldDocument.ID_FIELD,
+ asList(NumericFieldDocument.FIELD, NumericFieldDocument.NUMERIC_FIELD));
+
+ NumericFieldDocument doc1 = new NumericFieldDocument("1", "This is field 1", 100);
+ NumericFieldDocument doc2 = new NumericFieldDocument("2", "This is field 2", 200);
+ NumericFieldDocument doc3 = new NumericFieldDocument("3", "This is field 3", 300);
+
+ commitDocuments(doc1, doc2, doc3);
+
+ lucene2Seq.run(lucene2SeqConf);
+
+ Iterator<Pair<Text, Text>> iterator = lucene2SeqConf.getSequenceFileIterator();
+
+ assertNumericFieldEquals(doc1, iterator.next());
+ assertNumericFieldEquals(doc2, iterator.next());
+ assertNumericFieldEquals(doc3, iterator.next());
+ }
+}
Modified: mahout/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/pom.xml?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/pom.xml (original)
+++ mahout/trunk/pom.xml Thu Jun 6 15:51:06 2013
@@ -100,7 +100,7 @@
<maven.clover.multiproject>true</maven.clover.multiproject>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hadoop.version>1.1.2</hadoop.version>
- <lucene.version>4.2.1</lucene.version>
+ <lucene.version>4.3.0</lucene.version>
</properties>
<issueManagement>
<system>Jira</system>
Modified: mahout/trunk/src/conf/driver.classes.default.props
URL: http://svn.apache.org/viewvc/mahout/trunk/src/conf/driver.classes.default.props?rev=1490329&r1=1490328&r2=1490329&view=diff
==============================================================================
--- mahout/trunk/src/conf/driver.classes.default.props (original)
+++ mahout/trunk/src/conf/driver.classes.default.props Thu Jun 6 15:51:06 2013
@@ -13,6 +13,7 @@ org.apache.mahout.vectorizer.SparseVecto
org.apache.mahout.vectorizer.EncodedVectorsFromSequenceFiles = seq2encoded: Encoded Sparse Vector generation from Text sequence files
org.apache.mahout.text.WikipediaToSequenceFile = seqwiki : Wikipedia xml dump to sequence file
org.apache.mahout.text.SequenceFilesFromMailArchives = seqmailarchives : Creates SequenceFile from a directory containing gzipped mail archives
+org.apache.mahout.text.SequenceFilesFromLuceneStorageDriver = lucene2seq : Generate Text SequenceFiles from a Lucene index
#Math
org.apache.mahout.math.hadoop.TransposeJob = transpose : Take the transpose of a matrix