Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2012/10/16 02:03:53 UTC

svn commit: r1398581 [8/9] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-...

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateConfiguration.java Tue Oct 16 00:02:55 2012
@@ -1,256 +1,256 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
-import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-
-/**
- * This class provides getters and setters for a number of parameters.
- * Most of the parameters are related to the index update; the rest come
- * from the existing Map/Reduce parameters.
- */
-public class IndexUpdateConfiguration {
-  final Configuration conf;
-
-  /**
-   * Constructor
-   * @param conf
-   */
-  public IndexUpdateConfiguration(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Get the underlying configuration object.
-   * @return the configuration
-   */
-  public Configuration getConfiguration() {
-    return conf;
-  }
-
-  //
-  // existing map/reduce properties
-  //
-  // public int getIOFileBufferSize() {
-  // return getInt("io.file.buffer.size", 4096);
-  // }
-
-  /**
-   * Get the IO sort space in MB.
-   * @return the IO sort space in MB
-   */
-  public int getIOSortMB() {
-    return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
-  }
-
-  /**
-   * Set the IO sort space in MB.
-   * @param mb  the IO sort space in MB
-   */
-  public void setIOSortMB(int mb) {
-    conf.setInt(MRJobConfig.IO_SORT_MB, mb);
-  }
-
-  /**
-   * Get the Map/Reduce temp directory.
-   * @return the Map/Reduce temp directory
-   */
-  public String getMapredTempDir() {
-    return conf.get(MRConfig.TEMP_DIR);
-  }
-
-  //
-  // properties for index update
-  //
-  /**
-   * Get the distribution policy class.
-   * @return the distribution policy class
-   */
-  public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
-    return conf.getClass("sea.distribution.policy",
-        HashingDistributionPolicy.class, IDistributionPolicy.class);
-  }
-
-  /**
-   * Set the distribution policy class.
-   * @param theClass  the distribution policy class
-   */
-  public void setDistributionPolicyClass(
-      Class<? extends IDistributionPolicy> theClass) {
-    conf.setClass("sea.distribution.policy", theClass,
-        IDistributionPolicy.class);
-  }
-
-  /**
-   * Get the analyzer class.
-   * @return the analyzer class
-   */
-  public Class<? extends Analyzer> getDocumentAnalyzerClass() {
-    return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
-        Analyzer.class);
-  }
-
-  /**
-   * Set the analyzer class.
-   * @param theClass  the analyzer class
-   */
-  public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
-    conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
-  }
-
-  /**
-   * Get the index input format class.
-   * @return the index input format class
-   */
-  public Class<? extends InputFormat> getIndexInputFormatClass() {
-    return conf.getClass("sea.input.format", LineDocInputFormat.class,
-        InputFormat.class);
-  }
-
-  /**
-   * Set the index input format class.
-   * @param theClass  the index input format class
-   */
-  public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
-    conf.setClass("sea.input.format", theClass, InputFormat.class);
-  }
-
-  /**
-   * Get the index updater class.
-   * @return the index updater class
-   */
-  public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
-    return conf.getClass("sea.index.updater", IndexUpdater.class,
-        IIndexUpdater.class);
-  }
-
-  /**
-   * Set the index updater class.
-   * @param theClass  the index updater class
-   */
-  public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
-    conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
-  }
-
-  /**
-   * Get the local analysis class.
-   * @return the local analysis class
-   */
-  public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
-    return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
-        ILocalAnalysis.class);
-  }
-
-  /**
-   * Set the local analysis class.
-   * @param theClass  the local analysis class
-   */
-  public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
-    conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
-  }
-
-  /**
-   * Get the string representation of a number of shards.
-   * @return the string representation of a number of shards
-   */
-  public String getIndexShards() {
-    return conf.get("sea.index.shards");
-  }
-
-  /**
-   * Set the string representation of a number of shards.
-   * @param shards  the string representation of a number of shards
-   */
-  public void setIndexShards(String shards) {
-    conf.set("sea.index.shards", shards);
-  }
-
-  /**
-   * Get the max field length for a Lucene instance.
-   * @return the max field length for a Lucene instance
-   */
-  public int getIndexMaxFieldLength() {
-    return conf.getInt("sea.max.field.length", -1);
-  }
-
-  /**
-   * Set the max field length for a Lucene instance.
-   * @param maxFieldLength  the max field length for a Lucene instance
-   */
-  public void setIndexMaxFieldLength(int maxFieldLength) {
-    conf.setInt("sea.max.field.length", maxFieldLength);
-  }
-
-  /**
-   * Get the max number of segments for a Lucene instance.
-   * @return the max number of segments for a Lucene instance
-   */
-  public int getIndexMaxNumSegments() {
-    return conf.getInt("sea.max.num.segments", -1);
-  }
-
-  /**
-   * Set the max number of segments for a Lucene instance.
-   * @param maxNumSegments  the max number of segments for a Lucene instance
-   */
-  public void setIndexMaxNumSegments(int maxNumSegments) {
-    conf.setInt("sea.max.num.segments", maxNumSegments);
-  }
-
-  /**
-   * Check whether to use the compound file format for a Lucene instance.
-   * @return true if using the compound file format for a Lucene instance
-   */
-  public boolean getIndexUseCompoundFile() {
-    return conf.getBoolean("sea.use.compound.file", false);
-  }
-
-  /**
-   * Set whether to use the compound file format for a Lucene instance.
-   * @param useCompoundFile  whether to use the compound file format
-   */
-  public void setIndexUseCompoundFile(boolean useCompoundFile) {
-    conf.setBoolean("sea.use.compound.file", useCompoundFile);
-  }
-
-  /**
-   * Get the max ram index size in bytes. The default is 50M.
-   * @return the max ram index size in bytes
-   */
-  public long getMaxRAMSizeInBytes() {
-    return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
-  }
-
-  /**
-   * Set the max ram index size in bytes.
-   * @param b  the max ram index size in bytes
-   */
-  public void setMaxRAMSizeInBytes(long b) {
-    conf.setLong("sea.max.ramsize.bytes", b);
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.LineDocInputFormat;
+import org.apache.hadoop.contrib.index.example.LineDocLocalAnalysis;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+
+/**
+ * This class provides getters and setters for a number of parameters.
+ * Most of the parameters are related to the index update; the rest come
+ * from the existing Map/Reduce parameters.
+ */
+public class IndexUpdateConfiguration {
+  final Configuration conf;
+
+  /**
+   * Constructor
+   * @param conf
+   */
+  public IndexUpdateConfiguration(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Get the underlying configuration object.
+   * @return the configuration
+   */
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  //
+  // existing map/reduce properties
+  //
+  // public int getIOFileBufferSize() {
+  // return getInt("io.file.buffer.size", 4096);
+  // }
+
+  /**
+   * Get the IO sort space in MB.
+   * @return the IO sort space in MB
+   */
+  public int getIOSortMB() {
+    return conf.getInt(MRJobConfig.IO_SORT_MB, 100);
+  }
+
+  /**
+   * Set the IO sort space in MB.
+   * @param mb  the IO sort space in MB
+   */
+  public void setIOSortMB(int mb) {
+    conf.setInt(MRJobConfig.IO_SORT_MB, mb);
+  }
+
+  /**
+   * Get the Map/Reduce temp directory.
+   * @return the Map/Reduce temp directory
+   */
+  public String getMapredTempDir() {
+    return conf.get(MRConfig.TEMP_DIR);
+  }
+
+  //
+  // properties for index update
+  //
+  /**
+   * Get the distribution policy class.
+   * @return the distribution policy class
+   */
+  public Class<? extends IDistributionPolicy> getDistributionPolicyClass() {
+    return conf.getClass("sea.distribution.policy",
+        HashingDistributionPolicy.class, IDistributionPolicy.class);
+  }
+
+  /**
+   * Set the distribution policy class.
+   * @param theClass  the distribution policy class
+   */
+  public void setDistributionPolicyClass(
+      Class<? extends IDistributionPolicy> theClass) {
+    conf.setClass("sea.distribution.policy", theClass,
+        IDistributionPolicy.class);
+  }
+
+  /**
+   * Get the analyzer class.
+   * @return the analyzer class
+   */
+  public Class<? extends Analyzer> getDocumentAnalyzerClass() {
+    return conf.getClass("sea.document.analyzer", StandardAnalyzer.class,
+        Analyzer.class);
+  }
+
+  /**
+   * Set the analyzer class.
+   * @param theClass  the analyzer class
+   */
+  public void setDocumentAnalyzerClass(Class<? extends Analyzer> theClass) {
+    conf.setClass("sea.document.analyzer", theClass, Analyzer.class);
+  }
+
+  /**
+   * Get the index input format class.
+   * @return the index input format class
+   */
+  public Class<? extends InputFormat> getIndexInputFormatClass() {
+    return conf.getClass("sea.input.format", LineDocInputFormat.class,
+        InputFormat.class);
+  }
+
+  /**
+   * Set the index input format class.
+   * @param theClass  the index input format class
+   */
+  public void setIndexInputFormatClass(Class<? extends InputFormat> theClass) {
+    conf.setClass("sea.input.format", theClass, InputFormat.class);
+  }
+
+  /**
+   * Get the index updater class.
+   * @return the index updater class
+   */
+  public Class<? extends IIndexUpdater> getIndexUpdaterClass() {
+    return conf.getClass("sea.index.updater", IndexUpdater.class,
+        IIndexUpdater.class);
+  }
+
+  /**
+   * Set the index updater class.
+   * @param theClass  the index updater class
+   */
+  public void setIndexUpdaterClass(Class<? extends IIndexUpdater> theClass) {
+    conf.setClass("sea.index.updater", theClass, IIndexUpdater.class);
+  }
+
+  /**
+   * Get the local analysis class.
+   * @return the local analysis class
+   */
+  public Class<? extends ILocalAnalysis> getLocalAnalysisClass() {
+    return conf.getClass("sea.local.analysis", LineDocLocalAnalysis.class,
+        ILocalAnalysis.class);
+  }
+
+  /**
+   * Set the local analysis class.
+   * @param theClass  the local analysis class
+   */
+  public void setLocalAnalysisClass(Class<? extends ILocalAnalysis> theClass) {
+    conf.setClass("sea.local.analysis", theClass, ILocalAnalysis.class);
+  }
+
+  /**
+   * Get the string representation of a number of shards.
+   * @return the string representation of a number of shards
+   */
+  public String getIndexShards() {
+    return conf.get("sea.index.shards");
+  }
+
+  /**
+   * Set the string representation of a number of shards.
+   * @param shards  the string representation of a number of shards
+   */
+  public void setIndexShards(String shards) {
+    conf.set("sea.index.shards", shards);
+  }
+
+  /**
+   * Get the max field length for a Lucene instance.
+   * @return the max field length for a Lucene instance
+   */
+  public int getIndexMaxFieldLength() {
+    return conf.getInt("sea.max.field.length", -1);
+  }
+
+  /**
+   * Set the max field length for a Lucene instance.
+   * @param maxFieldLength  the max field length for a Lucene instance
+   */
+  public void setIndexMaxFieldLength(int maxFieldLength) {
+    conf.setInt("sea.max.field.length", maxFieldLength);
+  }
+
+  /**
+   * Get the max number of segments for a Lucene instance.
+   * @return the max number of segments for a Lucene instance
+   */
+  public int getIndexMaxNumSegments() {
+    return conf.getInt("sea.max.num.segments", -1);
+  }
+
+  /**
+   * Set the max number of segments for a Lucene instance.
+   * @param maxNumSegments  the max number of segments for a Lucene instance
+   */
+  public void setIndexMaxNumSegments(int maxNumSegments) {
+    conf.setInt("sea.max.num.segments", maxNumSegments);
+  }
+
+  /**
+   * Check whether to use the compound file format for a Lucene instance.
+   * @return true if using the compound file format for a Lucene instance
+   */
+  public boolean getIndexUseCompoundFile() {
+    return conf.getBoolean("sea.use.compound.file", false);
+  }
+
+  /**
+   * Set whether to use the compound file format for a Lucene instance.
+   * @param useCompoundFile  whether to use the compound file format
+   */
+  public void setIndexUseCompoundFile(boolean useCompoundFile) {
+    conf.setBoolean("sea.use.compound.file", useCompoundFile);
+  }
+
+  /**
+   * Get the max ram index size in bytes. The default is 50M.
+   * @return the max ram index size in bytes
+   */
+  public long getMaxRAMSizeInBytes() {
+    return conf.getLong("sea.max.ramsize.bytes", 50L << 20);
+  }
+
+  /**
+   * Set the max ram index size in bytes.
+   * @param b  the max ram index size in bytes
+   */
+  public void setMaxRAMSizeInBytes(long b) {
+    conf.setLong("sea.max.ramsize.bytes", b);
+  }
+
+}

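For context on the file above: IndexUpdateConfiguration is a thin wrapper around a Hadoop Configuration, storing the index-update settings under "sea.*" keys. A minimal sketch of how it might be used when preparing an index-update job follows (the JobConf instance, shard string, and values are illustrative, not part of this commit):

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
    import org.apache.hadoop.mapred.JobConf;

    public class IndexUpdateConfigSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();                              // hypothetical job configuration
        IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(job);

        // Setters delegate to the wrapped Configuration under the "sea.*" keys shown above.
        iconf.setIndexShards("shard-spec-placeholder");           // format is defined by Shard.getIndexShards
        iconf.setIndexMaxNumSegments(10);
        iconf.setIndexUseCompoundFile(true);
        iconf.setMaxRAMSizeInBytes(100L << 20);                   // 100 MB RAM buffer

        // Getters fall back to the defaults shown in the class above.
        System.out.println("io.sort.mb        = " + iconf.getIOSortMB());              // 100
        System.out.println("use compound file = " + iconf.getIndexUseCompoundFile());  // true
        System.out.println("shards            = " + iconf.getIndexShards());
      }
    }
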
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateMapper.java Tue Oct 16 00:02:55 2012
@@ -1,199 +1,199 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.lucene.analysis.Analyzer;
-
-/**
- * This class applies local analysis to a key-value pair and then converts the
- * resulting docid-and-operation pair to a shard-and-intermediate form pair.
- */
-public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
-    extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
-  static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
-
-  /**
-   * Get the map output key class.
-   * @return the map output key class
-   */
-  public static Class<? extends WritableComparable> getMapOutputKeyClass() {
-    return Shard.class;
-  }
-
-  /**
-   * Get the map output value class.
-   * @return the map output value class
-   */
-  public static Class<? extends Writable> getMapOutputValueClass() {
-    return IntermediateForm.class;
-  }
-
-  IndexUpdateConfiguration iconf;
-  private Analyzer analyzer;
-  private Shard[] shards;
-  private IDistributionPolicy distributionPolicy;
-
-  private ILocalAnalysis<K, V> localAnalysis;
-  private DocumentID tmpKey;
-  private DocumentAndOp tmpValue;
-
-  private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
-      new OutputCollector<DocumentID, DocumentAndOp>() {
-        public void collect(DocumentID key, DocumentAndOp value)
-            throws IOException {
-          tmpKey = key;
-          tmpValue = value;
-        }
-      };
-
-  /**
-   * Map a key-value pair to a shard-and-intermediate form pair. Internally,
-   * the local analysis is first applied to map the key-value pair to a
-   * document id-and-operation pair, then the docid-and-operation pair is
-   * mapped to a shard-and-intermediate form pair. The intermediate form
-   * consists of a single-document ram index and/or a single delete term.
-   */
-  public void map(K key, V value,
-      OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
-      throws IOException {
-
-    synchronized (this) {
-      localAnalysis.map(key, value, tmpCollector, reporter);
-
-      if (tmpKey != null && tmpValue != null) {
-        DocumentAndOp doc = tmpValue;
-        IntermediateForm form = new IntermediateForm();
-        form.configure(iconf);
-        form.process(doc, analyzer);
-        form.closeWriter();
-
-        if (doc.getOp() == DocumentAndOp.Op.INSERT) {
-          int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
-          if (chosenShard >= 0) {
-            // insert into one shard
-            output.collect(shards[chosenShard], form);
-          } else {
-            throw new IOException("Chosen shard for insert must be >= 0");
-          }
-
-        } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
-          int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
-          if (chosenShard >= 0) {
-            // delete from one shard
-            output.collect(shards[chosenShard], form);
-          } else {
-            // broadcast delete to all shards
-            for (int i = 0; i < shards.length; i++) {
-              output.collect(shards[i], form);
-            }
-          }
-
-        } else { // UPDATE
-          int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
-          int deleteFromShard =
-              distributionPolicy.chooseShardForDelete(tmpKey);
-
-          if (insertToShard >= 0) {
-            if (insertToShard == deleteFromShard) {
-              // update into one shard
-              output.collect(shards[insertToShard], form);
-            } else {
-              // prepare a deletion form
-              IntermediateForm deletionForm = new IntermediateForm();
-              deletionForm.configure(iconf);
-              deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
-                  doc.getTerm()), analyzer);
-              deletionForm.closeWriter();
-
-              if (deleteFromShard >= 0) {
-                // delete from one shard
-                output.collect(shards[deleteFromShard], deletionForm);
-              } else {
-                // broadcast delete to all shards
-                for (int i = 0; i < shards.length; i++) {
-                  output.collect(shards[i], deletionForm);
-                }
-              }
-
-              // prepare an insertion form
-              IntermediateForm insertionForm = new IntermediateForm();
-              insertionForm.configure(iconf);
-              insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
-                  doc.getDocument()), analyzer);
-              insertionForm.closeWriter();
-
-              // insert into one shard
-              output.collect(shards[insertToShard], insertionForm);
-            }
-          } else {
-            throw new IOException("Chosen shard for insert must be >= 0");
-          }
-        }
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
-   */
-  public void configure(JobConf job) {
-    iconf = new IndexUpdateConfiguration(job);
-    analyzer =
-        (Analyzer) ReflectionUtils.newInstance(
-            iconf.getDocumentAnalyzerClass(), job);
-
-    localAnalysis =
-        (ILocalAnalysis) ReflectionUtils.newInstance(
-            iconf.getLocalAnalysisClass(), job);
-    localAnalysis.configure(job);
-
-    shards = Shard.getIndexShards(iconf);
-
-    distributionPolicy =
-        (IDistributionPolicy) ReflectionUtils.newInstance(
-            iconf.getDistributionPolicyClass(), job);
-    distributionPolicy.init(shards);
-
-    LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
-    LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
-    LOG.info(shards.length + " shards = " + iconf.getIndexShards());
-    LOG.info("sea.distribution.policy = "
-        + distributionPolicy.getClass().getName());
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.MapReduceBase#close()
-   */
-  public void close() throws IOException {
-    localAnalysis.close();
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.lucene.analysis.Analyzer;
+
+/**
+ * This class applies local analysis to a key-value pair and then converts the
+ * resulting docid-and-operation pair to a shard-and-intermediate form pair.
+ */
+public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
+  static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
+
+  /**
+   * Get the map output key class.
+   * @return the map output key class
+   */
+  public static Class<? extends WritableComparable> getMapOutputKeyClass() {
+    return Shard.class;
+  }
+
+  /**
+   * Get the map output value class.
+   * @return the map output value class
+   */
+  public static Class<? extends Writable> getMapOutputValueClass() {
+    return IntermediateForm.class;
+  }
+
+  IndexUpdateConfiguration iconf;
+  private Analyzer analyzer;
+  private Shard[] shards;
+  private IDistributionPolicy distributionPolicy;
+
+  private ILocalAnalysis<K, V> localAnalysis;
+  private DocumentID tmpKey;
+  private DocumentAndOp tmpValue;
+
+  private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
+      new OutputCollector<DocumentID, DocumentAndOp>() {
+        public void collect(DocumentID key, DocumentAndOp value)
+            throws IOException {
+          tmpKey = key;
+          tmpValue = value;
+        }
+      };
+
+  /**
+   * Map a key-value pair to a shard-and-intermediate form pair. Internally,
+   * the local analysis is first applied to map the key-value pair to a
+   * document id-and-operation pair, then the docid-and-operation pair is
+   * mapped to a shard-and-intermediate form pair. The intermediate form
+   * consists of a single-document ram index and/or a single delete term.
+   */
+  public void map(K key, V value,
+      OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
+      throws IOException {
+
+    synchronized (this) {
+      localAnalysis.map(key, value, tmpCollector, reporter);
+
+      if (tmpKey != null && tmpValue != null) {
+        DocumentAndOp doc = tmpValue;
+        IntermediateForm form = new IntermediateForm();
+        form.configure(iconf);
+        form.process(doc, analyzer);
+        form.closeWriter();
+
+        if (doc.getOp() == DocumentAndOp.Op.INSERT) {
+          int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
+          if (chosenShard >= 0) {
+            // insert into one shard
+            output.collect(shards[chosenShard], form);
+          } else {
+            throw new IOException("Chosen shard for insert must be >= 0");
+          }
+
+        } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
+          int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
+          if (chosenShard >= 0) {
+            // delete from one shard
+            output.collect(shards[chosenShard], form);
+          } else {
+            // broadcast delete to all shards
+            for (int i = 0; i < shards.length; i++) {
+              output.collect(shards[i], form);
+            }
+          }
+
+        } else { // UPDATE
+          int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
+          int deleteFromShard =
+              distributionPolicy.chooseShardForDelete(tmpKey);
+
+          if (insertToShard >= 0) {
+            if (insertToShard == deleteFromShard) {
+              // update into one shard
+              output.collect(shards[insertToShard], form);
+            } else {
+              // prepare a deletion form
+              IntermediateForm deletionForm = new IntermediateForm();
+              deletionForm.configure(iconf);
+              deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
+                  doc.getTerm()), analyzer);
+              deletionForm.closeWriter();
+
+              if (deleteFromShard >= 0) {
+                // delete from one shard
+                output.collect(shards[deleteFromShard], deletionForm);
+              } else {
+                // broadcast delete to all shards
+                for (int i = 0; i < shards.length; i++) {
+                  output.collect(shards[i], deletionForm);
+                }
+              }
+
+              // prepare an insertion form
+              IntermediateForm insertionForm = new IntermediateForm();
+              insertionForm.configure(iconf);
+              insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
+                  doc.getDocument()), analyzer);
+              insertionForm.closeWriter();
+
+              // insert into one shard
+              output.collect(shards[insertToShard], insertionForm);
+            }
+          } else {
+            throw new IOException("Chosen shard for insert must be >= 0");
+          }
+        }
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    iconf = new IndexUpdateConfiguration(job);
+    analyzer =
+        (Analyzer) ReflectionUtils.newInstance(
+            iconf.getDocumentAnalyzerClass(), job);
+
+    localAnalysis =
+        (ILocalAnalysis) ReflectionUtils.newInstance(
+            iconf.getLocalAnalysisClass(), job);
+    localAnalysis.configure(job);
+
+    shards = Shard.getIndexShards(iconf);
+
+    distributionPolicy =
+        (IDistributionPolicy) ReflectionUtils.newInstance(
+            iconf.getDistributionPolicyClass(), job);
+    distributionPolicy.init(shards);
+
+    LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
+    LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
+    LOG.info(shards.length + " shards = " + iconf.getIndexShards());
+    LOG.info("sea.distribution.policy = "
+        + distributionPolicy.getClass().getName());
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   */
+  public void close() throws IOException {
+    localAnalysis.close();
+  }
+
+}

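The mapper above is written against the old mapred API; a sketch of the map-side job wiring that matches it, using the static output-class accessors defined in the file, follows (the input path is a placeholder and this wiring is an assumption, not part of this commit):

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdateMapper;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class MapSideWiringSketch {
      public static void configureMapSide(JobConf job) {
        // Map output types must match what IndexUpdateMapper emits:
        // Shard keys and IntermediateForm values.
        job.setMapperClass(IndexUpdateMapper.class);
        job.setMapOutputKeyClass(IndexUpdateMapper.getMapOutputKeyClass());
        job.setMapOutputValueClass(IndexUpdateMapper.getMapOutputValueClass());

        // The input format comes from the index-update configuration
        // (default: LineDocInputFormat, as shown above).
        IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(job);
        job.setInputFormat(iconf.getIndexInputFormatClass());
        FileInputFormat.addInputPath(job, new Path("/input/docs"));   // placeholder path
      }
    }
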
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdatePartitioner.java Tue Oct 16 00:02:55 2012
@@ -1,60 +1,60 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
-
-/**
- * This partitioner class puts the values of the same key - in this case the
- * same shard - in the same partition.
- */
-public class IndexUpdatePartitioner implements
-    Partitioner<Shard, IntermediateForm> {
-
-  private Shard[] shards;
-  private Map<Shard, Integer> map;
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
-   */
-  public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
-    int partition = map.get(key).intValue();
-    if (partition < numPartitions) {
-      return partition;
-    } else {
-      return numPartitions - 1;
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
-   */
-  public void configure(JobConf job) {
-    shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
-    map = new HashMap<Shard, Integer>();
-    for (int i = 0; i < shards.length; i++) {
-      map.put(shards[i], i);
-    }
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Partitioner;
+
+/**
+ * This partitioner class puts the values of the same key - in this case the
+ * same shard - in the same partition.
+ */
+public class IndexUpdatePartitioner implements
+    Partitioner<Shard, IntermediateForm> {
+
+  private Shard[] shards;
+  private Map<Shard, Integer> map;
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.Partitioner#getPartition(java.lang.Object, java.lang.Object, int)
+   */
+  public int getPartition(Shard key, IntermediateForm value, int numPartitions) {
+    int partition = map.get(key).intValue();
+    if (partition < numPartitions) {
+      return partition;
+    } else {
+      return numPartitions - 1;
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.JobConfigurable#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
+    map = new HashMap<Shard, Integer>();
+    for (int i = 0; i < shards.length; i++) {
+      map.put(shards[i], i);
+    }
+  }
+
+}

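The partitioner above sends all intermediate forms for a given shard to the same reduce partition; getPartition() clamps to numPartitions - 1 when there are fewer partitions than shards. A sketch of the matching job wiring follows, assuming one reduce task per shard (an assumption of this sketch, not something the class enforces):

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
    import org.apache.hadoop.contrib.index.mapred.IndexUpdatePartitioner;
    import org.apache.hadoop.contrib.index.mapred.Shard;
    import org.apache.hadoop.mapred.JobConf;

    public class PartitionerWiringSketch {
      public static void configurePartitioning(JobConf job) {
        // One reduce task per shard keeps getPartition() from folding
        // different shards into the same partition.
        Shard[] shards = Shard.getIndexShards(new IndexUpdateConfiguration(job));
        job.setNumReduceTasks(shards.length);
        job.setPartitionerClass(IndexUpdatePartitioner.class);
      }
    }
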
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IndexUpdateReducer.java Tue Oct 16 00:02:55 2012
@@ -1,143 +1,143 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.contrib.index.lucene.ShardWriter;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Closeable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.MapReduceBase;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-
-/**
- * This reducer applies the changes for a shard to that shard. A "new
- * version" of a shard is created at the end of a reduce. It is important to
- * note that the new version of the shard is not built from scratch. By
- * leveraging Lucene's update algorithm, the new version of each Lucene
- * instance will share as many files as possible with the previous version.
- */
-public class IndexUpdateReducer extends MapReduceBase implements
-    Reducer<Shard, IntermediateForm, Shard, Text> {
-  static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
-  static final Text DONE = new Text("done");
-
-  /**
-   * Get the reduce output key class.
-   * @return the reduce output key class
-   */
-  public static Class<? extends WritableComparable> getOutputKeyClass() {
-    return Shard.class;
-  }
-
-  /**
-   * Get the reduce output value class.
-   * @return the reduce output value class
-   */
-  public static Class<? extends Writable> getOutputValueClass() {
-    return Text.class;
-  }
-
-  private IndexUpdateConfiguration iconf;
-  private String mapredTempDir;
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
-   */
-  public void reduce(Shard key, Iterator<IntermediateForm> values,
-      OutputCollector<Shard, Text> output, Reporter reporter)
-      throws IOException {
-
-    LOG.info("Construct a shard writer for " + key);
-    FileSystem fs = FileSystem.get(iconf.getConfiguration());
-    String temp =
-        mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
-    final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
-
-    // update the shard
-    while (values.hasNext()) {
-      IntermediateForm form = values.next();
-      writer.process(form);
-      reporter.progress();
-    }
-
-    // close the shard
-    final Reporter fReporter = reporter;
-    new Closeable() {
-      volatile boolean closed = false;
-
-      public void close() throws IOException {
-        // spawn a thread to give progress heartbeats
-        Thread prog = new Thread() {
-          public void run() {
-            while (!closed) {
-              try {
-                fReporter.setStatus("closing");
-                Thread.sleep(1000);
-              } catch (InterruptedException e) {
-                continue;
-              } catch (Throwable e) {
-                return;
-              }
-            }
-          }
-        };
-
-        try {
-          prog.start();
-
-          if (writer != null) {
-            writer.close();
-          }
-        } finally {
-          closed = true;
-        }
-      }
-    }.close();
-    LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
-
-    output.collect(key, DONE);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
-   */
-  public void configure(JobConf job) {
-    iconf = new IndexUpdateConfiguration(job);
-    mapredTempDir = iconf.getMapredTempDir();
-    mapredTempDir = Shard.normalizePath(mapredTempDir);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.mapred.MapReduceBase#close()
-   */
-  public void close() throws IOException {
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.contrib.index.lucene.ShardWriter;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * This reducer applies the changes for a shard to that shard. A "new
+ * version" of a shard is created at the end of a reduce. It is important to
+ * note that the new version of the shard is not built from scratch. By
+ * leveraging Lucene's update algorithm, the new version of each Lucene
+ * instance will share as many files as possible with the previous version.
+ */
+public class IndexUpdateReducer extends MapReduceBase implements
+    Reducer<Shard, IntermediateForm, Shard, Text> {
+  static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
+  static final Text DONE = new Text("done");
+
+  /**
+   * Get the reduce output key class.
+   * @return the reduce output key class
+   */
+  public static Class<? extends WritableComparable> getOutputKeyClass() {
+    return Shard.class;
+  }
+
+  /**
+   * Get the reduce output value class.
+   * @return the reduce output value class
+   */
+  public static Class<? extends Writable> getOutputValueClass() {
+    return Text.class;
+  }
+
+  private IndexUpdateConfiguration iconf;
+  private String mapredTempDir;
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
+   */
+  public void reduce(Shard key, Iterator<IntermediateForm> values,
+      OutputCollector<Shard, Text> output, Reporter reporter)
+      throws IOException {
+
+    LOG.info("Construct a shard writer for " + key);
+    FileSystem fs = FileSystem.get(iconf.getConfiguration());
+    String temp =
+        mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
+    final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
+
+    // update the shard
+    while (values.hasNext()) {
+      IntermediateForm form = values.next();
+      writer.process(form);
+      reporter.progress();
+    }
+
+    // close the shard
+    final Reporter fReporter = reporter;
+    new Closeable() {
+      volatile boolean closed = false;
+
+      public void close() throws IOException {
+        // spawn a thread to give progress heartbeats
+        Thread prog = new Thread() {
+          public void run() {
+            while (!closed) {
+              try {
+                fReporter.setStatus("closing");
+                Thread.sleep(1000);
+              } catch (InterruptedException e) {
+                continue;
+              } catch (Throwable e) {
+                return;
+              }
+            }
+          }
+        };
+
+        try {
+          prog.start();
+
+          if (writer != null) {
+            writer.close();
+          }
+        } finally {
+          closed = true;
+        }
+      }
+    }.close();
+    LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
+
+    output.collect(key, DONE);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
+   */
+  public void configure(JobConf job) {
+    iconf = new IndexUpdateConfiguration(job);
+    mapredTempDir = iconf.getMapredTempDir();
+    mapredTempDir = Shard.normalizePath(mapredTempDir);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.mapred.MapReduceBase#close()
+   */
+  public void close() throws IOException {
+  }
+
+}

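To complete the picture, a sketch of the reduce-side wiring that matches the reducer above: the job's nominal output is only the Shard -> "done" markers, while the real index files are written by the ShardWriter inside reduce(). The output path is a placeholder and this wiring is not part of this commit:

    import org.apache.hadoop.contrib.index.mapred.IndexUpdateReducer;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class ReduceSideWiringSketch {
      public static void configureReduceSide(JobConf job) {
        job.setReducerClass(IndexUpdateReducer.class);
        job.setOutputKeyClass(IndexUpdateReducer.getOutputKeyClass());     // Shard
        job.setOutputValueClass(IndexUpdateReducer.getOutputValueClass()); // Text
        FileOutputFormat.setOutputPath(job, new Path("/output/status"));   // placeholder path
      }
    }
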
Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java Tue Oct 16 00:02:55 2012
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-
-/**
- * An intermediate form for one or more parsed Lucene documents and/or
- * delete terms. It actually uses Lucene file format as the format for
- * the intermediate form by using RAM dir files.
- * 
- * Note: If process(*) is ever called, closeWriter() should be called.
- * Otherwise, no need to call closeWriter().
- */
-public class IntermediateForm implements Writable {
-
-  private IndexUpdateConfiguration iconf = null;
-  private final Collection<Term> deleteList;
-  private RAMDirectory dir;
-  private IndexWriter writer;
-  private int numDocs;
-
-  /**
-   * Constructor
-   * @throws IOException
-   */
-  public IntermediateForm() throws IOException {
-    deleteList = new ConcurrentLinkedQueue<Term>();
-    dir = new RAMDirectory();
-    writer = null;
-    numDocs = 0;
-  }
-
-  /**
-   * Configure using an index update configuration.
-   * @param iconf  the index update configuration
-   */
-  public void configure(IndexUpdateConfiguration iconf) {
-    this.iconf = iconf;
-  }
-
-  /**
-   * Get the ram directory of the intermediate form.
-   * @return the ram directory
-   */
-  public Directory getDirectory() {
-    return dir;
-  }
-
-  /**
-   * Get an iterator for the delete terms in the intermediate form.
-   * @return an iterator for the delete terms
-   */
-  public Iterator<Term> deleteTermIterator() {
-    return deleteList.iterator();
-  }
-
-  /**
-   * This method is used by the index update mapper and processes a document
-   * operation into the current intermediate form.
-   * @param doc  input document operation
-   * @param analyzer  the analyzer
-   * @throws IOException
-   */
-  public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
-    if (doc.getOp() == DocumentAndOp.Op.DELETE
-        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-      deleteList.add(doc.getTerm());
-
-    }
-
-    if (doc.getOp() == DocumentAndOp.Op.INSERT
-        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-
-      if (writer == null) {
-        // analyzer is null because we specify an analyzer with addDocument
-        writer = createWriter();
-      }
-
-      writer.addDocument(doc.getDocument(), analyzer);
-      numDocs++;
-    }
-
-  }
-
-  /**
-   * This method is used by the index update combiner and processes an
-   * intermediate form into the current intermediate form. More specifically,
-   * the input intermediate forms are a single-document ram index and/or a
-   * single delete term.
-   * @param form  the input intermediate form
-   * @throws IOException
-   */
-  public void process(IntermediateForm form) throws IOException {
-    if (form.deleteList.size() > 0) {
-      deleteList.addAll(form.deleteList);
-    }
-
-    if (form.dir.sizeInBytes() > 0) {
-      if (writer == null) {
-        writer = createWriter();
-      }
-
-      writer.addIndexesNoOptimize(new Directory[] { form.dir });
-      numDocs++;
-    }
-
-  }
-
-  /**
-   * Close the Lucene index writer associated with the intermediate form,
-   * if created. Do not close the ram directory. In fact, there is no need
-   * to close a ram directory.
-   * @throws IOException
-   */
-  public void closeWriter() throws IOException {
-    if (writer != null) {
-      writer.close();
-      writer = null;
-    }
-  }
-
-  /**
-   * The total size of files in the directory and ram used by the index writer.
-   * It does not include memory used by the delete list.
-   * @return the total size in bytes
-   */
-  public long totalSizeInBytes() throws IOException {
-    long size = dir.sizeInBytes();
-    if (writer != null) {
-      size += writer.ramSizeInBytes();
-    }
-    return size;
-  }
-
-  /* (non-Javadoc)
-   * @see java.lang.Object#toString()
-   */
-  public String toString() {
-    StringBuilder buffer = new StringBuilder();
-    buffer.append(this.getClass().getSimpleName());
-    buffer.append("[numDocs=");
-    buffer.append(numDocs);
-    buffer.append(", numDeletes=");
-    buffer.append(deleteList.size());
-    if (deleteList.size() > 0) {
-      buffer.append("(");
-      Iterator<Term> iter = deleteTermIterator();
-      while (iter.hasNext()) {
-        buffer.append(iter.next());
-        buffer.append(" ");
-      }
-      buffer.append(")");
-    }
-    buffer.append("]");
-    return buffer.toString();
-  }
-
-  private IndexWriter createWriter() throws IOException {
-    IndexWriter writer =
-        new IndexWriter(dir, false, null,
-            new KeepOnlyLastCommitDeletionPolicy());
-    writer.setUseCompoundFile(false);
-
-    if (iconf != null) {
-      int maxFieldLength = iconf.getIndexMaxFieldLength();
-      if (maxFieldLength > 0) {
-        writer.setMaxFieldLength(maxFieldLength);
-      }
-    }
-
-    return writer;
-  }
-
-  private void resetForm() throws IOException {
-    deleteList.clear();
-    if (dir.sizeInBytes() > 0) {
-      // it's ok if we don't close a ram directory
-      dir.close();
-      // an alternative is to delete all the files and reuse the ram directory
-      dir = new RAMDirectory();
-    }
-    assert (writer == null);
-    numDocs = 0;
-  }
-
-  // ///////////////////////////////////
-  // Writable
-  // ///////////////////////////////////
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
-   */
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(deleteList.size());
-    for (Term term : deleteList) {
-      Text.writeString(out, term.field());
-      Text.writeString(out, term.text());
-    }
-
-    String[] files = dir.list();
-    RAMDirectoryUtil.writeRAMFiles(out, dir, files);
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
-   */
-  public void readFields(DataInput in) throws IOException {
-    resetForm();
-
-    int numDeleteTerms = in.readInt();
-    for (int i = 0; i < numDeleteTerms; i++) {
-      String field = Text.readString(in);
-      String text = Text.readString(in);
-      deleteList.add(new Term(field, text));
-    }
-
-    RAMDirectoryUtil.readRAMFiles(in, dir);
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * An intermediate form for one or more parsed Lucene documents and/or
+ * delete terms. The intermediate form itself is kept in Lucene's file
+ * format, stored as files in a RAM directory.
+ *
+ * Note: if process(*) is ever called, closeWriter() should be called
+ * afterwards; otherwise there is no need to call closeWriter().
+ */
+public class IntermediateForm implements Writable {
+
+  private IndexUpdateConfiguration iconf = null;
+  private final Collection<Term> deleteList;
+  private RAMDirectory dir;
+  private IndexWriter writer;
+  private int numDocs;
+
+  /**
+   * Constructor
+   * @throws IOException
+   */
+  public IntermediateForm() throws IOException {
+    deleteList = new ConcurrentLinkedQueue<Term>();
+    dir = new RAMDirectory();
+    writer = null;
+    numDocs = 0;
+  }
+
+  /**
+   * Configure using an index update configuration.
+   * @param iconf  the index update configuration
+   */
+  public void configure(IndexUpdateConfiguration iconf) {
+    this.iconf = iconf;
+  }
+
+  /**
+   * Get the ram directory of the intermediate form.
+   * @return the ram directory
+   */
+  public Directory getDirectory() {
+    return dir;
+  }
+
+  /**
+   * Get an iterator for the delete terms in the intermediate form.
+   * @return an iterator for the delete terms
+   */
+  public Iterator<Term> deleteTermIterator() {
+    return deleteList.iterator();
+  }
+
+  /**
+   * This method is used by the index update mapper and processes a document
+   * operation into the current intermediate form.
+   * @param doc  input document operation
+   * @param analyzer  the analyzer
+   * @throws IOException
+   */
+  public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
+    if (doc.getOp() == DocumentAndOp.Op.DELETE
+        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+      deleteList.add(doc.getTerm());
+
+    }
+
+    if (doc.getOp() == DocumentAndOp.Op.INSERT
+        || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+
+      if (writer == null) {
+        // analyzer is null because we specify an analyzer with addDocument
+        writer = createWriter();
+      }
+
+      writer.addDocument(doc.getDocument(), analyzer);
+      numDocs++;
+    }
+
+  }
+
+  /**
+   * This method is used by the index update combiner and processes an
+   * intermediate form into the current intermediate form. More specifically,
+   * the input intermediate forms are a single-document ram index and/or a
+   * single delete term.
+   * @param form  the input intermediate form
+   * @throws IOException
+   */
+  public void process(IntermediateForm form) throws IOException {
+    if (form.deleteList.size() > 0) {
+      deleteList.addAll(form.deleteList);
+    }
+
+    if (form.dir.sizeInBytes() > 0) {
+      if (writer == null) {
+        writer = createWriter();
+      }
+
+      writer.addIndexesNoOptimize(new Directory[] { form.dir });
+      numDocs++;
+    }
+
+  }
+
+  /**
+   * Close the Lucene index writer associated with the intermediate form,
+   * if created. Do not close the ram directory. In fact, there is no need
+   * to close a ram directory.
+   * @throws IOException
+   */
+  public void closeWriter() throws IOException {
+    if (writer != null) {
+      writer.close();
+      writer = null;
+    }
+  }
+
+  /**
+   * The total size of the files in the directory plus the RAM used by the
+   * index writer. It does not include memory used by the delete list.
+   * @return the total size in bytes
+   */
+  public long totalSizeInBytes() throws IOException {
+    long size = dir.sizeInBytes();
+    if (writer != null) {
+      size += writer.ramSizeInBytes();
+    }
+    return size;
+  }
+
+  /* (non-Javadoc)
+   * @see java.lang.Object#toString()
+   */
+  public String toString() {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append(this.getClass().getSimpleName());
+    buffer.append("[numDocs=");
+    buffer.append(numDocs);
+    buffer.append(", numDeletes=");
+    buffer.append(deleteList.size());
+    if (deleteList.size() > 0) {
+      buffer.append("(");
+      Iterator<Term> iter = deleteTermIterator();
+      while (iter.hasNext()) {
+        buffer.append(iter.next());
+        buffer.append(" ");
+      }
+      buffer.append(")");
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  private IndexWriter createWriter() throws IOException {
+    IndexWriter writer =
+        new IndexWriter(dir, false, null,
+            new KeepOnlyLastCommitDeletionPolicy());
+    writer.setUseCompoundFile(false);
+
+    if (iconf != null) {
+      int maxFieldLength = iconf.getIndexMaxFieldLength();
+      if (maxFieldLength > 0) {
+        writer.setMaxFieldLength(maxFieldLength);
+      }
+    }
+
+    return writer;
+  }
+
+  private void resetForm() throws IOException {
+    deleteList.clear();
+    if (dir.sizeInBytes() > 0) {
+      // it's ok if we don't close a ram directory
+      dir.close();
+      // an alternative is to delete all the files and reuse the ram directory
+      dir = new RAMDirectory();
+    }
+    assert (writer == null);
+    numDocs = 0;
+  }
+
+  // ///////////////////////////////////
+  // Writable
+  // ///////////////////////////////////
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(deleteList.size());
+    for (Term term : deleteList) {
+      Text.writeString(out, term.field());
+      Text.writeString(out, term.text());
+    }
+
+    String[] files = dir.list();
+    RAMDirectoryUtil.writeRAMFiles(out, dir, files);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+   */
+  public void readFields(DataInput in) throws IOException {
+    resetForm();
+
+    int numDeleteTerms = in.readInt();
+    for (int i = 0; i < numDeleteTerms; i++) {
+      String field = Text.readString(in);
+      String text = Text.readString(in);
+      deleteList.add(new Term(field, text));
+    }
+
+    RAMDirectoryUtil.readRAMFiles(in, dir);
+  }
+
+}
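
As a quick orientation for readers skimming the diff rather than the checked-in sources, here is a minimal sketch of the call sequence the class Javadoc above prescribes: process one or more operations, then closeWriter(), then hand the form off via its Writable methods. The helper name and its parameters are illustrative only and are not part of this commit; the surrounding imports and class wrapper are omitted.

  /**
   * Illustrative only -- not part of this change. Shows the intended
   * lifecycle of an IntermediateForm as used by an index update mapper.
   */
  static void buildAndSerialize(DocumentAndOp op, Analyzer analyzer,
      IndexUpdateConfiguration iconf, DataOutput out) throws IOException {
    IntermediateForm form = new IntermediateForm();
    form.configure(iconf);

    // process(*) is called, so closeWriter() must follow before the
    // form's RAM directory is serialized or read.
    form.process(op, analyzer);
    form.closeWriter();

    // Ship the delete terms and the RAM-dir files downstream; a combiner
    // or reducer rebuilds the form on the other side with readFields().
    form.write(out);
  }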

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java Tue Oct 16 00:02:55 2012
@@ -1,105 +1,105 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.lucene;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMDirectory;
-
-public class TestMixedDirectory extends TestCase {
-  private int numDocsPerUpdate = 10;
-  private int maxBufferedDocs = 2;
-
-  public void testMixedDirectoryAndPolicy() throws IOException {
-    Directory readDir = new RAMDirectory();
-    updateIndex(readDir, 0, numDocsPerUpdate,
-        new KeepOnlyLastCommitDeletionPolicy());
-
-    verify(readDir, numDocsPerUpdate);
-
-    IndexOutput out =
-        readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
-            + ".cfs");
-    out.writeInt(0);
-    out.close();
-
-    Directory writeDir = new RAMDirectory();
-    Directory mixedDir = new MixedDirectory(readDir, writeDir);
-    updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
-        new MixedDeletionPolicy());
-
-    verify(readDir, numDocsPerUpdate);
-    verify(mixedDir, 2 * numDocsPerUpdate);
-  }
-
-  public void updateIndex(Directory dir, int base, int numDocs,
-      IndexDeletionPolicy policy) throws IOException {
-    IndexWriter writer =
-        new IndexWriter(dir, false, new StandardAnalyzer(), policy);
-    writer.setMaxBufferedDocs(maxBufferedDocs);
-    writer.setMergeFactor(1000);
-    for (int i = 0; i < numDocs; i++) {
-      addDoc(writer, base + i);
-    }
-    writer.close();
-  }
-
-  private void addDoc(IndexWriter writer, int id) throws IOException {
-    Document doc = new Document();
-    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
-        Field.Index.UN_TOKENIZED));
-    doc.add(new Field("content", "apache", Field.Store.NO,
-        Field.Index.TOKENIZED));
-    writer.addDocument(doc);
-  }
-
-  private void verify(Directory dir, int expectedHits) throws IOException {
-    IndexSearcher searcher = new IndexSearcher(dir);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-    int numHits = hits.length();
-
-    assertEquals(expectedHits, numHits);
-
-    int[] docs = new int[numHits];
-    for (int i = 0; i < numHits; i++) {
-      Document hit = hits.doc(i);
-      docs[Integer.parseInt(hit.get("id"))]++;
-    }
-    for (int i = 0; i < numHits; i++) {
-      assertEquals(1, docs[i]);
-    }
-
-    searcher.close();
-  }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.lucene;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+public class TestMixedDirectory extends TestCase {
+  private int numDocsPerUpdate = 10;
+  private int maxBufferedDocs = 2;
+
+  public void testMixedDirectoryAndPolicy() throws IOException {
+    Directory readDir = new RAMDirectory();
+    updateIndex(readDir, 0, numDocsPerUpdate,
+        new KeepOnlyLastCommitDeletionPolicy());
+
+    verify(readDir, numDocsPerUpdate);
+
+    IndexOutput out =
+        readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
+            + ".cfs");
+    out.writeInt(0);
+    out.close();
+
+    Directory writeDir = new RAMDirectory();
+    Directory mixedDir = new MixedDirectory(readDir, writeDir);
+    updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
+        new MixedDeletionPolicy());
+
+    verify(readDir, numDocsPerUpdate);
+    verify(mixedDir, 2 * numDocsPerUpdate);
+  }
+
+  public void updateIndex(Directory dir, int base, int numDocs,
+      IndexDeletionPolicy policy) throws IOException {
+    IndexWriter writer =
+        new IndexWriter(dir, false, new StandardAnalyzer(), policy);
+    writer.setMaxBufferedDocs(maxBufferedDocs);
+    writer.setMergeFactor(1000);
+    for (int i = 0; i < numDocs; i++) {
+      addDoc(writer, base + i);
+    }
+    writer.close();
+  }
+
+  private void addDoc(IndexWriter writer, int id) throws IOException {
+    Document doc = new Document();
+    doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
+        Field.Index.UN_TOKENIZED));
+    doc.add(new Field("content", "apache", Field.Store.NO,
+        Field.Index.TOKENIZED));
+    writer.addDocument(doc);
+  }
+
+  private void verify(Directory dir, int expectedHits) throws IOException {
+    IndexSearcher searcher = new IndexSearcher(dir);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+    int numHits = hits.length();
+
+    assertEquals(expectedHits, numHits);
+
+    int[] docs = new int[numHits];
+    for (int i = 0; i < numHits; i++) {
+      Document hit = hits.doc(i);
+      docs[Integer.parseInt(hit.get("id"))]++;
+    }
+    for (int i = 0; i < numHits; i++) {
+      assertEquals(1, docs[i]);
+    }
+
+    searcher.close();
+  }
+
+}

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=1398581&r1=1398580&r2=1398581&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Tue Oct 16 00:02:55 2012
@@ -1,234 +1,234 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestDistributionPolicy extends TestCase {
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  // however, "we only allow 0 or 1 reducer in local mode" - from
-  // LocalJobRunner
-  private Configuration conf;
-  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
-  private Path localUpdatePath =
-      new Path(System.getProperty("build.test") + "/sample/data2.txt");
-  private Path inputPath = new Path("/myexample/data.txt");
-  private Path updatePath = new Path("/myexample/data2.txt");
-  private Path outputPath = new Path("/myoutput");
-  private Path indexPath = new Path("/myindex");
-  private int numShards = 3;
-  private int numMapTasks = 5;
-
-  private int numDataNodes = 3;
-  private int numTaskTrackers = 3;
-
-  private int numDocsPerRun = 10; // num of docs in local input path
-
-  private FileSystem fs;
-  private MiniDFSCluster dfsCluster;
-  private MiniMRCluster mrCluster;
-
-  public TestDistributionPolicy() throws IOException {
-    super();
-    if (System.getProperty("hadoop.log.dir") == null) {
-      String base = new File(".").getPath(); // getAbsolutePath();
-      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
-    }
-    conf = new Configuration();
-  }
-
-  protected void setUp() throws Exception {
-    super.setUp();
-    try {
-      dfsCluster =
-          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
-      fs = dfsCluster.getFileSystem();
-      if (fs.exists(inputPath)) {
-        fs.delete(inputPath, true);
-      }
-      fs.copyFromLocalFile(localInputPath, inputPath);
-      if (fs.exists(updatePath)) {
-        fs.delete(updatePath, true);
-      }
-      fs.copyFromLocalFile(localUpdatePath, updatePath);
-
-      if (fs.exists(outputPath)) {
-        // do not create, mapred will create
-        fs.delete(outputPath, true);
-      }
-
-      if (fs.exists(indexPath)) {
-        fs.delete(indexPath, true);
-      }
-
-      mrCluster =
-          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
-    } catch (IOException e) {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-        dfsCluster = null;
-      }
-
-      if (fs != null) {
-        fs.close();
-        fs = null;
-      }
-
-      if (mrCluster != null) {
-        mrCluster.shutdown();
-        mrCluster = null;
-      }
-
-      throw e;
-    }
-
-  }
-
-  protected void tearDown() throws Exception {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-      mrCluster = null;
-    }
-
-    super.tearDown();
-  }
-
-  public void testDistributionPolicy() throws IOException {
-    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-
-    // test hashing distribution policy
-    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
-    onetest();
-
-    if (fs.exists(indexPath)) {
-      fs.delete(indexPath, true);
-    }
-
-    // test round-robin distribution policy
-    iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
-    onetest();
-  }
-
-  private void onetest() throws IOException {
-    long versionNumber = -1;
-    long generation = -1;
-
-    Shard[] shards = new Shard[numShards];
-    for (int j = 0; j < shards.length; j++) {
-      shards[j] =
-          new Shard(versionNumber,
-              new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
-              generation);
-    }
-
-    if (fs.exists(outputPath)) {
-      fs.delete(outputPath, true);
-    }
-
-    IIndexUpdater updater = new IndexUpdater();
-    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
-        shards);
-
-    if (fs.exists(outputPath)) {
-      fs.delete(outputPath, true);
-    }
-
-    // delete docs w/ even docids, update docs w/ odd docids
-    updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
-        shards);
-
-    verify(shards);
-  }
-
-  private void verify(Shard[] shards) throws IOException {
-    // verify the index
-    IndexReader[] readers = new IndexReader[shards.length];
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      readers[i] = IndexReader.open(dir);
-    }
-
-    IndexReader reader = new MultiReader(readers);
-    IndexSearcher searcher = new IndexSearcher(reader);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-    assertEquals(0, hits.length());
-
-    hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
-    assertEquals(numDocsPerRun / 2, hits.length());
-
-    int[] counts = new int[numDocsPerRun];
-    for (int i = 0; i < hits.length(); i++) {
-      Document doc = hits.doc(i);
-      counts[Integer.parseInt(doc.get("id"))]++;
-    }
-
-    for (int i = 0; i < numDocsPerRun; i++) {
-      if (i % 2 == 0) {
-        assertEquals(0, counts[i]);
-      } else {
-        assertEquals(1, counts[i]);
-      }
-    }
-
-    searcher.close();
-    reader.close();
-  }
-
-}
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestDistributionPolicy extends TestCase {
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  // however, "we only allow 0 or 1 reducer in local mode" - from
+  // LocalJobRunner
+  private Configuration conf;
+  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+  private Path localUpdatePath =
+      new Path(System.getProperty("build.test") + "/sample/data2.txt");
+  private Path inputPath = new Path("/myexample/data.txt");
+  private Path updatePath = new Path("/myexample/data2.txt");
+  private Path outputPath = new Path("/myoutput");
+  private Path indexPath = new Path("/myindex");
+  private int numShards = 3;
+  private int numMapTasks = 5;
+
+  private int numDataNodes = 3;
+  private int numTaskTrackers = 3;
+
+  private int numDocsPerRun = 10; // num of docs in local input path
+
+  private FileSystem fs;
+  private MiniDFSCluster dfsCluster;
+  private MiniMRCluster mrCluster;
+
+  public TestDistributionPolicy() throws IOException {
+    super();
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+    conf = new Configuration();
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    try {
+      dfsCluster =
+          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+      fs = dfsCluster.getFileSystem();
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+      fs.copyFromLocalFile(localInputPath, inputPath);
+      if (fs.exists(updatePath)) {
+        fs.delete(updatePath, true);
+      }
+      fs.copyFromLocalFile(localUpdatePath, updatePath);
+
+      if (fs.exists(outputPath)) {
+        // do not create, mapred will create
+        fs.delete(outputPath, true);
+      }
+
+      if (fs.exists(indexPath)) {
+        fs.delete(indexPath, true);
+      }
+
+      mrCluster =
+          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+    } catch (IOException e) {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+
+      if (fs != null) {
+        fs.close();
+        fs = null;
+      }
+
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+        mrCluster = null;
+      }
+
+      throw e;
+    }
+
+  }
+
+  protected void tearDown() throws Exception {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+
+    super.tearDown();
+  }
+
+  public void testDistributionPolicy() throws IOException {
+    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+
+    // test hashing distribution policy
+    iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
+    onetest();
+
+    if (fs.exists(indexPath)) {
+      fs.delete(indexPath, true);
+    }
+
+    // test round-robin distribution policy
+    iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
+    onetest();
+  }
+
+  private void onetest() throws IOException {
+    long versionNumber = -1;
+    long generation = -1;
+
+    Shard[] shards = new Shard[numShards];
+    for (int j = 0; j < shards.length; j++) {
+      shards[j] =
+          new Shard(versionNumber,
+              new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
+              generation);
+    }
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    IIndexUpdater updater = new IndexUpdater();
+    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+        shards);
+
+    if (fs.exists(outputPath)) {
+      fs.delete(outputPath, true);
+    }
+
+    // delete docs w/ even docids, update docs w/ odd docids
+    updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
+        shards);
+
+    verify(shards);
+  }
+
+  private void verify(Shard[] shards) throws IOException {
+    // verify the index
+    IndexReader[] readers = new IndexReader[shards.length];
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      readers[i] = IndexReader.open(dir);
+    }
+
+    IndexReader reader = new MultiReader(readers);
+    IndexSearcher searcher = new IndexSearcher(reader);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+    assertEquals(0, hits.length());
+
+    hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
+    assertEquals(numDocsPerRun / 2, hits.length());
+
+    int[] counts = new int[numDocsPerRun];
+    for (int i = 0; i < hits.length(); i++) {
+      Document doc = hits.doc(i);
+      counts[Integer.parseInt(doc.get("id"))]++;
+    }
+
+    for (int i = 0; i < numDocsPerRun; i++) {
+      if (i % 2 == 0) {
+        assertEquals(0, counts[i]);
+      } else {
+        assertEquals(1, counts[i]);
+      }
+    }
+
+    searcher.close();
+    reader.close();
+  }
+
+}
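
For context on what the two runs of onetest() above are comparing: a distribution policy decides which of the numShards shards each document (or delete term) is routed to. The sketch below is a hypothetical simplification for illustration only; the actual IDistributionPolicy interface used by the contrib code is not shown in this diff and differs in its exact signatures.

  // Hypothetical simplification -- not the contrib interface.
  static int hashingChoice(String docId, int numShards) {
    // Hashing policy: the same document id always maps to the same shard,
    // so a later delete/update reaches the shard that holds the document.
    return (docId.hashCode() & Integer.MAX_VALUE) % numShards;
  }

  static int roundRobinChoice(int insertCount, int numShards) {
    // Round-robin policy: inserts are spread evenly in arrival order.
    return insertCount % numShards;
  }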