You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2012/10/12 06:35:46 UTC
svn commit: r1397432 [5/5] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/
hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/ h...
Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java?rev=1397432&r1=1397431&r2=1397432&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java Fri Oct 12 04:35:42 2012
@@ -1,258 +1,258 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestIndexUpdater extends TestCase {
-
- private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
- static {
- NUMBER_FORMAT.setMinimumIntegerDigits(5);
- NUMBER_FORMAT.setGroupingUsed(false);
- }
-
- // however, "we only allow 0 or 1 reducer in local mode" - from
- // LocalJobRunner
- private Configuration conf;
- private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
- private Path inputPath = new Path("/myexample/data.txt");
- private Path outputPath = new Path("/myoutput");
- private Path indexPath = new Path("/myindex");
- private int initNumShards = 3;
- private int numMapTasks = 5;
-
- private int numDataNodes = 3;
- private int numTaskTrackers = 3;
-
- private int numRuns = 3;
- private int numDocsPerRun = 10; // num of docs in local input path
-
- private FileSystem fs;
- private MiniDFSCluster dfsCluster;
- private MiniMRCluster mrCluster;
-
- public TestIndexUpdater() throws IOException {
- super();
- if (System.getProperty("hadoop.log.dir") == null) {
- String base = new File(".").getPath(); // getAbsolutePath();
- System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
- }
- conf = new Configuration();
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestIndexUpdater extends TestCase {
+
+ private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+ static {
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ // however, "we only allow 0 or 1 reducer in local mode" - from
+ // LocalJobRunner
+ private Configuration conf;
+ private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+ private Path inputPath = new Path("/myexample/data.txt");
+ private Path outputPath = new Path("/myoutput");
+ private Path indexPath = new Path("/myindex");
+ private int initNumShards = 3;
+ private int numMapTasks = 5;
+
+ private int numDataNodes = 3;
+ private int numTaskTrackers = 3;
+
+ private int numRuns = 3;
+ private int numDocsPerRun = 10; // num of docs in local input path
+
+ private FileSystem fs;
+ private MiniDFSCluster dfsCluster;
+ private MiniMRCluster mrCluster;
+
+ public TestIndexUpdater() throws IOException {
+ super();
+ if (System.getProperty("hadoop.log.dir") == null) {
+ String base = new File(".").getPath(); // getAbsolutePath();
+ System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+ }
+ conf = new Configuration();
//See MAPREDUCE-947 for more details. Setting to false prevents the creation of _SUCCESS.
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
- }
-
- protected void setUp() throws Exception {
- super.setUp();
- try {
- dfsCluster =
- new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
- fs = dfsCluster.getFileSystem();
- if (fs.exists(inputPath)) {
- fs.delete(inputPath, true);
- }
- fs.copyFromLocalFile(localInputPath, inputPath);
-
- if (fs.exists(outputPath)) {
- // do not create, mapred will create
- fs.delete(outputPath, true);
- }
-
- if (fs.exists(indexPath)) {
- fs.delete(indexPath, true);
- }
-
- mrCluster =
- new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
- } catch (IOException e) {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- throw e;
- }
-
- }
-
- protected void tearDown() throws Exception {
- if (dfsCluster != null) {
- dfsCluster.shutdown();
- dfsCluster = null;
- }
-
- if (fs != null) {
- fs.close();
- fs = null;
- }
-
- if (mrCluster != null) {
- mrCluster.shutdown();
- mrCluster = null;
- }
-
- super.tearDown();
- }
-
- public void testIndexUpdater() throws IOException {
- IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
- // max field length, compound file and number of segments will be checked
- // later
- iconf.setIndexMaxFieldLength(2);
- iconf.setIndexUseCompoundFile(true);
- iconf.setIndexMaxNumSegments(1);
- iconf.setMaxRAMSizeInBytes(20480);
-
- long versionNumber = -1;
- long generation = -1;
-
- for (int i = 0; i < numRuns; i++) {
- if (fs.exists(outputPath)) {
- fs.delete(outputPath, true);
- }
-
- Shard[] shards = new Shard[initNumShards + i];
- for (int j = 0; j < shards.length; j++) {
- shards[j] =
- new Shard(versionNumber, new Path(indexPath,
- NUMBER_FORMAT.format(j)).toString(), generation);
- }
- run(i + 1, shards);
- }
- }
-
- private void run(int numRuns, Shard[] shards) throws IOException {
- IIndexUpdater updater = new IndexUpdater();
- updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
- shards);
-
- // verify the done files
- Path[] doneFileNames = new Path[shards.length];
- int count = 0;
- FileStatus[] fileStatus = fs.listStatus(outputPath);
- for (int i = 0; i < fileStatus.length; i++) {
- FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
- for (int j = 0; j < doneFiles.length; j++) {
- doneFileNames[count++] = doneFiles[j].getPath();
- }
- }
- assertEquals(shards.length, count);
- for (int i = 0; i < count; i++) {
- assertTrue(doneFileNames[i].getName().startsWith(
- IndexUpdateReducer.DONE.toString()));
- }
-
- // verify the index
- IndexReader[] readers = new IndexReader[shards.length];
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- readers[i] = IndexReader.open(dir);
- }
-
- IndexReader reader = new MultiReader(readers);
- IndexSearcher searcher = new IndexSearcher(reader);
- Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-
- assertEquals(numRuns * numDocsPerRun, hits.length());
-
- int[] counts = new int[numDocsPerRun];
- for (int i = 0; i < hits.length(); i++) {
- Document doc = hits.doc(i);
- counts[Integer.parseInt(doc.get("id"))]++;
- }
-
- for (int i = 0; i < numDocsPerRun; i++) {
- assertEquals(numRuns, counts[i]);
- }
-
- // max field length is 2, so "dot" is also indexed but not "org"
- hits = searcher.search(new TermQuery(new Term("content", "dot")));
- assertEquals(numRuns, hits.length());
-
- hits = searcher.search(new TermQuery(new Term("content", "org")));
- assertEquals(0, hits.length());
-
- searcher.close();
- reader.close();
-
- // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
- // to remove earlier checkpoints
- for (int i = 0; i < shards.length; i++) {
- Directory dir =
- new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
- false, conf);
- IndexWriter writer =
- new IndexWriter(dir, false, null,
- new KeepOnlyLastCommitDeletionPolicy());
- writer.close();
- }
-
- // verify the number of segments, must be done after an writer with
- // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
- for (int i = 0; i < shards.length; i++) {
- PathFilter cfsFilter = new PathFilter() {
- public boolean accept(Path path) {
- return path.getName().endsWith(".cfs");
- }
- };
- FileStatus[] cfsFiles =
- fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
- assertEquals(1, cfsFiles.length);
- }
- }
-
-}
+ }
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ try {
+ dfsCluster =
+ new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+ fs = dfsCluster.getFileSystem();
+ if (fs.exists(inputPath)) {
+ fs.delete(inputPath, true);
+ }
+ fs.copyFromLocalFile(localInputPath, inputPath);
+
+ if (fs.exists(outputPath)) {
+ // do not create, mapred will create
+ fs.delete(outputPath, true);
+ }
+
+ if (fs.exists(indexPath)) {
+ fs.delete(indexPath, true);
+ }
+
+ mrCluster =
+ new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+ } catch (IOException e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ throw e;
+ }
+
+ }
+
+ protected void tearDown() throws Exception {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+
+ if (fs != null) {
+ fs.close();
+ fs = null;
+ }
+
+ if (mrCluster != null) {
+ mrCluster.shutdown();
+ mrCluster = null;
+ }
+
+ super.tearDown();
+ }
+
+ public void testIndexUpdater() throws IOException {
+ IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+ // max field length, compound file and number of segments will be checked
+ // later
+ iconf.setIndexMaxFieldLength(2);
+ iconf.setIndexUseCompoundFile(true);
+ iconf.setIndexMaxNumSegments(1);
+ iconf.setMaxRAMSizeInBytes(20480);
+
+ long versionNumber = -1;
+ long generation = -1;
+
+ for (int i = 0; i < numRuns; i++) {
+ if (fs.exists(outputPath)) {
+ fs.delete(outputPath, true);
+ }
+
+ Shard[] shards = new Shard[initNumShards + i];
+ for (int j = 0; j < shards.length; j++) {
+ shards[j] =
+ new Shard(versionNumber, new Path(indexPath,
+ NUMBER_FORMAT.format(j)).toString(), generation);
+ }
+ run(i + 1, shards);
+ }
+ }
+
+ private void run(int numRuns, Shard[] shards) throws IOException {
+ IIndexUpdater updater = new IndexUpdater();
+ updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+ shards);
+
+ // verify the done files
+ Path[] doneFileNames = new Path[shards.length];
+ int count = 0;
+ FileStatus[] fileStatus = fs.listStatus(outputPath);
+ for (int i = 0; i < fileStatus.length; i++) {
+ FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
+ for (int j = 0; j < doneFiles.length; j++) {
+ doneFileNames[count++] = doneFiles[j].getPath();
+ }
+ }
+ assertEquals(shards.length, count);
+ for (int i = 0; i < count; i++) {
+ assertTrue(doneFileNames[i].getName().startsWith(
+ IndexUpdateReducer.DONE.toString()));
+ }
+
+ // verify the index
+ IndexReader[] readers = new IndexReader[shards.length];
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ readers[i] = IndexReader.open(dir);
+ }
+
+ IndexReader reader = new MultiReader(readers);
+ IndexSearcher searcher = new IndexSearcher(reader);
+ Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+
+ assertEquals(numRuns * numDocsPerRun, hits.length());
+
+ int[] counts = new int[numDocsPerRun];
+ for (int i = 0; i < hits.length(); i++) {
+ Document doc = hits.doc(i);
+ counts[Integer.parseInt(doc.get("id"))]++;
+ }
+
+ for (int i = 0; i < numDocsPerRun; i++) {
+ assertEquals(numRuns, counts[i]);
+ }
+
+ // max field length is 2, so "dot" is also indexed but not "org"
+ hits = searcher.search(new TermQuery(new Term("content", "dot")));
+ assertEquals(numRuns, hits.length());
+
+ hits = searcher.search(new TermQuery(new Term("content", "org")));
+ assertEquals(0, hits.length());
+
+ searcher.close();
+ reader.close();
+
+ // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
+ // to remove earlier checkpoints
+ for (int i = 0; i < shards.length; i++) {
+ Directory dir =
+ new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+ false, conf);
+ IndexWriter writer =
+ new IndexWriter(dir, false, null,
+ new KeepOnlyLastCommitDeletionPolicy());
+ writer.close();
+ }
+
+ // verify the number of segments, must be done after an writer with
+ // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
+ for (int i = 0; i < shards.length; i++) {
+ PathFilter cfsFilter = new PathFilter() {
+ public boolean accept(Path path) {
+ return path.getName().endsWith(".cfs");
+ }
+ };
+ FileStatus[] cfsFiles =
+ fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
+ assertEquals(1, cfsFiles.length);
+ }
+ }
+
+}