Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2012/10/19 04:28:42 UTC
svn commit: r1399950 [11/11] - in
/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/
dev-support/ hadoop-mapreduce-client/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/
hadoop-mapredu...
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/java/org/apache/hadoop/contrib/index/mapred/IntermediateForm.java Fri Oct 19 02:25:55 2012
@@ -1,252 +1,252 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.RAMDirectory;
-
-/**
- * An intermediate form for one or more parsed Lucene documents and/or
- * delete terms. It actually uses Lucene file format as the format for
- * the intermediate form by using RAM dir files.
- *
- * Note: If process(*) is ever called, closeWriter() should be called.
- * Otherwise, no need to call closeWriter().
- */
-public class IntermediateForm implements Writable {
-
- private IndexUpdateConfiguration iconf = null;
- private final Collection<Term> deleteList;
- private RAMDirectory dir;
- private IndexWriter writer;
- private int numDocs;
-
- /**
- * Constructor
- * @throws IOException
- */
- public IntermediateForm() throws IOException {
- deleteList = new ConcurrentLinkedQueue<Term>();
- dir = new RAMDirectory();
- writer = null;
- numDocs = 0;
- }
-
- /**
- * Configure using an index update configuration.
- * @param iconf the index update configuration
- */
- public void configure(IndexUpdateConfiguration iconf) {
- this.iconf = iconf;
- }
-
- /**
- * Get the ram directory of the intermediate form.
- * @return the ram directory
- */
- public Directory getDirectory() {
- return dir;
- }
-
- /**
- * Get an iterator for the delete terms in the intermediate form.
- * @return an iterator for the delete terms
- */
- public Iterator<Term> deleteTermIterator() {
- return deleteList.iterator();
- }
-
- /**
- * This method is used by the index update mapper and process a document
- * operation into the current intermediate form.
- * @param doc input document operation
- * @param analyzer the analyzer
- * @throws IOException
- */
- public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
- if (doc.getOp() == DocumentAndOp.Op.DELETE
- || doc.getOp() == DocumentAndOp.Op.UPDATE) {
- deleteList.add(doc.getTerm());
-
- }
-
- if (doc.getOp() == DocumentAndOp.Op.INSERT
- || doc.getOp() == DocumentAndOp.Op.UPDATE) {
-
- if (writer == null) {
- // analyzer is null because we specify an analyzer with addDocument
- writer = createWriter();
- }
-
- writer.addDocument(doc.getDocument(), analyzer);
- numDocs++;
- }
-
- }
-
- /**
- * This method is used by the index update combiner and process an
- * intermediate form into the current intermediate form. More specifically,
- * the input intermediate forms are a single-document ram index and/or a
- * single delete term.
- * @param form the input intermediate form
- * @throws IOException
- */
- public void process(IntermediateForm form) throws IOException {
- if (form.deleteList.size() > 0) {
- deleteList.addAll(form.deleteList);
- }
-
- if (form.dir.sizeInBytes() > 0) {
- if (writer == null) {
- writer = createWriter();
- }
-
- writer.addIndexesNoOptimize(new Directory[] { form.dir });
- numDocs++;
- }
-
- }
-
- /**
- * Close the Lucene index writer associated with the intermediate form,
- * if created. Do not close the ram directory. In fact, there is no need
- * to close a ram directory.
- * @throws IOException
- */
- public void closeWriter() throws IOException {
- if (writer != null) {
- writer.close();
- writer = null;
- }
- }
-
- /**
- * The total size of files in the directory and ram used by the index writer.
- * It does not include memory used by the delete list.
- * @return the total size in bytes
- */
- public long totalSizeInBytes() throws IOException {
- long size = dir.sizeInBytes();
- if (writer != null) {
- size += writer.ramSizeInBytes();
- }
- return size;
- }
-
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
- public String toString() {
- StringBuilder buffer = new StringBuilder();
- buffer.append(this.getClass().getSimpleName());
- buffer.append("[numDocs=");
- buffer.append(numDocs);
- buffer.append(", numDeletes=");
- buffer.append(deleteList.size());
- if (deleteList.size() > 0) {
- buffer.append("(");
- Iterator<Term> iter = deleteTermIterator();
- while (iter.hasNext()) {
- buffer.append(iter.next());
- buffer.append(" ");
- }
- buffer.append(")");
- }
- buffer.append("]");
- return buffer.toString();
- }
-
- private IndexWriter createWriter() throws IOException {
- IndexWriter writer =
- new IndexWriter(dir, false, null,
- new KeepOnlyLastCommitDeletionPolicy());
- writer.setUseCompoundFile(false);
-
- if (iconf != null) {
- int maxFieldLength = iconf.getIndexMaxFieldLength();
- if (maxFieldLength > 0) {
- writer.setMaxFieldLength(maxFieldLength);
- }
- }
-
- return writer;
- }
-
- private void resetForm() throws IOException {
- deleteList.clear();
- if (dir.sizeInBytes() > 0) {
- // it's ok if we don't close a ram directory
- dir.close();
- // an alternative is to delete all the files and reuse the ram directory
- dir = new RAMDirectory();
- }
- assert (writer == null);
- numDocs = 0;
- }
-
- // ///////////////////////////////////
- // Writable
- // ///////////////////////////////////
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
- */
- public void write(DataOutput out) throws IOException {
- out.writeInt(deleteList.size());
- for (Term term : deleteList) {
- Text.writeString(out, term.field());
- Text.writeString(out, term.text());
- }
-
- String[] files = dir.list();
- RAMDirectoryUtil.writeRAMFiles(out, dir, files);
- }
-
- /* (non-Javadoc)
- * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
- */
- public void readFields(DataInput in) throws IOException {
- resetForm();
-
- int numDeleteTerms = in.readInt();
- for (int i = 0; i < numDeleteTerms; i++) {
- String field = Text.readString(in);
- String text = Text.readString(in);
- deleteList.add(new Term(field, text));
- }
-
- RAMDirectoryUtil.readRAMFiles(in, dir);
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.hadoop.contrib.index.lucene.RAMDirectoryUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.RAMDirectory;
+
+/**
+ * An intermediate form for one or more parsed Lucene documents and/or
+ * delete terms. It actually uses Lucene file format as the format for
+ * the intermediate form by using RAM dir files.
+ *
+ * Note: If process(*) is ever called, closeWriter() should be called.
+ * Otherwise, no need to call closeWriter().
+ */
+public class IntermediateForm implements Writable {
+
+ private IndexUpdateConfiguration iconf = null;
+ private final Collection<Term> deleteList;
+ private RAMDirectory dir;
+ private IndexWriter writer;
+ private int numDocs;
+
+ /**
+ * Constructor
+ * @throws IOException
+ */
+ public IntermediateForm() throws IOException {
+ deleteList = new ConcurrentLinkedQueue<Term>();
+ dir = new RAMDirectory();
+ writer = null;
+ numDocs = 0;
+ }
+
+ /**
+ * Configure using an index update configuration.
+ * @param iconf the index update configuration
+ */
+ public void configure(IndexUpdateConfiguration iconf) {
+ this.iconf = iconf;
+ }
+
+ /**
+ * Get the ram directory of the intermediate form.
+ * @return the ram directory
+ */
+ public Directory getDirectory() {
+ return dir;
+ }
+
+ /**
+ * Get an iterator for the delete terms in the intermediate form.
+ * @return an iterator for the delete terms
+ */
+ public Iterator<Term> deleteTermIterator() {
+ return deleteList.iterator();
+ }
+
+ /**
+ * This method is used by the index update mapper and process a document
+ * operation into the current intermediate form.
+ * @param doc input document operation
+ * @param analyzer the analyzer
+ * @throws IOException
+ */
+ public void process(DocumentAndOp doc, Analyzer analyzer) throws IOException {
+ if (doc.getOp() == DocumentAndOp.Op.DELETE
+ || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+ deleteList.add(doc.getTerm());
+
+ }
+
+ if (doc.getOp() == DocumentAndOp.Op.INSERT
+ || doc.getOp() == DocumentAndOp.Op.UPDATE) {
+
+ if (writer == null) {
+ // analyzer is null because we specify an analyzer with addDocument
+ writer = createWriter();
+ }
+
+ writer.addDocument(doc.getDocument(), analyzer);
+ numDocs++;
+ }
+
+ }
+
+ /**
+ * This method is used by the index update combiner and process an
+ * intermediate form into the current intermediate form. More specifically,
+ * the input intermediate forms are a single-document ram index and/or a
+ * single delete term.
+ * @param form the input intermediate form
+ * @throws IOException
+ */
+ public void process(IntermediateForm form) throws IOException {
+ if (form.deleteList.size() > 0) {
+ deleteList.addAll(form.deleteList);
+ }
+
+ if (form.dir.sizeInBytes() > 0) {
+ if (writer == null) {
+ writer = createWriter();
+ }
+
+ writer.addIndexesNoOptimize(new Directory[] { form.dir });
+ numDocs++;
+ }
+
+ }
+
+ /**
+ * Close the Lucene index writer associated with the intermediate form,
+ * if created. Do not close the ram directory. In fact, there is no need
+ * to close a ram directory.
+ * @throws IOException
+ */
+ public void closeWriter() throws IOException {
+ if (writer != null) {
+ writer.close();
+ writer = null;
+ }
+ }
+
+ /**
+ * The total size of files in the directory and ram used by the index writer.
+ * It does not include memory used by the delete list.
+ * @return the total size in bytes
+ */
+ public long totalSizeInBytes() throws IOException {
+ long size = dir.sizeInBytes();
+ if (writer != null) {
+ size += writer.ramSizeInBytes();
+ }
+ return size;
+ }
+
+ /* (non-Javadoc)
+ * @see java.lang.Object#toString()
+ */
+ public String toString() {
+ StringBuilder buffer = new StringBuilder();
+ buffer.append(this.getClass().getSimpleName());
+ buffer.append("[numDocs=");
+ buffer.append(numDocs);
+ buffer.append(", numDeletes=");
+ buffer.append(deleteList.size());
+ if (deleteList.size() > 0) {
+ buffer.append("(");
+ Iterator<Term> iter = deleteTermIterator();
+ while (iter.hasNext()) {
+ buffer.append(iter.next());
+ buffer.append(" ");
+ }
+ buffer.append(")");
+ }
+ buffer.append("]");
+ return buffer.toString();
+ }
+
+ private IndexWriter createWriter() throws IOException {
+ IndexWriter writer =
+ new IndexWriter(dir, false, null,
+ new KeepOnlyLastCommitDeletionPolicy());
+ writer.setUseCompoundFile(false);
+
+ if (iconf != null) {
+ int maxFieldLength = iconf.getIndexMaxFieldLength();
+ if (maxFieldLength > 0) {
+ writer.setMaxFieldLength(maxFieldLength);
+ }
+ }
+
+ return writer;
+ }
+
+ private void resetForm() throws IOException {
+ deleteList.clear();
+ if (dir.sizeInBytes() > 0) {
+ // it's ok if we don't close a ram directory
+ dir.close();
+ // an alternative is to delete all the files and reuse the ram directory
+ dir = new RAMDirectory();
+ }
+ assert (writer == null);
+ numDocs = 0;
+ }
+
+ // ///////////////////////////////////
+ // Writable
+ // ///////////////////////////////////
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(deleteList.size());
+ for (Term term : deleteList) {
+ Text.writeString(out, term.field());
+ Text.writeString(out, term.text());
+ }
+
+ String[] files = dir.list();
+ RAMDirectoryUtil.writeRAMFiles(out, dir, files);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
+ */
+ public void readFields(DataInput in) throws IOException {
+ resetForm();
+
+ int numDeleteTerms = in.readInt();
+ for (int i = 0; i < numDeleteTerms; i++) {
+ String field = Text.readString(in);
+ String text = Text.readString(in);
+ deleteList.add(new Term(field, text));
+ }
+
+ RAMDirectoryUtil.readRAMFiles(in, dir);
+ }
+
+}
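
As a quick sanity check of the Writable contract above, the following sketch serializes an IntermediateForm and reads it back. It is only a sketch: it assumes the contrib index classes shown in this diff are on the classpath and uses plain java.io streams; a real mapper would first populate the form via process(DocumentAndOp, Analyzer), which is omitted here.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.contrib.index.mapred.IntermediateForm;

public class IntermediateFormRoundTrip {
  public static void main(String[] args) throws IOException {
    IntermediateForm original = new IntermediateForm();
    // A mapper would call process(DocumentAndOp, Analyzer) here; once any
    // process(*) call has been made, closeWriter() is required before writing.
    original.closeWriter();

    // write() emits the delete-term count, each term's field and text, and
    // then the RAM directory files via RAMDirectoryUtil.writeRAMFiles().
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    original.write(out);
    out.close();

    // readFields() first calls resetForm(), then rebuilds the delete list and
    // the RAM directory from the stream.
    IntermediateForm copy = new IntermediateForm();
    DataInputStream in =
        new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    copy.readFields(in);
    in.close();

    System.out.println(copy); // IntermediateForm[numDocs=0, numDeletes=0]
  }
}
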
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/lucene/TestMixedDirectory.java Fri Oct 19 02:25:55 2012
@@ -1,105 +1,105 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.lucene;
-
-import java.io.IOException;
-
-import junit.framework.TestCase;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.index.IndexDeletionPolicy;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-import org.apache.lucene.store.IndexOutput;
-import org.apache.lucene.store.RAMDirectory;
-
-public class TestMixedDirectory extends TestCase {
- private int numDocsPerUpdate = 10;
- private int maxBufferedDocs = 2;
-
- public void testMixedDirectoryAndPolicy() throws IOException {
- Directory readDir = new RAMDirectory();
- updateIndex(readDir, 0, numDocsPerUpdate,
- new KeepOnlyLastCommitDeletionPolicy());
-
- verify(readDir, numDocsPerUpdate);
-
- IndexOutput out =
- readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
- + ".cfs");
- out.writeInt(0);
- out.close();
-
- Directory writeDir = new RAMDirectory();
- Directory mixedDir = new MixedDirectory(readDir, writeDir);
- updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
- new MixedDeletionPolicy());
-
- verify(readDir, numDocsPerUpdate);
- verify(mixedDir, 2 * numDocsPerUpdate);
- }
-
- public void updateIndex(Directory dir, int base, int numDocs,
- IndexDeletionPolicy policy) throws IOException {
- IndexWriter writer =
- new IndexWriter(dir, false, new StandardAnalyzer(), policy);
- writer.setMaxBufferedDocs(maxBufferedDocs);
- writer.setMergeFactor(1000);
- for (int i = 0; i < numDocs; i++) {
- addDoc(writer, base + i);
- }
- writer.close();
- }
-
- private void addDoc(IndexWriter writer, int id) throws IOException {
- Document doc = new Document();
- doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
- Field.Index.UN_TOKENIZED));
- doc.add(new Field("content", "apache", Field.Store.NO,
- Field.Index.TOKENIZED));
- writer.addDocument(doc);
- }
-
- private void verify(Directory dir, int expectedHits) throws IOException {
- IndexSearcher searcher = new IndexSearcher(dir);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
- int numHits = hits.length();
-
- assertEquals(expectedHits, numHits);
-
- int[] docs = new int[numHits];
- for (int i = 0; i < numHits; i++) {
- Document hit = hits.doc(i);
- docs[Integer.parseInt(hit.get("id"))]++;
- }
- for (int i = 0; i < numHits; i++) {
- assertEquals(1, docs[i]);
- }
-
- searcher.close();
- }
-
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.lucene;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexDeletionPolicy;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.RAMDirectory;
+
+public class TestMixedDirectory extends TestCase {
+ private int numDocsPerUpdate = 10;
+ private int maxBufferedDocs = 2;
+
+ public void testMixedDirectoryAndPolicy() throws IOException {
+ Directory readDir = new RAMDirectory();
+ updateIndex(readDir, 0, numDocsPerUpdate,
+ new KeepOnlyLastCommitDeletionPolicy());
+
+ verify(readDir, numDocsPerUpdate);
+
+ IndexOutput out =
+ readDir.createOutput("_" + (numDocsPerUpdate / maxBufferedDocs + 2)
+ + ".cfs");
+ out.writeInt(0);
+ out.close();
+
+ Directory writeDir = new RAMDirectory();
+ Directory mixedDir = new MixedDirectory(readDir, writeDir);
+ updateIndex(mixedDir, numDocsPerUpdate, numDocsPerUpdate,
+ new MixedDeletionPolicy());
+
+ verify(readDir, numDocsPerUpdate);
+ verify(mixedDir, 2 * numDocsPerUpdate);
+ }
+
+ public void updateIndex(Directory dir, int base, int numDocs,
+ IndexDeletionPolicy policy) throws IOException {
+ IndexWriter writer =
+ new IndexWriter(dir, false, new StandardAnalyzer(), policy);
+ writer.setMaxBufferedDocs(maxBufferedDocs);
+ writer.setMergeFactor(1000);
+ for (int i = 0; i < numDocs; i++) {
+ addDoc(writer, base + i);
+ }
+ writer.close();
+ }
+
+ private void addDoc(IndexWriter writer, int id) throws IOException {
+ Document doc = new Document();
+ doc.add(new Field("id", String.valueOf(id), Field.Store.YES,
+ Field.Index.UN_TOKENIZED));
+ doc.add(new Field("content", "apache", Field.Store.NO,
+ Field.Index.TOKENIZED));
+ writer.addDocument(doc);
+ }
+
+ private void verify(Directory dir, int expectedHits) throws IOException {
+ IndexSearcher searcher = new IndexSearcher(dir);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+ int numHits = hits.length();
+
+ assertEquals(expectedHits, numHits);
+
+ int[] docs = new int[numHits];
+ for (int i = 0; i < numHits; i++) {
+ Document hit = hits.doc(i);
+ docs[Integer.parseInt(hit.get("id"))]++;
+ }
+ for (int i = 0; i < numHits; i++) {
+ assertEquals(1, docs[i]);
+ }
+
+ searcher.close();
+ }
+
+}
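
MixedDirectory itself is not part of this commit, so its API does not appear here, but the test above relies on a simple split: files that already exist in the read-only directory keep being served from it, while new files go to the writable directory. The following is a hypothetical illustration of that dispatch rule; the helper name pick() and the exact behavior are assumptions, not the real implementation.

import java.io.IOException;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class MixedDirectorySketch {
  // Assumed rule: an existing file is read from readDir, anything else is
  // resolved against writeDir.
  static Directory pick(Directory readDir, Directory writeDir, String name)
      throws IOException {
    return readDir.fileExists(name) ? readDir : writeDir;
  }

  public static void main(String[] args) throws IOException {
    Directory readDir = new RAMDirectory();
    Directory writeDir = new RAMDirectory();
    // A file created before mixing (like the "_N.cfs" marker written in the
    // test) resolves to readDir; a new file lands in writeDir.
    readDir.createOutput("existing.cfs").close();
    System.out.println(pick(readDir, writeDir, "existing.cfs") == readDir); // true
    System.out.println(pick(readDir, writeDir, "new.cfs") == writeDir);     // true
  }
}
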
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestDistributionPolicy.java Fri Oct 19 02:25:55 2012
@@ -1,234 +1,234 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
-import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
+import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestDistributionPolicy extends TestCase {
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- // however, "we only allow 0 or 1 reducer in local mode" - from
- // LocalJobRunner
- private Configuration conf;
- private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
- private Path localUpdatePath =
- new Path(System.getProperty("build.test") + "/sample/data2.txt");
- private Path inputPath = new Path("/myexample/data.txt");
- private Path updatePath = new Path("/myexample/data2.txt");
- private Path outputPath = new Path("/myoutput");
- private Path indexPath = new Path("/myindex");
- private int numShards = 3;
- private int numMapTasks = 5;
-
- private int numDataNodes = 3;
- private int numTaskTrackers = 3;
-
- private int numDocsPerRun = 10; // num of docs in local input path
-
- private FileSystem fs;
- private MiniDFSCluster dfsCluster;
- private MiniMRCluster mrCluster;
-
- public TestDistributionPolicy() throws IOException {
- super();
- if (System.getProperty("hadoop.log.dir") == null) {
- String base = new File(".").getPath(); // getAbsolutePath();
- System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
- }
- conf = new Configuration();
- }
-
- protected void setUp() throws Exception {
- super.setUp();
- try {
- dfsCluster =
- new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
- fs = dfsCluster.getFileSystem();
- if (fs.exists(inputPath)) {
- fs.delete(inputPath, true);
- }
- fs.copyFromLocalFile(localInputPath, inputPath);
- if (fs.exists(updatePath)) {
- fs.delete(updatePath, true);
- }
- fs.copyFromLocalFile(localUpdatePath, updatePath);
-
- if (fs.exists(outputPath)) {
- // do not create, mapred will create
- fs.delete(outputPath, true);
- }
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- mrCluster =
- new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
- } catch (IOException e) {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- throw e;
- }
-
- }
-
- protected void tearDown() throws Exception {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- super.tearDown();
- }
-
- public void testDistributionPolicy() throws IOException {
- IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-
- // test hashing distribution policy
- iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
- onetest();
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- // test round-robin distribution policy
- iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
- onetest();
- }
-
- private void onetest() throws IOException {
- long versionNumber = -1;
- long generation = -1;
-
- Shard[] shards = new Shard[numShards];
- for (int j = 0; j < shards.length; j++) {
- shards[j] =
- new Shard(versionNumber,
- new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
- generation);
- }
-
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- IIndexUpdater updater = new IndexUpdater();
- updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
- shards);
-
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- // delete docs w/ even docids, update docs w/ odd docids
- updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
- shards);
-
- verify(shards);
- }
-
- private void verify(Shard[] shards) throws IOException {
- // verify the index
- IndexReader[] readers = new IndexReader[shards.length];
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- readers[i] = IndexReader.open(dir);
- }
-
- IndexReader reader = new MultiReader(readers);
- IndexSearcher searcher = new IndexSearcher(reader);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
- assertEquals(0, hits.length());
-
- hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
- assertEquals(numDocsPerRun / 2, hits.length());
-
- int[] counts = new int[numDocsPerRun];
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- counts[Integer.parseInt(doc.get("id"))]++;
- }
-
- for (int i = 0; i < numDocsPerRun; i++) {
- if (i % 2 == 0) {
- assertEquals(0, counts[i]);
- } else {
- assertEquals(1, counts[i]);
- }
- }
-
- searcher.close();
- reader.close();
- }
-
-}
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestDistributionPolicy extends TestCase {
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ // however, "we only allow 0 or 1 reducer in local mode" - from
+ // LocalJobRunner
+ private Configuration conf;
+ private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+ private Path localUpdatePath =
+ new Path(System.getProperty("build.test") + "/sample/data2.txt");
+ private Path inputPath = new Path("/myexample/data.txt");
+ private Path updatePath = new Path("/myexample/data2.txt");
+ private Path outputPath = new Path("/myoutput");
+ private Path indexPath = new Path("/myindex");
+ private int numShards = 3;
+ private int numMapTasks = 5;
+
+ private int numDataNodes = 3;
+ private int numTaskTrackers = 3;
+
+ private int numDocsPerRun = 10; // num of docs in local input path
+
+ private FileSystem fs;
+ private MiniDFSCluster dfsCluster;
+ private MiniMRCluster mrCluster;
+
+ public TestDistributionPolicy() throws IOException {
+ super();
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+ }
+ conf = new Configuration();
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ try {
+ dfsCluster =
+ new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+ fs = dfsCluster.getFileSystem();
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+ fs.copyFromLocalFile(localInputPath, inputPath);
+ if (fs.exists(updatePath)) {
+ fs.delete(updatePath, true);
+ }
+ fs.copyFromLocalFile(localUpdatePath, updatePath);
+
+ if (fs.exists(outputPath)) {
+ // do not create, mapred will create
+ fs.delete(outputPath, true);
+ }
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ mrCluster =
+ new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+ } catch (IOException e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ throw e;
+ }
+
+ }
+
+ protected void tearDown() throws Exception {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void testDistributionPolicy() throws IOException {
+ IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+
+ // test hashing distribution policy
+ iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
+ onetest();
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ // test round-robin distribution policy
+ iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
+ onetest();
+ }
+
+ private void onetest() throws IOException {
+ long versionNumber = -1;
+ long generation = -1;
+
+ Shard[] shards = new Shard[numShards];
+ for (int j = 0; j < shards.length; j++) {
+ shards[j] =
+ new Shard(versionNumber,
+ new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
+ generation);
+ }
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ IIndexUpdater updater = new IndexUpdater();
+ updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+ shards);
+
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ // delete docs w/ even docids, update docs w/ odd docids
+ updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
+ shards);
+
+ verify(shards);
+ }
+
+ private void verify(Shard[] shards) throws IOException {
+ // verify the index
+ IndexReader[] readers = new IndexReader[shards.length];
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ readers[i] = IndexReader.open(dir);
+ }
+
+ IndexReader reader = new MultiReader(readers);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+ assertEquals(0, hits.length());
+
+ hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
+ assertEquals(numDocsPerRun / 2, hits.length());
+
+ int[] counts = new int[numDocsPerRun];
+ for (int i = 0; i < hits.length(); i++) {
+ Document doc = hits.doc(i);
+ counts[Integer.parseInt(doc.get("id"))]++;
+ }
+
+ for (int i = 0; i < numDocsPerRun; i++) {
+ if (i % 2 == 0) {
+ assertEquals(0, counts[i]);
+ } else {
+ assertEquals(1, counts[i]);
+ }
+ }
+
+ searcher.close();
+ reader.close();
+ }
+
+}
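
The two policies exercised above differ only in how a document key is mapped to one of the numShards shards. The IDistributionPolicy interface is not included in this diff, so the arithmetic below is an assumption based on the class names: hashing keeps equal keys on the same shard, while round-robin deals documents out in arrival order.

import org.apache.lucene.index.Term;

public class DistributionSketch {
  // Assumed hashing policy: the key term decides the shard, so the same key
  // always lands on the same shard.
  static int hashingShard(Term term, int numShards) {
    return (term.hashCode() & Integer.MAX_VALUE) % numShards;
  }

  // Assumed round-robin policy: documents rotate through the shards in order,
  // ignoring their content.
  static int roundRobinShard(int docCount, int numShards) {
    return docCount % numShards;
  }

  public static void main(String[] args) {
    int numShards = 3;
    System.out.println(hashingShard(new Term("id", "00042"), numShards));
    for (int i = 0; i < 5; i++) {
      System.out.println(roundRobinShard(i, numShards)); // 0 1 2 0 1
    }
  }
}
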
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java Fri Oct 19 02:25:55 2012
@@ -1,258 +1,258 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestIndexUpdater extends TestCase {
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- // however, "we only allow 0 or 1 reducer in local mode" - from
- // LocalJobRunner
- private Configuration conf;
- private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
- private Path inputPath = new Path("/myexample/data.txt");
- private Path outputPath = new Path("/myoutput");
- private Path indexPath = new Path("/myindex");
- private int initNumShards = 3;
- private int numMapTasks = 5;
-
- private int numDataNodes = 3;
- private int numTaskTrackers = 3;
-
- private int numRuns = 3;
- private int numDocsPerRun = 10; // num of docs in local input path
-
- private FileSystem fs;
- private MiniDFSCluster dfsCluster;
- private MiniMRCluster mrCluster;
-
- public TestIndexUpdater() throws IOException {
- super();
- if (System.getProperty("hadoop.log.dir") == null) {
- String base = new File(".").getPath(); // getAbsolutePath();
- System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
- }
- conf = new Configuration();
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestIndexUpdater extends TestCase {
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ // however, "we only allow 0 or 1 reducer in local mode" - from
+ // LocalJobRunner
+ private Configuration conf;
+ private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+ private Path inputPath = new Path("/myexample/data.txt");
+ private Path outputPath = new Path("/myoutput");
+ private Path indexPath = new Path("/myindex");
+ private int initNumShards = 3;
+ private int numMapTasks = 5;
+
+ private int numDataNodes = 3;
+ private int numTaskTrackers = 3;
+
+ private int numRuns = 3;
+ private int numDocsPerRun = 10; // num of docs in local input path
+
+ private FileSystem fs;
+ private MiniDFSCluster dfsCluster;
+ private MiniMRCluster mrCluster;
+
+ public TestIndexUpdater() throws IOException {
+ super();
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+ }
+ conf = new Configuration();
//See MAPREDUCE-947 for more details. Setting to false prevents the creation of _SUCCESS.
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
- }
-
- protected void setUp() throws Exception {
- super.setUp();
- try {
- dfsCluster =
- new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
- fs = dfsCluster.getFileSystem();
- if (fs.exists(inputPath)) {
- fs.delete(inputPath, true);
- }
- fs.copyFromLocalFile(localInputPath, inputPath);
-
- if (fs.exists(outputPath)) {
- // do not create, mapred will create
- fs.delete(outputPath, true);
- }
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- mrCluster =
- new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
- } catch (IOException e) {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- throw e;
- }
-
- }
-
- protected void tearDown() throws Exception {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- super.tearDown();
- }
-
- public void testIndexUpdater() throws IOException {
- IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
- // max field length, compound file and number of segments will be checked
- // later
- iconf.setIndexMaxFieldLength(2);
- iconf.setIndexUseCompoundFile(true);
- iconf.setIndexMaxNumSegments(1);
- iconf.setMaxRAMSizeInBytes(20480);
-
- long versionNumber = -1;
- long generation = -1;
-
- for (int i = 0; i < numRuns; i++) {
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- Shard[] shards = new Shard[initNumShards + i];
- for (int j = 0; j < shards.length; j++) {
- shards[j] =
- new Shard(versionNumber, new Path(indexPath,
- NUMBER_FORMAT.format(j)).toString(), generation);
- }
- run(i + 1, shards);
- }
- }
-
- private void run(int numRuns, Shard[] shards) throws IOException {
- IIndexUpdater updater = new IndexUpdater();
- updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
- shards);
-
- // verify the done files
- Path[] doneFileNames = new Path[shards.length];
- int count = 0;
- FileStatus[] fileStatus = fs.listStatus(outputPath);
- for (int i = 0; i < fileStatus.length; i++) {
- FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
- for (int j = 0; j < doneFiles.length; j++) {
- doneFileNames[count++] = doneFiles[j].getPath();
- }
- }
- assertEquals(shards.length, count);
- for (int i = 0; i < count; i++) {
- assertTrue(doneFileNames[i].getName().startsWith(
- IndexUpdateReducer.DONE.toString()));
- }
-
- // verify the index
- IndexReader[] readers = new IndexReader[shards.length];
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- readers[i] = IndexReader.open(dir);
- }
-
- IndexReader reader = new MultiReader(readers);
- IndexSearcher searcher = new IndexSearcher(reader);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-
- assertEquals(numRuns * numDocsPerRun, hits.length());
-
- int[] counts = new int[numDocsPerRun];
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- counts[Integer.parseInt(doc.get("id"))]++;
- }
-
- for (int i = 0; i < numDocsPerRun; i++) {
- assertEquals(numRuns, counts[i]);
- }
-
- // max field length is 2, so "dot" is also indexed but not "org"
- hits = searcher.search(new TermQuery(new Term("content", "dot")));
- assertEquals(numRuns, hits.length());
-
- hits = searcher.search(new TermQuery(new Term("content", "org")));
- assertEquals(0, hits.length());
-
- searcher.close();
- reader.close();
-
- // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
- // to remove earlier checkpoints
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- IndexWriter writer =
- new IndexWriter(dir, false, null,
- new KeepOnlyLastCommitDeletionPolicy());
- writer.close();
- }
-
- // verify the number of segments, must be done after an writer with
- // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
- for (int i = 0; i < shards.length; i++) {
- PathFilter cfsFilter = new PathFilter() {
- public boolean accept(Path path) {
- return path.getName().endsWith(".cfs");
- }
- };
- FileStatus[] cfsFiles =
- fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
- assertEquals(1, cfsFiles.length);
- }
- }
-
-}
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ try {
+ dfsCluster =
+ new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+ fs = dfsCluster.getFileSystem();
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+ fs.copyFromLocalFile(localInputPath, inputPath);
+
+ if (fs.exists(outputPath)) {
+ // do not create, mapred will create
+ fs.delete(outputPath, true);
+ }
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ mrCluster =
+ new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+ } catch (IOException e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ throw e;
+ }
+
+ }
+
+ protected void tearDown() throws Exception {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void testIndexUpdater() throws IOException {
+ IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+ // max field length, compound file and number of segments will be checked
+ // later
+ iconf.setIndexMaxFieldLength(2);
+ iconf.setIndexUseCompoundFile(true);
+ iconf.setIndexMaxNumSegments(1);
+ iconf.setMaxRAMSizeInBytes(20480);
+
+ long versionNumber = -1;
+ long generation = -1;
+
+ for (int i = 0; i < numRuns; i++) {
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ Shard[] shards = new Shard[initNumShards + i];
+ for (int j = 0; j < shards.length; j++) {
+ shards[j] =
+ new Shard(versionNumber, new Path(indexPath,
+ NUMBER_FORMAT.format(j)).toString(), generation);
+ }
+ run(i + 1, shards);
+ }
+ }
+
+ private void run(int numRuns, Shard[] shards) throws IOException {
+ IIndexUpdater updater = new IndexUpdater();
+ updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+ shards);
+
+ // verify the done files
+ Path[] doneFileNames = new Path[shards.length];
+ int count = 0;
+ FileStatus[] fileStatus = fs.listStatus(outputPath);
+ for (int i = 0; i < fileStatus.length; i++) {
+ FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
+ for (int j = 0; j < doneFiles.length; j++) {
+ doneFileNames[count++] = doneFiles[j].getPath();
+ }
+ }
+ assertEquals(shards.length, count);
+ for (int i = 0; i < count; i++) {
+ assertTrue(doneFileNames[i].getName().startsWith(
+ IndexUpdateReducer.DONE.toString()));
+ }
+
+ // verify the index
+ IndexReader[] readers = new IndexReader[shards.length];
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ readers[i] = IndexReader.open(dir);
+ }
+
+ IndexReader reader = new MultiReader(readers);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+
+ assertEquals(numRuns * numDocsPerRun, hits.length());
+
+ int[] counts = new int[numDocsPerRun];
+ for (int i = 0; i < hits.length(); i++) {
+ Document doc = hits.doc(i);
+ counts[Integer.parseInt(doc.get("id"))]++;
+ }
+
+ for (int i = 0; i < numDocsPerRun; i++) {
+ assertEquals(numRuns, counts[i]);
+ }
+
+ // max field length is 2, so "dot" is also indexed but not "org"
+ hits = searcher.search(new TermQuery(new Term("content", "dot")));
+ assertEquals(numRuns, hits.length());
+
+ hits = searcher.search(new TermQuery(new Term("content", "org")));
+ assertEquals(0, hits.length());
+
+ searcher.close();
+ reader.close();
+
+ // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
+ // to remove earlier checkpoints
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ IndexWriter writer =
+ new IndexWriter(dir, false, null,
+ new KeepOnlyLastCommitDeletionPolicy());
+ writer.close();
+ }
+
+ // verify the number of segments, must be done after an writer with
+ // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
+ for (int i = 0; i < shards.length; i++) {
+ PathFilter cfsFilter = new PathFilter() {
+ public boolean accept(Path path) {
+ return path.getName().endsWith(".cfs");
+ }
+ };
+ FileStatus[] cfsFiles =
+ fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
+ assertEquals(1, cfsFiles.length);
+ }
+ }
+
+}
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/contrib/vaidya/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/contrib/vaidya:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/vaidya:r1360400-1399945
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/examples/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/examples:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/examples:r1360400-1399945
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/java:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/java:r1360400-1399945
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/mapred-default.xml?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/mapred-default.xml (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/java/mapred-default.xml Fri Oct 19 02:25:55 2012
@@ -701,7 +701,7 @@
<property>
<name>mapreduce.job.maxtaskfailures.per.tracker</name>
- <value>4</value>
+ <value>3</value>
<description>The number of task-failures on a tasktracker of a given job
after which new tasks of that job aren't assigned to it.
</description>
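
With this change the cluster-wide default for mapreduce.job.maxtaskfailures.per.tracker drops from 4 to 3. A job that wants a different tolerance can still override the same key on its own configuration; a small sketch follows, where the chosen value of 2 is arbitrary.

import org.apache.hadoop.mapred.JobConf;

public class MaxTaskFailuresExample {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // After this many task failures of the job on a given tasktracker, no new
    // tasks of the job are assigned to that tracker.
    job.setInt("mapreduce.job.maxtaskfailures.per.tracker", 2);
    System.out.println(job.getInt("mapreduce.job.maxtaskfailures.per.tracker", 3));
  }
}
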
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred:r1360400-1399945
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/test/mapred:r1363593-1396941
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs:r1360400-1399945
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs:r1360400-1399945
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:r1360400-1399945
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc:r1363593-1396941
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/SortValidator.java Fri Oct 19 02:25:55 2012
@@ -33,7 +33,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.lib.HashPartitioner;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.fs.*;
@@ -345,7 +344,8 @@ public class SortValidator extends Confi
FileInputFormat.setInputPaths(jobConf, sortInput);
FileInputFormat.addInputPath(jobConf, sortOutput);
- Path outputPath = new Path("/tmp/sortvalidate/recordstatschecker");
+ Path outputPath = new Path(new Path("/tmp",
+ "sortvalidate"), UUID.randomUUID().toString());
if (defaultfs.exists(outputPath)) {
defaultfs.delete(outputPath, true);
}
@@ -365,31 +365,44 @@ public class SortValidator extends Confi
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
- Date end_time = new Date();
- System.out.println("Job ended: " + end_time);
- System.out.println("The job took " +
- (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
-
- // Check to ensure that the statistics of the
- // framework's sort-input and sort-output match
- SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
- new Path(outputPath, "part-00000"), defaults);
- IntWritable k1 = new IntWritable();
- IntWritable k2 = new IntWritable();
- RecordStatsWritable v1 = new RecordStatsWritable();
- RecordStatsWritable v2 = new RecordStatsWritable();
- if (!stats.next(k1, v1)) {
- throw new IOException("Failed to read record #1 from reduce's output");
- }
- if (!stats.next(k2, v2)) {
- throw new IOException("Failed to read record #2 from reduce's output");
- }
-
- if ((v1.getBytes() != v2.getBytes()) || (v1.getRecords() != v2.getRecords()) ||
- v1.getChecksum() != v2.getChecksum()) {
- throw new IOException("(" +
- v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum() + ") v/s (" +
- v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum() + ")");
+ try {
+ Date end_time = new Date();
+ System.out.println("Job ended: " + end_time);
+ System.out.println("The job took " +
+ (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+
+ // Check to ensure that the statistics of the
+ // framework's sort-input and sort-output match
+ SequenceFile.Reader stats = new SequenceFile.Reader(defaultfs,
+ new Path(outputPath, "part-00000"), defaults);
+ try {
+ IntWritable k1 = new IntWritable();
+ IntWritable k2 = new IntWritable();
+ RecordStatsWritable v1 = new RecordStatsWritable();
+ RecordStatsWritable v2 = new RecordStatsWritable();
+ if (!stats.next(k1, v1)) {
+ throw new IOException(
+ "Failed to read record #1 from reduce's output");
+ }
+ if (!stats.next(k2, v2)) {
+ throw new IOException(
+ "Failed to read record #2 from reduce's output");
+ }
+
+ if ((v1.getBytes() != v2.getBytes()) ||
+ (v1.getRecords() != v2.getRecords()) ||
+ v1.getChecksum() != v2.getChecksum()) {
+ throw new IOException("(" +
+ v1.getBytes() + ", " + v1.getRecords() + ", " + v1.getChecksum()
+ + ") v/s (" +
+ v2.getBytes() + ", " + v2.getRecords() + ", " + v2.getChecksum()
+ + ")");
+ }
+ } finally {
+ stats.close();
+ }
+ } finally {
+ defaultfs.delete(outputPath, true);
}
}
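
The SortValidator hunks above switch the record-stats checker to a unique scratch directory and wrap the statistics check in try/finally, so the SequenceFile.Reader is closed and the output directory is deleted even when the comparison throws. A minimal standalone sketch of the same idiom, with hypothetical names and a placeholder where the real job would run, looks roughly like this:

    // Illustrative sketch (names are hypothetical): per-run scratch directory
    // plus finally-delete, mirroring the pattern introduced in SortValidator.
    import java.util.UUID;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ScratchDirCleanupExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // A UUID per run keeps concurrent invocations from clobbering each other.
        Path scratch = new Path(new Path("/tmp", "sortvalidate-example"),
            UUID.randomUUID().toString());
        fs.mkdirs(scratch);
        try {
          // ... run the checking job and read its part-00000 output here ...
        } finally {
          fs.delete(scratch, true); // recursive delete, even if the check throws
        }
      }
    }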
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueInformation.java Fri Oct 19 02:25:55 2012
@@ -98,7 +98,7 @@ public class TestJobQueueInformation ext
dfsCluster.shutdown();
}
- public void testJobQueues() throws IOException {
+ public void testJobQueues() throws Exception {
JobClient jc = new JobClient(mrCluster.createJobConf());
String expectedQueueInfo = "Maximum Tasks Per Job :: 10";
JobQueueInfo[] queueInfos = jc.getQueues();
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Oct 19 02:25:55 2012
@@ -149,7 +149,7 @@ public class TestSetupAndCleanupFailure
private void testSetupAndCleanupKill(MiniMRCluster mr,
MiniDFSCluster dfs,
boolean commandLineKill)
- throws IOException {
+ throws Exception {
// launch job with waiting setup/cleanup
RunningJob job = launchJobWithWaitingSetupAndCleanup(mr);
@@ -223,7 +223,7 @@ public class TestSetupAndCleanupFailure
// Also Tests the command-line kill for setup/cleanup attempts.
// tests the setup/cleanup attempts getting killed if
// they were running on a lost tracker
- public void testWithDFS() throws IOException {
+ public void testWithDFS() throws Exception {
MiniDFSCluster dfs = null;
MiniMRCluster mr = null;
FileSystem fileSys = null;
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=1399950&r1=1399949&r2=1399950&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Fri Oct 19 02:25:55 2012
@@ -449,7 +449,7 @@ public class UtilsForTests {
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
String mapSignalFile,
String reduceSignalFile, int replication)
- throws IOException {
+ throws Exception {
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile),
(short)replication);
writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile),
@@ -462,7 +462,7 @@ public class UtilsForTests {
static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys,
boolean isMap, String mapSignalFile,
String reduceSignalFile)
- throws IOException {
+ throws Exception {
// signal the maps to complete
writeFile(dfs.getNameNode(), fileSys.getConf(),
isMap
@@ -483,7 +483,7 @@ public class UtilsForTests {
}
static void writeFile(NameNode namenode, Configuration conf, Path name,
- short replication) throws IOException {
+ short replication) throws Exception {
FileSystem fileSys = FileSystem.get(conf);
SequenceFile.Writer writer =
SequenceFile.createWriter(fileSys, conf, name,
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/src/webapps/job/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-3077/hadoop-mapreduce-project/src/webapps/job:r1363593-1396941
Merged /hadoop/common/trunk/hadoop-mapreduce-project/src/webapps/job:r1360400-1399945