Posted to mapreduce-commits@hadoop.apache.org by su...@apache.org on 2012/10/12 06:35:46 UTC

svn commit: r1397432 [5/5] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/ hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/ h...

Modified: hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java?rev=1397432&r1=1397431&r2=1397432&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/src/contrib/index/src/test/org/apache/hadoop/contrib/index/mapred/TestIndexUpdater.java Fri Oct 12 04:35:42 2012
@@ -1,258 +1,258 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.contrib.index.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.NumberFormat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.contrib.index.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
-import org.apache.lucene.index.MultiReader;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.Hits;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.Directory;
-
-import junit.framework.TestCase;
-
-public class TestIndexUpdater extends TestCase {
-
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
-
-  // however, "we only allow 0 or 1 reducer in local mode" - from
-  // LocalJobRunner
-  private Configuration conf;
-  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
-  private Path inputPath = new Path("/myexample/data.txt");
-  private Path outputPath = new Path("/myoutput");
-  private Path indexPath = new Path("/myindex");
-  private int initNumShards = 3;
-  private int numMapTasks = 5;
-
-  private int numDataNodes = 3;
-  private int numTaskTrackers = 3;
-
-  private int numRuns = 3;
-  private int numDocsPerRun = 10; // num of docs in local input path
-
-  private FileSystem fs;
-  private MiniDFSCluster dfsCluster;
-  private MiniMRCluster mrCluster;
-
-  public TestIndexUpdater() throws IOException {
-    super();
-    if (System.getProperty("hadoop.log.dir") == null) {
-      String base = new File(".").getPath(); // getAbsolutePath();
-      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
-    }
-    conf = new Configuration();
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
+import org.apache.lucene.index.MultiReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.Hits;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.Directory;
+
+import junit.framework.TestCase;
+
+public class TestIndexUpdater extends TestCase {
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  // however, "we only allow 0 or 1 reducer in local mode" - from
+  // LocalJobRunner
+  private Configuration conf;
+  private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
+  private Path inputPath = new Path("/myexample/data.txt");
+  private Path outputPath = new Path("/myoutput");
+  private Path indexPath = new Path("/myindex");
+  private int initNumShards = 3;
+  private int numMapTasks = 5;
+
+  private int numDataNodes = 3;
+  private int numTaskTrackers = 3;
+
+  private int numRuns = 3;
+  private int numDocsPerRun = 10; // num of docs in local input path
+
+  private FileSystem fs;
+  private MiniDFSCluster dfsCluster;
+  private MiniMRCluster mrCluster;
+
+  public TestIndexUpdater() throws IOException {
+    super();
+    if (System.getProperty("hadoop.log.dir") == null) {
+      String base = new File(".").getPath(); // getAbsolutePath();
+      System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
+    }
+    conf = new Configuration();
     //See MAPREDUCE-947 for more details. Setting to false prevents the creation of _SUCCESS.
     conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
-  }
-
-  protected void setUp() throws Exception {
-    super.setUp();
-    try {
-      dfsCluster =
-          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
-
-      fs = dfsCluster.getFileSystem();
-      if (fs.exists(inputPath)) {
-        fs.delete(inputPath, true);
-      }
-      fs.copyFromLocalFile(localInputPath, inputPath);
-
-      if (fs.exists(outputPath)) {
-        // do not create, mapred will create
-        fs.delete(outputPath, true);
-      }
-
-      if (fs.exists(indexPath)) {
-        fs.delete(indexPath, true);
-      }
-
-      mrCluster =
-          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
-
-    } catch (IOException e) {
-      if (dfsCluster != null) {
-        dfsCluster.shutdown();
-        dfsCluster = null;
-      }
-
-      if (fs != null) {
-        fs.close();
-        fs = null;
-      }
-
-      if (mrCluster != null) {
-        mrCluster.shutdown();
-        mrCluster = null;
-      }
-
-      throw e;
-    }
-
-  }
-
-  protected void tearDown() throws Exception {
-    if (dfsCluster != null) {
-      dfsCluster.shutdown();
-      dfsCluster = null;
-    }
-
-    if (fs != null) {
-      fs.close();
-      fs = null;
-    }
-
-    if (mrCluster != null) {
-      mrCluster.shutdown();
-      mrCluster = null;
-    }
-
-    super.tearDown();
-  }
-
-  public void testIndexUpdater() throws IOException {
-    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
-    // max field length, compound file and number of segments will be checked
-    // later
-    iconf.setIndexMaxFieldLength(2);
-    iconf.setIndexUseCompoundFile(true);
-    iconf.setIndexMaxNumSegments(1);
-    iconf.setMaxRAMSizeInBytes(20480);
-
-    long versionNumber = -1;
-    long generation = -1;
-
-    for (int i = 0; i < numRuns; i++) {
-      if (fs.exists(outputPath)) {
-        fs.delete(outputPath, true);
-      }
-
-      Shard[] shards = new Shard[initNumShards + i];
-      for (int j = 0; j < shards.length; j++) {
-        shards[j] =
-            new Shard(versionNumber, new Path(indexPath,
-                NUMBER_FORMAT.format(j)).toString(), generation);
-      }
-      run(i + 1, shards);
-    }
-  }
-
-  private void run(int numRuns, Shard[] shards) throws IOException {
-    IIndexUpdater updater = new IndexUpdater();
-    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
-        shards);
-
-    // verify the done files
-    Path[] doneFileNames = new Path[shards.length];
-    int count = 0;
-    FileStatus[] fileStatus = fs.listStatus(outputPath);
-    for (int i = 0; i < fileStatus.length; i++) {
-      FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
-      for (int j = 0; j < doneFiles.length; j++) {
-        doneFileNames[count++] = doneFiles[j].getPath();
-      }
-    }
-    assertEquals(shards.length, count);
-    for (int i = 0; i < count; i++) {
-      assertTrue(doneFileNames[i].getName().startsWith(
-          IndexUpdateReducer.DONE.toString()));
-    }
-
-    // verify the index
-    IndexReader[] readers = new IndexReader[shards.length];
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      readers[i] = IndexReader.open(dir);
-    }
-
-    IndexReader reader = new MultiReader(readers);
-    IndexSearcher searcher = new IndexSearcher(reader);
-    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
-
-    assertEquals(numRuns * numDocsPerRun, hits.length());
-
-    int[] counts = new int[numDocsPerRun];
-    for (int i = 0; i < hits.length(); i++) {
-      Document doc = hits.doc(i);
-      counts[Integer.parseInt(doc.get("id"))]++;
-    }
-
-    for (int i = 0; i < numDocsPerRun; i++) {
-      assertEquals(numRuns, counts[i]);
-    }
-
-    // max field length is 2, so "dot" is also indexed but not "org"
-    hits = searcher.search(new TermQuery(new Term("content", "dot")));
-    assertEquals(numRuns, hits.length());
-
-    hits = searcher.search(new TermQuery(new Term("content", "org")));
-    assertEquals(0, hits.length());
-
-    searcher.close();
-    reader.close();
-
-    // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
-    // to remove earlier checkpoints
-    for (int i = 0; i < shards.length; i++) {
-      Directory dir =
-          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
-              false, conf);
-      IndexWriter writer =
-          new IndexWriter(dir, false, null,
-              new KeepOnlyLastCommitDeletionPolicy());
-      writer.close();
-    }
-
-    // verify the number of segments, must be done after a writer with
-    // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
-    for (int i = 0; i < shards.length; i++) {
-      PathFilter cfsFilter = new PathFilter() {
-        public boolean accept(Path path) {
-          return path.getName().endsWith(".cfs");
-        }
-      };
-      FileStatus[] cfsFiles =
-          fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
-      assertEquals(1, cfsFiles.length);
-    }
-  }
-
-}
+  }
+
+  protected void setUp() throws Exception {
+    super.setUp();
+    try {
+      dfsCluster =
+          new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
+
+      fs = dfsCluster.getFileSystem();
+      if (fs.exists(inputPath)) {
+        fs.delete(inputPath, true);
+      }
+      fs.copyFromLocalFile(localInputPath, inputPath);
+
+      if (fs.exists(outputPath)) {
+        // do not create, mapred will create
+        fs.delete(outputPath, true);
+      }
+
+      if (fs.exists(indexPath)) {
+        fs.delete(indexPath, true);
+      }
+
+      mrCluster =
+          new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
+
+    } catch (IOException e) {
+      if (dfsCluster != null) {
+        dfsCluster.shutdown();
+        dfsCluster = null;
+      }
+
+      if (fs != null) {
+        fs.close();
+        fs = null;
+      }
+
+      if (mrCluster != null) {
+        mrCluster.shutdown();
+        mrCluster = null;
+      }
+
+      throw e;
+    }
+
+  }
+
+  protected void tearDown() throws Exception {
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+
+    if (mrCluster != null) {
+      mrCluster.shutdown();
+      mrCluster = null;
+    }
+
+    super.tearDown();
+  }
+
+  public void testIndexUpdater() throws IOException {
+    IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
+    // max field length, compound file and number of segments will be checked
+    // later
+    iconf.setIndexMaxFieldLength(2);
+    iconf.setIndexUseCompoundFile(true);
+    iconf.setIndexMaxNumSegments(1);
+    iconf.setMaxRAMSizeInBytes(20480);
+
+    long versionNumber = -1;
+    long generation = -1;
+
+    for (int i = 0; i < numRuns; i++) {
+      if (fs.exists(outputPath)) {
+        fs.delete(outputPath, true);
+      }
+
+      Shard[] shards = new Shard[initNumShards + i];
+      for (int j = 0; j < shards.length; j++) {
+        shards[j] =
+            new Shard(versionNumber, new Path(indexPath,
+                NUMBER_FORMAT.format(j)).toString(), generation);
+      }
+      run(i + 1, shards);
+    }
+  }
+
+  private void run(int numRuns, Shard[] shards) throws IOException {
+    IIndexUpdater updater = new IndexUpdater();
+    updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
+        shards);
+
+    // verify the done files
+    Path[] doneFileNames = new Path[shards.length];
+    int count = 0;
+    FileStatus[] fileStatus = fs.listStatus(outputPath);
+    for (int i = 0; i < fileStatus.length; i++) {
+      FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
+      for (int j = 0; j < doneFiles.length; j++) {
+        doneFileNames[count++] = doneFiles[j].getPath();
+      }
+    }
+    assertEquals(shards.length, count);
+    for (int i = 0; i < count; i++) {
+      assertTrue(doneFileNames[i].getName().startsWith(
+          IndexUpdateReducer.DONE.toString()));
+    }
+
+    // verify the index
+    IndexReader[] readers = new IndexReader[shards.length];
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      readers[i] = IndexReader.open(dir);
+    }
+
+    IndexReader reader = new MultiReader(readers);
+    IndexSearcher searcher = new IndexSearcher(reader);
+    Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
+
+    assertEquals(numRuns * numDocsPerRun, hits.length());
+
+    int[] counts = new int[numDocsPerRun];
+    for (int i = 0; i < hits.length(); i++) {
+      Document doc = hits.doc(i);
+      counts[Integer.parseInt(doc.get("id"))]++;
+    }
+
+    for (int i = 0; i < numDocsPerRun; i++) {
+      assertEquals(numRuns, counts[i]);
+    }
+
+    // max field length is 2, so "dot" is also indexed but not "org"
+    hits = searcher.search(new TermQuery(new Term("content", "dot")));
+    assertEquals(numRuns, hits.length());
+
+    hits = searcher.search(new TermQuery(new Term("content", "org")));
+    assertEquals(0, hits.length());
+
+    searcher.close();
+    reader.close();
+
+    // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
+    // to remove earlier checkpoints
+    for (int i = 0; i < shards.length; i++) {
+      Directory dir =
+          new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
+              false, conf);
+      IndexWriter writer =
+          new IndexWriter(dir, false, null,
+              new KeepOnlyLastCommitDeletionPolicy());
+      writer.close();
+    }
+
+    // verify the number of segments, must be done after a writer with
+    // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
+    for (int i = 0; i < shards.length; i++) {
+      PathFilter cfsFilter = new PathFilter() {
+        public boolean accept(Path path) {
+          return path.getName().endsWith(".cfs");
+        }
+      };
+      FileStatus[] cfsFiles =
+          fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
+      assertEquals(1, cfsFiles.length);
+    }
+  }
+
+}