Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:53 UTC
svn commit: r1398581 [8/9] - in
/hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ bin/ conf/
hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-...
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Tue Oct 16 00:02:55 2012
@@ -1,256 +1,256 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
-import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-/**
- * This class provides getters and setters for a number of parameters. Most
- * of the parameters relate to the index update; the rest come from the
- * existing Map/Reduce parameters.
- */
-public class IndexUpdateConfiguration {
- final Configuration conf;
-
- /**
- * Constructor
- * @param conf the underlying configuration object
- */
- public IndexUpdateConfiguration(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Get the underlying configuration object.
- * @return the configuration
- */
- public Configuration getConfiguration() {
- return conf;
- }
-
- //
- // existing map/reduce properties
- //
- // public int getIOFileBufferSize() {
- // return getInt("io.file.buffer.size", 4096);
- // }
-
- /**
- * Get the IO sort space in MB.
- * @return the IO sort space in MB
- */
- public int getIOSortMB() {
- return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
- }
-
- /**
- * Set the IO sort space in MB.
- * @param mb the IO sort space in MB
- */
- public void setIOSortMB(int mb) {
- conf.setInt(MRJobConfig.IO_SORT_MB, mb);
- }
-
- /**
- * Get the Map/Reduce temp directory.
- * @return the Map/Reduce temp directory
- */
- public String getMapredTempDir() {
- return conf.get(MRConfig.TEMP_DIR);
- }
-
- //
- // properties for index update
- //
- /**
- * Get the distribution policy class.
- * @return the distribution policy class
- */
- public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
- return conf.getClass("sea.distribution.policy",
- HashingDistributionPolicy.class, IDistributionPolicy.class);
- }
-
- /**
- * Set the distribution policy class.
- * @param theClass the distribution policy class
- */
- public void setDistributionPolicyClass(
- Class<? extends IDistributionPolicy> theClass) {
- conf.setClass("sea.distribution.policy", theClass,
- IDistributionPolicy.class);
- }
-
- /**
- * Get the analyzer class.
- * @return the analyzer class
- */
- public Class<? extends Analyzer> getDocumentAnalyzerClass() {
- return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
- Analyzer.class);
- }
-
- /**
- * Set the analyzer class.
- * @param theClass the analyzer class
- */
- public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
- conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
- }
-
- /**
- * Get the index input format class.
- * @return the index input format class
- */
- public Class<? extends InputFormat> getIndexInputFormatClass() {
- return conf.getClass("sea.input.format", LineDocInputFormat.class,
- InputFormat.class);
- }
-
- /**
- * Set the index input format class.
- * @param theClass the index input format class
- */
- public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
- conf.setClass("sea.input.format", theClass, InputFormat.class);
- }
-
- /**
- * Get the index updater class.
- * @return the index updater class
- */
- public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
- return conf.getClass("sea.index.updater", IndexUpdater.class,
- IIndexUpdater.class);
- }
-
- /**
- * Set the index updater class.
- * @param theClass the index updater class
- */
- public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
- conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
- }
-
- /**
- * Get the local analysis class.
- * @return the local analysis class
- */
- public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
- return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
- ILocalAnalysis.class);
- }
-
- /**
- * Set the local analysis class.
- * @param theClass the local analysis class
- */
- public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
- conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
- }
-
- /**
- * Get the string representation of the index shards.
- * @return the string representation of the index shards
- */
- public String getIndexShards() {
- return conf.get("sea.index.shards");
- }
-
- /**
- * Set the string representation of the index shards.
- * @param shards the string representation of the index shards
- */
- public void setIndexShards(String shards) {
- conf.set("sea.index.shards", shards);
- }
-
- /**
- * Get the max field length for a Lucene instance.
- * @return the max field length for a Lucene instance
- */
- public int getIndexMaxFieldLength() {
- return conf.getInt("sea.max.field.length", -1);
- }
-
- /**
- * Set the max field length for a Lucene instance.
- * @param maxFieldLength the max field length for a Lucene instance
- */
- public void setIndexMaxFieldLength(int maxFieldLength) {
- conf.setInt("sea.max.field.length", maxFieldLength);
- }
-
- /**
- * Get the max number of segments for a Lucene instance.
- * @return the max number of segments for a Lucene instance
- */
- public int getIndexMaxNumSegments() {
- return conf.getInt("sea.max.num.segments", -1);
- }
-
- /**
- * Set the max number of segments for a Lucene instance.
- * @param maxNumSegments the max number of segments for a Lucene instance
- */
- public void setIndexMaxNumSegments(int maxNumSegments) {
- conf.setInt("sea.max.num.segments", maxNumSegments);
- }
-
- /**
- * Check whether to use the compound file format for a Lucene instance.
- * @return true if using the compound file format for a Lucene instance
- */
- public boolean getIndexUseCompoundFile() {
- return conf.getBoolean("sea.use.compound.file", false);
- }
-
- /**
- * Set whether to use the compound file format for a Lucene instance.
- * @param useCompoundFile whether to use the compound file format
- */
- public void setIndexUseCompoundFile(boolean useCompoundFile) {
- conf.setBoolean("sea.use.compound.file", useCompoundFile);
- }
-
- /**
- * Get the max ram index size in bytes. The default is 50M.
- * @return the max ram index size in bytes
- */
- public long getMaxRAMSizeInBytes() {
- return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
- }
-
- /**
- * Set the max ram index size in bytes.
- * @param b the max ram index size in bytes
- */
- public void setMaxRAMSizeInBytes(long b) {
- conf.setLong("sea.max.ramsize.bytes", b);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
+import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * This class provides getters and setters for a number of parameters. Most
+ * of the parameters relate to the index update; the rest come from the
+ * existing Map/Reduce parameters.
+ */
+public class IndexUpdateConfiguration {
+ final Configuration conf;
+
+ /**
+ * Constructor
+ * @param conf the underlying configuration object
+ */
+ public IndexUpdateConfiguration(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Get the underlying configuration object.
+ * @return the configuration
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+
+ //
+ // existing map/reduce properties
+ //
+ // public int getIOFileBufferSize() {
+ // return getInt("io.file.buffer.size", 4096);
+ // }
+
+ /**
+ * Get the IO sort space in MB.
+ * @return the IO sort space in MB
+ */
+ public int getIOSortMB() {
+ return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
+ }
+
+ /**
+ * Set the IO sort space in MB.
+ * @param mb the IO sort space in MB
+ */
+ public void setIOSortMB(int mb) {
+ conf.setInt(MRJobConfig.IO_SORT_MB, mb);
+ }
+
+ /**
+ * Get the Map/Reduce temp directory.
+ * @return the Map/Reduce temp directory
+ */
+ public String getMapredTempDir() {
+ return conf.get(MRConfig.TEMP_DIR);
+ }
+
+ //
+ // properties for index update
+ //
+ /**
+ * Get the distribution policy class.
+ * @return the distribution policy class
+ */
+ public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
+ return conf.getClass("sea.distribution.policy",
+ HashingDistributionPolicy.class, IDistributionPolicy.class);
+ }
+
+ /**
+ * Set the distribution policy class.
+ * @param theClass the distribution policy class
+ */
+ public void setDistributionPolicyClass(
+ Class<? extends IDistributionPolicy> theClass) {
+ conf.setClass("sea.distribution.policy", theClass,
+ IDistributionPolicy.class);
+ }
+
+ /**
+ * Get the analyzer class.
+ * @return the analyzer class
+ */
+ public Class<? extends Analyzer> getDocumentAnalyzerClass() {
+ return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
+ Analyzer.class);
+ }
+
+ /**
+ * Set the analyzer class.
+ * @param theClass the analyzer class
+ */
+ public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
+ conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
+ }
+
+ /**
+ * Get the index input format class.
+ * @return the index input format class
+ */
+ public Class<? extends InputFormat> getIndexInputFormatClass() {
+ return conf.getClass("sea.input.format", LineDocInputFormat.class,
+ InputFormat.class);
+ }
+
+ /**
+ * Set the index input format class.
+ * @param theClass the index input format class
+ */
+ public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
+ conf.setClass("sea.input.format", theClass, InputFormat.class);
+ }
+
+ /**
+ * Get the index updater class.
+ * @return the index updater class
+ */
+ public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
+ return conf.getClass("sea.index.updater", IndexUpdater.class,
+ IIndexUpdater.class);
+ }
+
+ /**
+ * Set the index updater class.
+ * @param theClass the index updater class
+ */
+ public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
+ conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
+ }
+
+ /**
+ * Get the local analysis class.
+ * @return the local analysis class
+ */
+ public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
+ return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
+ ILocalAnalysis.class);
+ }
+
+ /**
+ * Set the local analysis class.
+ * @param theClass the local analysis class
+ */
+ public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
+ conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
+ }
+
+ /**
+ * Get the string representation of the index shards.
+ * @return the string representation of the index shards
+ */
+ public String getIndexShards() {
+ return conf.get("sea.index.shards");
+ }
+
+ /**
+ * Set the string representation of the index shards.
+ * @param shards the string representation of the index shards
+ */
+ public void setIndexShards(String shards) {
+ conf.set("sea.index.shards", shards);
+ }
+
+ /**
+ * Get the max field length for a Lucene instance.
+ * @return the max field length for a Lucene instance
+ */
+ public int getIndexMaxFieldLength() {
+ return conf.getInt("sea.max.field.length", -1);
+ }
+
+ /**
+ * Set the max field length for a Lucene instance.
+ * @param maxFieldLength the max field length for a Lucene instance
+ */
+ public void setIndexMaxFieldLength(int maxFieldLength) {
+ conf.setInt("sea.max.field.length", maxFieldLength);
+ }
+
+ /**
+ * Get the max number of segments for a Lucene instance.
+ * @return the max number of segments for a Lucene instance
+ */
+ public int getIndexMaxNumSegments() {
+ return conf.getInt("sea.max.num.segments", -1);
+ }
+
+ /**
+ * Set the max number of segments for a Lucene instance.
+ * @param maxNumSegments the max number of segments for a Lucene instance
+ */
+ public void setIndexMaxNumSegments(int maxNumSegments) {
+ conf.setInt("sea.max.num.segments", maxNumSegments);
+ }
+
+ /**
+ * Check whether to use the compound file format for a Lucene instance.
+ * @return true if using the compound file format for a Lucene instance
+ */
+ public boolean getIndexUseCompoundFile() {
+ return conf.getBoolean("sea.use.compound.file", false);
+ }
+
+ /**
+ * Set whether to use the compound file format for a Lucene instance.
+ * @param useCompoundFile whether to use the compound file format
+ */
+ public void setIndexUseCompoundFile(boolean useCompoundFile) {
+ conf.setBoolean("sea.use.compound.file", useCompoundFile);
+ }
+
+ /**
+ * Get the max ram index size in bytes. The default is 50M.
+ * @return the max ram index size in bytes
+ */
+ public long getMaxRAMSizeInBytes() {
+ return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
+ }
+
+ /**
+ * Set the max ram index size in bytes.
+ * @param b the max ram index size in bytes
+ */
+ public void setMaxRAMSizeInBytes(long b) {
+ conf.setLong("sea.max.ramsize.bytes", b);
+ }
+
+}
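
The class above is a thin typed veneer over Hadoop's Configuration: every
setter writes a plain key into the wrapped conf, so the values travel with the
job like any other property. As a rough sketch of how it might be wired up
during job setup (the shard string is only a placeholder, since the exact
"sea.index.shards" syntax is parsed elsewhere by Shard.getIndexShards and is
not shown in this diff):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;

    public class IndexUpdateConfigurationExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);

        // Lucene-side knobs exposed by the wrapper.
        iconf.setIndexUseCompoundFile(true);      // sea.use.compound.file
        iconf.setIndexMaxNumSegments(10);         // sea.max.num.segments
        iconf.setMaxRAMSizeInBytes(100L << 20);   // sea.max.ramsize.bytes, 100 MB
        iconf.setIOSortMB(200);                   // MRJobConfig.IO_SORT_MB

        // Placeholder value; use whatever format Shard.getIndexShards expects.
        iconf.setIndexShards("/index/shard0,/index/shard1");

        // Everything lands in the wrapped Configuration.
        System.out.println(conf.get("sea.max.num.segments"));  // prints 10
      }
    }
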
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java Tue Oct 16 00:02:55 2012
@@ -1,199 +1,199 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.lucene.analysis.Analyzer;
-
-/**
- * This class applies local analysis to a key-value pair and then converts the
- * resulting docid-and-operation pair to a shard-and-intermediate form pair.
- */
-public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
- extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
- static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
-
- /**
- * Get the map output key class.
- * @return the map output key class
- */
- public static Class<? extends WritableComparable> getMapOutputKeyClass() {
- return Shard.class;
- }
-
- /**
- * Get the map output value class.
- * @return the map output value class
- */
- public static Class<? extends Writable> getMapOutputValueClass() {
- return IntermediateForm.class;
- }
-
- IndexUpdateConfiguration iconf;
- private Analyzer analyzer;
- private Shard[] shards;
- private IDistributionPolicy distributionPolicy;
-
- private ILocalAnalysis<K, V> localAnalysis;
- private DocumentID tmpKey;
- private DocumentAndOp tmpValue;
-
- private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
- new OutputCollector<DocumentID, DocumentAndOp>() {
- public void collect(DocumentID key, DocumentAndOp value)
- throws IOException {
- tmpKey = key;
- tmpValue = value;
- }
- };
-
- /**
- * Map a key-value pair to a shard-and-intermediate form pair. Internally,
- * the local analysis is first applied to map the key-value pair to a
- * document id-and-operation pair, then the docid-and-operation pair is
- * mapped to a shard-and-intermediate form pair. The intermediate form consists
- * of a single-document RAM index and/or a single delete term.
- */
- public void map(K key, V value,
- OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
- throws IOException {
-
- synchronized (this) {
- localAnalysis.map(key, value, tmpCollector, reporter);
-
- if (tmpKey != null && tmpValue != null) {
- DocumentAndOp doc = tmpValue;
- IntermediateForm form = new IntermediateForm();
- form.configure(iconf);
- form.process(doc, analyzer);
- form.closeWriter();
-
- if (doc.getOp() == DocumentAndOp.Op.INSERT) {
- int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
- if (chosenShard >= 0) {
- // insert into one shard
- output.collect(shards[chosenShard], form);
- } else {
- throw new IOException("Chosen shard for insert must be >= 0");
- }
-
- } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
- int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
- if (chosenShard >= 0) {
- // delete from one shard
- output.collect(shards[chosenShard], form);
- } else {
- // broadcast delete to all shards
- for (int i = 0; i < shards.length; i++) {
- output.collect(shards[i], form);
- }
- }
-
- } else { // UPDATE
- int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
- int deleteFromShard =
- distributionPolicy.chooseShardForDelete(tmpKey);
-
- if (insertToShard >= 0) {
- if (insertToShard == deleteFromShard) {
- // update into one shard
- output.collect(shards[insertToShard], form);
- } else {
- // prepare a deletion form
- IntermediateForm deletionForm = new IntermediateForm();
- deletionForm.configure(iconf);
- deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
- doc.getTerm()), analyzer);
- deletionForm.closeWriter();
-
- if (deleteFromShard >= 0) {
- // delete from one shard
- output.collect(shards[deleteFromShard], deletionForm);
- } else {
- // broadcast delete to all shards
- for (int i = 0; i < shards.length; i++) {
- output.collect(shards[i], deletionForm);
- }
- }
-
- // prepare an insertion form
- IntermediateForm insertionForm = new IntermediateForm();
- insertionForm.configure(iconf);
- insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
- doc.getDocument()), analyzer);
- insertionForm.closeWriter();
-
- // insert into one shard
- output.collect(shards[insertToShard], insertionForm);
- }
- } else {
- throw new IOException("Chosen shard for insert must be >= 0");
- }
- }
- }
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- analyzer =
- (Analyzer) ReflectionUtils.newInstance(
- iconf.getDocumentAnalyzerClass(), job);
-
- localAnalysis =
- (ILocalAnalysis) ReflectionUtils.newInstance(
- iconf.getLocalAnalysisClass(), job);
- localAnalysis.configure(job);
-
- shards = Shard.getIndexShards(iconf);
-
- distributionPolicy =
- (IDistributionPolicy) ReflectionUtils.newInstance(
- iconf.getDistributionPolicyClass(), job);
- distributionPolicy.init(shards);
-
- LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
- LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
- LOG.info(shards.length + " shards = " + iconf.getIndexShards());
- LOG.info("sea.distribution.policy = "
- + distributionPolicy.getClass().getName());
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- localAnalysis.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.analysis.Analyzer;
+
+/**
+ * This class applies local analysis to a key-value pair and then converts the
+ * resulting docid-and-operation pair to a shard-and-intermediate form pair.
+ */
+public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
+ extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
+
+ /**
+ * Get the map output key class.
+ * @return the map output key class
+ */
+ public static Class<? extends WritableComparable> getMapOutputKeyClass() {
+ return Shard.class;
+ }
+
+ /**
+ * Get the map output value class.
+ * @return the map output value class
+ */
+ public static Class<? extends Writable> getMapOutputValueClass() {
+ return IntermediateForm.class;
+ }
+
+ IndexUpdateConfiguration iconf;
+ private Analyzer analyzer;
+ private Shard[] shards;
+ private IDistributionPolicy distributionPolicy;
+
+ private ILocalAnalysis<K, V> localAnalysis;
+ private DocumentID tmpKey;
+ private DocumentAndOp tmpValue;
+
+ private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
+ new OutputCollector<DocumentID, DocumentAndOp>() {
+ public void collect(DocumentID key, DocumentAndOp value)
+ throws IOException {
+ tmpKey = key;
+ tmpValue = value;
+ }
+ };
+
+ /**
+ * Map a key-value pair to a shard-and-intermediate form pair. Internally,
+ * the local analysis is first applied to map the key-value pair to a
+ * document id-and-operation pair, then the docid-and-operation pair is
+ * mapped to a shard-and-intermediate form pair. The intermediate form consists
+ * of a single-document RAM index and/or a single delete term.
+ */
+ public void map(K key, V value,
+ OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
+ throws IOException {
+
+ synchronized (this) {
+ localAnalysis.map(key, value, tmpCollector, reporter);
+
+ if (tmpKey != null && tmpValue != null) {
+ DocumentAndOp doc = tmpValue;
+ IntermediateForm form = new IntermediateForm();
+ form.configure(iconf);
+ form.process(doc, analyzer);
+ form.closeWriter();
+
+ if (doc.getOp() == DocumentAndOp.Op.INSERT) {
+ int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
+ if (chosenShard >= 0) {
+ // insert into one shard
+ output.collect(shards[chosenShard], form);
+ } else {
+ throw new IOException("Chosen shard for insert must be >= 0");
+ }
+
+ } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
+ int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
+ if (chosenShard >= 0) {
+ // delete from one shard
+ output.collect(shards[chosenShard], form);
+ } else {
+ // broadcast delete to all shards
+ for (int i = 0; i < shards.length; i++) {
+ output.collect(shards[i], form);
+ }
+ }
+
+ } else { // UPDATE
+ int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
+ int deleteFromShard =
+ distributionPolicy.chooseShardForDelete(tmpKey);
+
+ if (insertToShard >= 0) {
+ if (insertToShard == deleteFromShard) {
+ // update into one shard
+ output.collect(shards[insertToShard], form);
+ } else {
+ // prepare a deletion form
+ IntermediateForm deletionForm = new IntermediateForm();
+ deletionForm.configure(iconf);
+ deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
+ doc.getTerm()), analyzer);
+ deletionForm.closeWriter();
+
+ if (deleteFromShard >= 0) {
+ // delete from one shard
+ output.collect(shards[deleteFromShard], deletionForm);
+ } else {
+ // broadcast delete to all shards
+ for (int i = 0; i < shards.length; i++) {
+ output.collect(shards[i], deletionForm);
+ }
+ }
+
+ // prepare an insertion form
+ IntermediateForm insertionForm = new IntermediateForm();
+ insertionForm.configure(iconf);
+ insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
+ doc.getDocument()), analyzer);
+ insertionForm.closeWriter();
+
+ // insert into one shard
+ output.collect(shards[insertToShard], insertionForm);
+ }
+ } else {
+ throw new IOException("Chosen shard for insert must be >= 0");
+ }
+ }
+ }
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ analyzer =
+ (Analyzer) ReflectionUtils.newInstance(
+ iconf.getDocumentAnalyzerClass(), job);
+
+ localAnalysis =
+ (ILocalAnalysis) ReflectionUtils.newInstance(
+ iconf.getLocalAnalysisClass(), job);
+ localAnalysis.configure(job);
+
+ shards = Shard.getIndexShards(iconf);
+
+ distributionPolicy =
+ (IDistributionPolicy) ReflectionUtils.newInstance(
+ iconf.getDistributionPolicyClass(), job);
+ distributionPolicy.init(shards);
+
+ LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
+ LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
+ LOG.info(shards.length + " shards = " + iconf.getIndexShards());
+ LOG.info("sea.distribution.policy = "
+ + distributionPolicy.getClass().getName());
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ localAnalysis.close();
+ }
+
+}
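
The map() method above fixes a small contract for distribution policies:
chooseShardForInsert must return a shard index >= 0 (the mapper throws an
IOException otherwise), while chooseShardForDelete may return a negative value
to have the deletion form broadcast to every shard. A minimal sketch of a
policy honoring that contract, assuming IDistributionPolicy declares exactly
the three methods the mapper invokes (hashing toString() below is a stand-in
for a stable, content-based document key):

    package org.apache.hadoop.contrib.index.mapred;

    public class BroadcastDeletePolicy implements IDistributionPolicy {

      private int numShards;

      public void init(Shard[] shards) {
        numShards = shards.length;
      }

      public int chooseShardForInsert(DocumentID key) {
        // Mask the sign bit so the returned index is always >= 0.
        return (key.toString().hashCode() & Integer.MAX_VALUE) % numShards;
      }

      public int chooseShardForDelete(DocumentID key) {
        // Negative: the mapper broadcasts the delete to all shards.
        return -1;
      }
    }
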
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java Tue Oct 16 00:02:55 2012
@@ -1,60 +1,60 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-/**
- * This partitioner class puts the values of the same key - in this case the
- * same shard - in the same partition.
- */
-public class IndexUpdatePartitioner implements
- Partitioner<Shard, IntermediateForm> {
-
- private Shard[] shards;
- private Map<Shard, Integer> map;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
- */
- public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
- int partition = map.get(key).intValue();
- if (partition < numPartitions) {
- return partition;
- } else {
- return numPartitions - 1;
- }
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
- map = new HashMap<Shard, Integer>();
- for (int i = 0; i < shards.length; i++) {
- map.put(shards[i], i);
- }
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * This partitioner class puts the values of the same key - in this case the
+ * same shard - in the same partition.
+ */
+public class IndexUpdatePartitioner implements
+ Partitioner<Shard, IntermediateForm> {
+
+ private Shard[] shards;
+ private Map<Shard, Integer> map;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
+ */
+ public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
+ int partition = map.get(key).intValue();
+ if (partition < numPartitions) {
+ return partition;
+ } else {
+ return numPartitions - 1;
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
+ map = new HashMap<Shard, Integer>();
+ for (int i = 0; i < shards.length; i++) {
+ map.put(shards[i], i);
+ }
+ }
+
+}
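
Because getPartition clamps any out-of-range shard index to numPartitions - 1,
the one-to-one shard-to-reducer mapping only holds when the job runs one
reducer per shard. A short job-setup sketch under that assumption:

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdatePartitioner;
    import org.apache.hadoop.contrib.index.mapred.Shard;
    import org.apache.hadoop.mapred.JobConf;

    public class PartitionerSetup {
      static void configure(JobConf job) {
        job.setPartitionerClass(IndexUpdatePartitioner.class);
        // One reducer per shard; with fewer reducers the partitioner would
        // pile the trailing shards onto the last partition.
        Shard[] shards =
            Shard.getIndexShards(new IndexUpdateConfiguration(job));
        job.setNumReduceTasks(shards.length);
      }
    }
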
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java Tue Oct 16 00:02:55 2012
@@ -1,143 +1,143 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.contrib.index.lucene.ShardWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Closeable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This reducer applies the changes for a shard to that shard. A "new version"
- * of a shard is created at the end of a reduce. It is important to note that
- * the new version of the shard is not derived from scratch. By leveraging
- * Lucene's update algorithm, the new version of each Lucene instance will
- * share as many files as possible with the previous version.
- */
-public class IndexUpdateReducer extends MapReduceBase implements
- Reducer<Shard, IntermediateForm, Shard, Text> {
- static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
- static final Text DONE = new Text("done");
-
- /**
- * Get the reduce output key class.
- * @return the reduce output key class
- */
- public static Class<? extends WritableComparable> getOutputKeyClass() {
- return Shard.class;
- }
-
- /**
- * Get the reduce output value class.
- * @return the reduce output value class
- */
- public static Class<? extends Writable> getOutputValueClass() {
- return Text.class;
- }
-
- private IndexUpdateConfiguration iconf;
- private String mapredTempDir;
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
- */
- public void reduce(Shard key, Iterator<IntermediateForm> values,
- OutputCollector<Shard, Text> output, Reporter reporter)
- throws IOException {
-
- LOG.info("Construct a shard writer for " + key);
- FileSystem fs = FileSystem.get(iconf.getConfiguration());
- String temp =
- mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
- final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
-
- // update the shard
- while (values.hasNext()) {
- IntermediateForm form = values.next();
- writer.process(form);
- reporter.progress();
- }
-
- // close the shard
- final Reporter fReporter = reporter;
- new Closeable() {
- volatile boolean closed = false;
-
- public void close() throws IOException {
- // spawn a thread to give progress heartbeats
- Thread prog = new Thread() {
- public void run() {
- while (!closed) {
- try {
- fReporter.setStatus("closing");
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- continue;
- } catch (Throwable e) {
- return;
- }
- }
- }
- };
-
- try {
- prog.start();
-
- if (writer != null) {
- writer.close();
- }
- } finally {
- closed = true;
- }
- }
- }.close();
- LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
-
- output.collect(key, DONE);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
- */
- public void configure(JobConf job) {
- iconf = new IndexUpdateConfiguration(job);
- mapredTempDir = iconf.getMapredTempDir();
- mapredTempDir = Shard.normalizePath(mapredTempDir);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.mapred.MapReduceBase#close()
- */
- public void close() throws IOException {
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.contrib.index.lucene.ShardWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This reducer applies the changes for a shard to that shard. A "new version"
+ * of a shard is created at the end of a reduce. It is important to note that
+ * the new version of the shard is not derived from scratch. By leveraging
+ * Lucene's update algorithm, the new version of each Lucene instance will
+ * share as many files as possible with the previous version.
+ */
+public class IndexUpdateReducer extends MapReduceBase implements
+ Reducer<Shard, IntermediateForm, Shard, Text> {
+ static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
+ static final Text DONE = new Text("done");
+
+ /**
+ * Get the reduce output key class.
+ * @return the reduce output key class
+ */
+ public static Class<? extends WritableComparable> getOutputKeyClass() {
+ return Shard.class;
+ }
+
+ /**
+ * Get the reduce output value class.
+ * @return the reduce output value class
+ */
+ public static Class<? extends Writable> getOutputValueClass() {
+ return Text.class;
+ }
+
+ private IndexUpdateConfiguration iconf;
+ private String mapredTempDir;
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ public void reduce(Shard key, Iterator<IntermediateForm> values,
+ OutputCollector<Shard, Text> output, Reporter reporter)
+ throws IOException {
+
+ LOG.info("Construct a shard writer for " + key);
+ FileSystem fs = FileSystem.get(iconf.getConfiguration());
+ String temp =
+ mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
+ final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
+
+ // update the shard
+ while (values.hasNext()) {
+ IntermediateForm form = values.next();
+ writer.process(form);
+ reporter.progress();
+ }
+
+ // close the shard
+ final Reporter fReporter = reporter;
+ new Closeable() {
+ volatile boolean closed = false;
+
+ public void close() throws IOException {
+ // spawn a thread to give progress heartbeats
+ Thread prog = new Thread() {
+ public void run() {
+ while (!closed) {
+ try {
+ fReporter.setStatus("closing");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ } catch (Throwable e) {
+ return;
+ }
+ }
+ }
+ };
+
+ try {
+ prog.start();
+
+ if (writer != null) {
+ writer.close();
+ }
+ } finally {
+ closed = true;
+ }
+ }
+ }.close();
+ LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
+
+ output.collect(key, DONE);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+ */
+ public void configure(JobConf job) {
+ iconf = new IndexUpdateConfiguration(job);
+ mapredTempDir = iconf.getMapredTempDir();
+ mapredTempDir = Shard.normalizePath(mapredTempDir);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.mapred.MapReduceBase#close()
+ */
+ public void close() throws IOException {
+ }
+
+}
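
The anonymous Closeable in reduce() packages a reusable pattern: while a
potentially long ShardWriter.close() runs, a side thread keeps reporting
status so the framework does not consider the task hung. A self-contained
sketch of the same pattern, with ProgressReporter standing in for Hadoop's
Reporter:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class HeartbeatClose {
      interface ProgressReporter { void setStatus(String status); }

      static void closeWithHeartbeat(final Closeable resource,
                                     final ProgressReporter reporter)
          throws IOException {
        final AtomicBoolean closed = new AtomicBoolean(false);
        Thread heartbeat = new Thread() {
          public void run() {
            while (!closed.get()) {
              try {
                reporter.setStatus("closing");  // the heartbeat
                Thread.sleep(1000);
              } catch (InterruptedException e) {
                // keep heartbeating until the close completes
              } catch (Throwable t) {
                return;
              }
            }
          }
        };
        try {
          heartbeat.start();
          resource.close();  // may take a long time
        } finally {
          closed.set(true);  // lets the heartbeat thread exit
        }
      }
    }
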
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java Tue Oct 16 00:02:55 2012
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-
-/**
- * An intermediate form for one or more parsed Lucene documents and/or
- * delete terms. It uses the Lucene file format for the intermediate form,
- * backed by RAM directory files.
- *
- * Note: if process(*) is ever called, closeWriter() should be called
- * afterwards; otherwise there is no need to call closeWriter().
- */
-public class IntermediateForm implements Writable {
-
- private IndexUpdateConfiguration iconf = null;
- private final Collection<Term> deleteList;
- private RAMDirectory dir;
- private IndexWriter writer;
- private int numDocs;
-
- /**
- * Constructor
- * @throws IOException
- */
- public IntermediateForm() throws IOException {
- deleteList = new ConcurrentLinkedQueue<Term>();
- dir = new RAMDirectory();
- writer = null;
- numDocs = 0;
- }
-
- /**
- * Configure using an index update configuration.
- * @param iconf the index update configuration
- */
- public void configure(IndexUpdateConfiguration iconf) {
- this.iconf = iconf;
- }
-
- /**
- * Get the ram directory of the intermediate form.
- * @return the ram directory
- */
- public Directory getDirectory() {
- return dir;
- }
-
- /**
- * Get an iterator for the delete terms in the intermediate form.
- * @return an iterator for the delete terms
- */
- public Iterator<Term> deleteTermIterator() {
- return deleteList.iterator();
- }
-
- /**
- * This method is used by the index update mapper; it processes a document
- * operation into the current intermediate form.
- * @param doc input document operation
- * @param analyzer the analyzer
- * @throws IOException
- */
- public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
- if (doc.getOp() == DocumentAndOp.Op.DELETE
- || doc.getOp() == DocumentAndOp.Op.UPDATE) {
- deleteList.add(doc.getTerm());
-
- }
-
- if (doc.getOp() == DocumentAndOp.Op.INSERT
- || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-
- if (writer == null) {
- // analyzer is null because we specify an analyzer with addDocument
- writer = createWriter();
- }
-
- writer.addDocument(doc.getDocument(), analyzer);
- numDocs++;
- }
-
- }
-
- /**
- * This method is used by the index update combiner; it processes an input
- * intermediate form into the current intermediate form. More specifically,
- * each input intermediate form is a single-document RAM index and/or a
- * single delete term.
- * @param form the input intermediate form
- * @throws IOException
- */
- public void process(IntermediateForm form) throws IOException {
- if (form.deleteList.size() > 0) {
- deleteList.addAll(form.deleteList);
- }
-
- if (form.dir.sizeInBytes() > 0) {
- if (writer == null) {
- writer = createWriter();
- }
-
- writer.addIndexesNoOptimize(new Directory[] { form.dir });
- numDocs++;
- }
-
- }
-
- /**
- * Close the Lucene index writer associated with the intermediate form,
- * if created. Do not close the ram directory. In fact, there is no need
- * to close a ram directory.
- * @throws IOException
- */
- public void closeWriter() throws IOException {
- if (writer != null) {
- writer.close();
- writer = null;
- }
- }
-
- /**
- * The total size of files in the directory and ram used by the index writer.
- * It does not include memory used by the delete list.
- * @return the total size in bytes
- */
- public long totalSizeInBytes() throws IOException {
- long size = dir.sizeInBytes();
- if (writer != null) {
- size += writer.ramSizeInBytes();
- }
- return size;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- StringBuilder buffer = new StringBuilder();
- buffer.append(this.getClass().getSimpleName());
- buffer.append("[numDocs=");
- buffer.append(numDocs);
- buffer.append(", numDeletes=");
- buffer.append(deleteList.size());
- if (deleteList.size() > 0) {
- buffer.append("(");
- Iterator<Term> iter = deleteTermIterator();
- while (iter.hasNext()) {
- buffer.append(iter.next());
- buffer.append(" ");
- }
- buffer.append(")");
- }
- buffer.append("]");
- return buffer.toString();
- }
-
- private IndexWriter createWriter() throws IOException {
- IndexWriter writer =
- new IndexWriter(dir, false, null,
- new KeepOnlyLastCommitDeletionPolicy());
- writer.setUseCompoundFile(false);
-
- if (iconf != null) {
- int maxFieldLength = iconf.getIndexMaxFieldLength();
- if (maxFieldLength > 0) {
- writer.setMaxFieldLength(maxFieldLength);
- }
- }
-
- return writer;
- }
-
- private void resetForm() throws IOException {
- deleteList.clear();
- if (dir.sizeInBytes() > 0) {
- // it's ok if we don't close a ram directory
- dir.close();
- // an alternative is to delete all the files and reuse the ram directory
- dir = new RAMDirectory();
- }
- assert (writer == null);
- numDocs = 0;
- }
-
- // ///////////////////////////////////
- // Writable
- // ///////////////////////////////////
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- out.writeInt(deleteList.size());
- for (Term term : deleteList) {
- Text.writeString(out, term.field());
- Text.writeString(out, term.text());
- }
-
- String[] files = dir.list();
- RAMDirectoryUtil.writeRAMFiles(out, dir, files);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- resetForm();
-
- int numDeleteTerms = in.readInt();
- for (int i = 0; i < numDeleteTerms; i++) {
- String field = Text.readString(in);
- String text = Text.readString(in);
- deleteList.add(new Term(field, text));
- }
-
- RAMDirectoryUtil.readRAMFiles(in, dir);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * An intermediate form for one or more parsed Lucene documents and/or
+ * delete terms. It uses the Lucene file format for the intermediate form,
+ * backed by RAM directory files.
+ *
+ * Note: if process(*) is ever called, closeWriter() should be called
+ * afterwards; otherwise there is no need to call closeWriter().
+ */
+public class IntermediateForm implements Writable {
+
+ private IndexUpdateConfiguration iconf = null;
+ private final Collection<Term> deleteList;
+ private RAMDirectory dir;
+ private IndexWriter writer;
+ private int numDocs;
+
+ /**
+ * Constructor
+ * @throws IOException
+ */
+ public IntermediateForm() throws IOException {
+ deleteList = new ConcurrentLinkedQueue<Term>();
+ dir = new RAMDirectory();
+ writer = null;
+ numDocs = 0;
+ }
+
+ /**
+ * Configure using an index update configuration.
+ * @param iconf the index update configuration
+ */
+ public void configure(IndexUpdateConfiguration iconf) {
+ this.iconf = iconf;
+ }
+
+ /**
+ * Get the ram directory of the intermediate form.
+ * @return the ram directory
+ */
+ public Directory getDirectory() {
+ return dir;
+ }
+
+ /**
+ * Get an iterator for the delete terms in the intermediate form.
+ * @return an iterator for the delete terms
+ */
+ public Iterator<Term> deleteTermIterator() {
+ return deleteList.iterator();
+ }
+
+ /**
+ * This method is used by the index update mapper; it processes a document
+ * operation into the current intermediate form.
+ * @param doc input document operation
+ * @param analyzer the analyzer
+ * @throws IOException
+ */
+ public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
+ if (doc.getOp() == DocumentAndOp.Op.DELETE
+ || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+ deleteList.add(doc.getTerm());
+
+ }
+
+ if (doc.getOp() == DocumentAndOp.Op.INSERT
+ || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+
+ if (writer == null) {
+ // analyzer is null because we specify an analyzer with addDocument
+ writer = createWriter();
+ }
+
+ writer.addDocument(doc.getDocument(), analyzer);
+ numDocs++;
+ }
+
+ }
+
+ /**
+ * This method is used by the index update combiner; it processes an input
+ * intermediate form into the current intermediate form. More specifically,
+ * each input intermediate form is a single-document RAM index and/or a
+ * single delete term.
+ * @param form the input intermediate form
+ * @throws IOException
+ */
+ public void process(IntermediateForm form) throws IOException {
+ if (form.deleteList.size() > 0) {
+ deleteList.addAll(form.deleteList);
+ }
+
+ if (form.dir.sizeInBytes() > 0) {
+ if (writer == null) {
+ writer = createWriter();
+ }
+
+ writer.addIndexesNoOptimize(new Directory[] { form.dir });
+ numDocs++;
+ }
+
+ }
+
+ /**
+ * Close the Lucene index writer associated with the intermediate form,
+ * if created. Do not close the ram directory. In fact, there is no need
+ * to close a ram directory.
+ * @throws IOException
+ */
+ public void closeWriter() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ }
+
+ /**
+ * Get the total size of the files in the RAM directory plus the RAM
+ * used by the index writer. This does not include memory used by the
+ * delete list.
+ * @return the total size in bytes
+ * @throws IOException
+ */
+ public long totalSizeInBytes() throws IOException {
+ long size = dir.sizeInBytes();
+ if (writer != null) {
+ size += writer.ramSizeInBytes();
+ }
+ return size;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(this.getClass().getSimpleName());
+ buffer.append("[numDocs=");
+ buffer.append(numDocs);
+ buffer.append(", numDeletes=");
+ buffer.append(deleteList.size());
+ if (deleteList.size() > 0) {
+ buffer.append("(");
+ Iterator<Term> iter = deleteTermIterator();
+ while (iter.hasNext()) {
+ buffer.append(iter.next());
+ buffer.append(" ");
+ }
+ buffer.append(")");
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ private IndexWriter createWriter() throws IOException {
+ IndexWriter writer =
+ new IndexWriter(dir, false, null,
+ new KeepOnlyLastCommitDeletionPolicy());
+ writer.setUseCompoundFile(false);
+
+ if (iconf != null) {
+ int maxFieldLength = iconf.getIndexMaxFieldLength();
+ if (maxFieldLength > 0) {
+ writer.setMaxFieldLength(maxFieldLength);
+ }
+ }
+
+ return writer;
+ }
+
+ private void resetForm() throws IOException {
+ deleteList.clear();
+ if (dir.sizeInBytes() > 0) {
+ // it's ok if we don't close a ram directory
+ dir.close();
+ // an alternative is to delete all the files and reuse the ram directory
+ dir = new RAMDirectory();
+ }
+ assert (writer == null);
+ numDocs = 0;
+ }
+
+ // ///////////////////////////////////
+ // Writable
+ // ///////////////////////////////////
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(deleteList.size());
+ for (Term term : deleteList) {
+ Text.writeString(out, term.field());
+ Text.writeString(out, term.text());
+ }
+
+ String[] files = dir.list();
+ RAMDirectoryUtil.writeRAMFiles(out, dir, files);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ resetForm();
+
+ int numDeleteTerms = in.readInt();
+ for (int i = 0; i < numDeleteTerms; i++) {
+ String field = Text.readString(in);
+ String text = Text.readString(in);
+ deleteList.add(new Term(field, text));
+ }
+
+ RAMDirectoryUtil.readRAMFiles(in, dir);
+ }
+
+}
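For reference, a minimal sketch of how an IntermediateForm is meant to be used end to end: process a document operation, close the writer, then round-trip the form through its Writable interface. The DocumentAndOp constructor used below is an assumption for illustration only; the actual class may expose a different way to build an operation.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;

public class IntermediateFormRoundTrip {
  public static void main(String[] args) throws IOException {
    IntermediateForm form = new IntermediateForm();

    Document doc = new Document();
    doc.add(new Field("id", "00001", Field.Store.YES,
        Field.Index.UN_TOKENIZED));
    doc.add(new Field("content", "apache", Field.Store.NO,
        Field.Index.TOKENIZED));

    // assumed constructor: (op, document, delete term); an INSERT
    // carries no delete term
    DocumentAndOp op =
        new DocumentAndOp(DocumentAndOp.Op.INSERT, doc, null);
    form.process(op, new StandardAnalyzer());

    // once process() has been called, closeWriter() must be called so
    // the RAM directory holds a committed index
    form.closeWriter();

    // Writable round trip: the delete terms and the RAM files are
    // written out, then read back into a fresh form
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    form.write(new DataOutputStream(bytes));

    IntermediateForm copy = new IntermediateForm();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy);
  }
}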
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java Tue Oct 16 00:02:55 2012
@@ -1,105 +1,105 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.lucene;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMDirectory;
-
-public class TestMixedDirectory extends TestCase {
- private int numDocsPerUpdate = 10;
- private int maxBufferedDocs = 2;
-
- public void testMixedDirectoryAndPolicy() throws IOException {
- Directory readDir = new RAMDirectory();
- updateIndex(readDir, 0, numDocsPerUpdate,
- new KeepOnlyLastCommitDeletionPolicy());
-
- verify(readDir, numDocsPerUpdate);
-
- IndexOutput out =
- readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
- + ".cfs");
- out.writeInt(0);
- out.close();
-
- Directory writeDir = new RAMDirectory();
- Directory mixedDir = new MixedDirectory(readDir, writeDir);
- updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
- new MixedDeletionPolicy());
-
- verify(readDir, numDocsPerUpdate);
- verify(mixedDir, 2 * numDocsPerUpdate);
- }
-
- public void updateIndex(Directory dir, int base, int numDocs,
- IndexDeletionPolicy policy) throws IOException {
- IndexWriter writer =
- new IndexWriter(dir, false, new StandardAnalyzer(), policy);
- writer.setMaxBufferedDocs(maxBufferedDocs);
- writer.setMergeFactor(1000);
- for (int i = 0; i < numDocs; i++) {
- addDoc(writer, base + i);
- }
- writer.close();
- }
-
- private void addDoc(IndexWriter writer, int id) throws IOException {
- Document doc = new Document();
- doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
- Field.Index.UN_TOKENIZED));
- doc.add(new Field("content", "apache", Field.Store.NO,
- Field.Index.TOKENIZED));
- writer.addDocument(doc);
- }
-
- private void verify(Directory dir, int expectedHits) throws IOException {
- IndexSearcher searcher = new IndexSearcher(dir);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
- int numHits = hits.length();
-
- assertEquals(expectedHits, numHits);
-
- int[] docs = new int[numHits];
- for (int i = 0; i < numHits; i++) {
- Document hit = hits.doc(i);
- docs[Integer.parseInt(hit.get("id"))]++;
- }
- for (int i = 0; i < numHits; i++) {
- assertEquals(1, docs[i]);
- }
-
- searcher.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.lucene;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+public class TestMixedDirectory extends TestCase {
+ private int numDocsPerUpdate = 10;
+ private int maxBufferedDocs = 2;
+
+ public void testMixedDirectoryAndPolicy() throws IOException {
+ Directory readDir = new RAMDirectory();
+ updateIndex(readDir, 0, numDocsPerUpdate,
+ new KeepOnlyLastCommitDeletionPolicy());
+
+ verify(readDir, numDocsPerUpdate);
+
+ IndexOutput out =
+ readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
+ + ".cfs");
+ out.writeInt(0);
+ out.close();
+
+ Directory writeDir = new RAMDirectory();
+ Directory mixedDir = new MixedDirectory(readDir, writeDir);
+ updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
+ new MixedDeletionPolicy());
+
+ verify(readDir, numDocsPerUpdate);
+ verify(mixedDir, 2 * numDocsPerUpdate);
+ }
+
+ public void updateIndex(Directory dir, int base, int numDocs,
+ IndexDeletionPolicy policy) throws IOException {
+ IndexWriter writer =
+ new IndexWriter(dir, false, new StandardAnalyzer(), policy);
+ writer.setMaxBufferedDocs(maxBufferedDocs);
+ writer.setMergeFactor(1000);
+ for (int i = 0; i < numDocs; i++) {
+ addDoc(writer, base + i);
+ }
+ writer.close();
+ }
+
+ private void addDoc(IndexWriter writer, int id) throws IOException {
+ Document doc = new Document();
+ doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
+ Field.Index.UN_TOKENIZED));
+ doc.add(new Field("content", "apache", Field.Store.NO,
+ Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ }
+
+ private void verify(Directory dir, int expectedHits) throws IOException {
+ IndexSearcher searcher = new IndexSearcher(dir);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+ int numHits = hits.length();
+
+ assertEquals(expectedHits, numHits);
+
+ int[] docs = new int[numHits];
+ for (int i = 0; i < numHits; i++) {
+ Document hit = hits.doc(i);
+ docs[Integer.parseInt(hit.get("id"))]++;
+ }
+ for (int i = 0; i < numHits; i++) {
+ assertEquals(1, docs[i]);
+ }
+
+ searcher.close();
+ }
+
+}
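Condensed, the pattern the test above exercises is: MixedDirectory presents a read-side directory (the existing index) and a write-side directory (where the IndexWriter creates new files) as one logical directory. A minimal sketch follows, assuming the dispatch behavior the test's assertions rely on, namely that pre-existing files are served from the read directory and newly created files land in the write directory; MixedDirectory and MixedDeletionPolicy are the contrib classes from the package above.

import java.io.IOException;

import org.apache.hadoop.contrib.index.lucene.MixedDeletionPolicy;
import org.apache.hadoop.contrib.index.lucene.MixedDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Hits;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class MixedDirectorySketch {
  public static void main(String[] args) throws IOException {
    // an existing index lives in the read-side directory
    Directory readDir = new RAMDirectory();
    addOneDoc(readDir, new KeepOnlyLastCommitDeletionPolicy());

    // new segments created through the mixed view land in writeDir;
    // files already present in readDir are served from readDir
    Directory writeDir = new RAMDirectory();
    Directory mixedDir = new MixedDirectory(readDir, writeDir);
    addOneDoc(mixedDir, new MixedDeletionPolicy());

    // the mixed view sees both documents; readDir alone still sees one
    IndexSearcher searcher = new IndexSearcher(mixedDir);
    Hits hits =
        searcher.search(new TermQuery(new Term("content", "apache")));
    System.out.println(hits.length()); // expected: 2
    searcher.close();
  }

  private static void addOneDoc(Directory dir, IndexDeletionPolicy policy)
      throws IOException {
    IndexWriter writer =
        new IndexWriter(dir, false, new StandardAnalyzer(), policy);
    Document doc = new Document();
    doc.add(new Field("content", "apache", Field.Store.NO,
        Field.Index.TOKENIZED));
    writer.addDocument(doc);
    writer.close();
  }
}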
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Tue Oct 16 00:02:55 2012
@@ -1,234 +1,234 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestDistributionPolicy extends TestCase {
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- // however, "we only allow 0 or 1 reducer in local mode" - from
- // LocalJobRunner
- private Configuration conf;
- private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
- private Path localUpdatePath =
- new Path(System.getProperty("build.test") + "/sample/data2.txt");
- private Path inputPath = new Path("/myexample/data.txt");
- private Path updatePath = new Path("/myexample/data2.txt");
- private Path outputPath = new Path("/myoutput");
- private Path indexPath = new Path("/myindex");
- private int numShards = 3;
- private int numMapTasks = 5;
-
- private int numDataNodes = 3;
- private int numTaskTrackers = 3;
-
- private int numDocsPerRun = 10; // num of docs in local input path
-
- private FileSystem fs;
- private MiniDFSCluster dfsCluster;
- private MiniMRCluster mrCluster;
-
- public TestDistributionPolicy() throws IOException {
- super();
- if (System.getProperty("hadoop.log.dir") == null) {
- String base = new File(".").getPath(); // getAbsolutePath();
- System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
- }
- conf = new Configuration();
- }
-
- protected void setUp() throws Exception {
- super.setUp();
- try {
- dfsCluster =
- new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
- fs = dfsCluster.getFileSystem();
- if (fs.exists(inputPath)) {
- fs.delete(inputPath, true);
- }
- fs.copyFromLocalFile(localInputPath, inputPath);
- if (fs.exists(updatePath)) {
- fs.delete(updatePath, true);
- }
- fs.copyFromLocalFile(localUpdatePath, updatePath);
-
- if (fs.exists(outputPath)) {
- // do not create, mapred will create
- fs.delete(outputPath, true);
- }
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- mrCluster =
- new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
- } catch (IOException e) {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- throw e;
- }
-
- }
-
- protected void tearDown() throws Exception {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- super.tearDown();
- }
-
- public void testDistributionPolicy() throws IOException {
- IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-
- // test hashing distribution policy
- iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
- onetest();
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- // test round-robin distribution policy
- iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
- onetest();
- }
-
- private void onetest() throws IOException {
- long versionNumber = -1;
- long generation = -1;
-
- Shard[] shards = new Shard[numShards];
- for (int j = 0; j < shards.length; j++) {
- shards[j] =
- new Shard(versionNumber,
- new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
- generation);
- }
-
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- IIndexUpdater updater = new IndexUpdater();
- updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
- shards);
-
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- // delete docs w/ even docids, update docs w/ odd docids
- updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
- shards);
-
- verify(shards);
- }
-
- private void verify(Shard[] shards) throws IOException {
- // verify the index
- IndexReader[] readers = new IndexReader[shards.length];
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- readers[i] = IndexReader.open(dir);
- }
-
- IndexReader reader = new MultiReader(readers);
- IndexSearcher searcher = new IndexSearcher(reader);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
- assertEquals(0, hits.length());
-
- hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
- assertEquals(numDocsPerRun / 2, hits.length());
-
- int[] counts = new int[numDocsPerRun];
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- counts[Integer.parseInt(doc.get("id"))]++;
- }
-
- for (int i = 0; i < numDocsPerRun; i++) {
- if (i % 2 == 0) {
- assertEquals(0, counts[i]);
- } else {
- assertEquals(1, counts[i]);
- }
- }
-
- searcher.close();
- reader.close();
- }
-
-}
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestDistributionPolicy extends TestCase {
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ // note that "we only allow 0 or 1 reducer in local mode" - from
+ // LocalJobRunner
+ private Configuration conf;
+ private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+ private Path localUpdatePath =
+ new Path(System.getProperty("build.test") + "/sample/data2.txt");
+ private Path inputPath = new Path("/myexample/data.txt");
+ private Path updatePath = new Path("/myexample/data2.txt");
+ private Path outputPath = new Path("/myoutput");
+ private Path indexPath = new Path("/myindex");
+ private int numShards = 3;
+ private int numMapTasks = 5;
+
+ private int numDataNodes = 3;
+ private int numTaskTrackers = 3;
+
+ private int numDocsPerRun = 10; // num of docs in local input path
+
+ private FileSystem fs;
+ private MiniDFSCluster dfsCluster;
+ private MiniMRCluster mrCluster;
+
+ public TestDistributionPolicy() throws IOException {
+ super();
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+ }
+ conf = new Configuration();
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ try {
+ dfsCluster =
+ new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+ fs = dfsCluster.getFileSystem();
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+ fs.copyFromLocalFile(localInputPath, inputPath);
+ if (fs.exists(updatePath)) {
+ fs.delete(updatePath, true);
+ }
+ fs.copyFromLocalFile(localUpdatePath, updatePath);
+
+ if (fs.exists(outputPath)) {
+ // do not create, mapred will create
+ fs.delete(outputPath, true);
+ }
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ mrCluster =
+ new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+ } catch (IOException e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ throw e;
+ }
+
+ }
+
+ protected void tearDown() throws Exception {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void testDistributionPolicy() throws IOException {
+ IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+
+ // test hashing distribution policy
+ iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
+ onetest();
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ // test round-robin distribution policy
+ iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
+ onetest();
+ }
+
+ private void onetest() throws IOException {
+ long versionNumber = -1;
+ long generation = -1;
+
+ Shard[] shards = new Shard[numShards];
+ for (int j = 0; j < shards.length; j++) {
+ shards[j] =
+ new Shard(versionNumber,
+ new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
+ generation);
+ }
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ IIndexUpdater updater = new IndexUpdater();
+ updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+ shards);
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ // delete docs w/ even docids, update docs w/ odd docids
+ updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
+ shards);
+
+ verify(shards);
+ }
+
+ private void verify(Shard[] shards) throws IOException {
+ // verify the index
+ IndexReader[] readers = new IndexReader[shards.length];
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ readers[i] = IndexReader.open(dir);
+ }
+
+ IndexReader reader = new MultiReader(readers);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+ assertEquals(0, hits.length());
+
+ hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
+ assertEquals(numDocsPerRun / 2, hits.length());
+
+ int[] counts = new int[numDocsPerRun];
+ for (int i = 0; i < hits.length(); i++) {
+ Document doc = hits.doc(i);
+ counts[Integer.parseInt(doc.get("id"))]++;
+ }
+
+ for (int i = 0; i < numDocsPerRun; i++) {
+ if (i % 2 == 0) {
+ assertEquals(0, counts[i]);
+ } else {
+ assertEquals(1, counts[i]);
+ }
+ }
+
+ searcher.close();
+ reader.close();
+ }
+
+}
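For context on what the two policies exercised above do differently: both map a document key to one of numShards shards, but hashing does so deterministically per key, while round-robin spreads inserts evenly across shards. A self-contained sketch of the two strategies follows; it illustrates the idea only and does not reproduce the contrib classes' actual IDistributionPolicy interface, whose method names are not shown in this diff.

public class ShardChoiceSketch {

  // hashing: the same key always maps to the same shard, so a later
  // delete or update for that key can be routed to the shard that
  // holds the original document
  static int hashingChoice(String key, int numShards) {
    return (key.hashCode() & Integer.MAX_VALUE) % numShards;
  }

  // round-robin: inserts are spread evenly across shards; since the
  // insert shard of a given key is not derivable afterwards, a delete
  // presumably has to be sent to every shard
  private static int next = 0;
  static int roundRobinChoice(int numShards) {
    return next++ % numShards;
  }

  public static void main(String[] args) {
    int numShards = 3; // matches the test above
    for (String id : new String[] { "00000", "00001", "00002" }) {
      System.out.println(id + " -> shard "
          + hashingChoice(id, numShards) + " (hashing), shard "
          + roundRobinChoice(numShards) + " (round-robin)");
    }
  }
}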