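You are viewing a plain text version of this content. The canonical (linked) version is available from the original mailing-list archive page; the hyperlink was not preserved in this plain-text copy.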
Posted to commits@nutch.apache.org by le...@apache.org on 2011/10/20 14:44:40 UTC

svn commit: r1186766 - in /nutch/branches/nutchgora: CHANGES.txt src/test/org/apache/nutch/storage/TestGoraStorage.java

Author: lewismc
Date: Thu Oct 20 12:44:39 2011
New Revision: 1186766

URL: http://svn.apache.org/viewvc?rev=1186766&view=rev
Log:
Commit to address NUTCH-1135 and update CHANGES.txt.

Modified:
    nutch/branches/nutchgora/CHANGES.txt
    nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java

Modified: nutch/branches/nutchgora/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1186766&r1=1186765&r2=1186766&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Thu Oct 20 12:44:39 2011
@@ -2,6 +2,8 @@ Nutch Change Log
 
 Release nutchgora - Current Development
 
+* NUTCH-1081 & 1135 ant tests fail & Fix TestGoraStorage for Nutchgora (Ferdy via lewismc)
+
 * NUTCH-1156 building errors with gora-hbase as a backend; update ivy.xml to use correct dependancies (Ferdy via lewismc)
 
 * NUTCH-1109 Add Sonar targets to Ant build.xml (lewismc)

Modified: nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java?rev=1186766&r1=1186765&r2=1186766&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java (original)
+++ nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java Thu Oct 20 12:44:39 2011
@@ -16,228 +16,195 @@
  ******************************************************************************/
 package org.apache.nutch.storage;
 
-import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.NutchConfiguration;
+import org.apache.commons.io.IOUtils;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
-
-import junit.framework.TestCase;
-
-public class TestGoraStorage extends TestCase {
-  Configuration conf;
-  
-  public void init() throws Exception {
-    conf = NutchConfiguration.create();
-  }
-  
-  public void setUp() throws Exception {
-    conf = NutchConfiguration.create();
-    DataStore<String,WebPage> store;
-    
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    store.deleteByQuery(store.newQuery());
-    store.close();
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.AbstractNutchTest;
+import org.apache.nutch.util.CrawlTestUtil;
+import org.hsqldb.Server;
+
+/**
+ * Tests basic Gora functionality by writing and reading webpages.
+ */
+public class TestGoraStorage extends AbstractNutchTest {
+
+  /**
+   * Sequentially read and write pages to a store.
+   * 
+   * @throws Exception
+   */
+  public void testSinglethreaded() throws Exception {
+    String id = "singlethread";
+    readWrite(id, webPageStore);
   }
-  
-  private class Worker extends Thread {
-    DataStore<String,WebPage> store;
+
+  private static void readWrite(String id, DataStore<String, WebPage> store) 
+      throws IOException {
     WebPage page = new WebPage();
-    int start, count, id;
-    int reopenMark, reopenCount = 0;
-    boolean reopens = false;
-    
-    public Worker(int id, int start, int count, boolean reopens) {
-      this.id = id;
-      this.start = start;
-      this.count = count;
-      reopenMark = new Random().nextInt(count / 4) + count / 4;
-      this.reopens = reopens;
-    }
-    
-    public void run() {
-      threadCount.incrementAndGet();
-      try {
-        store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-        for (int i = 0; i < count; i++) {
-          if (i > 0 && ((count % 10) == 0)) {
-            System.out.println(" -W" + id + "(" + i + "/" + count + ")");
-          }
-          if (reopens && (i > 0) && (i % reopenMark) == 0) {
-            System.out.println(" -W" + id + " reopen " + (++reopenCount));
-            store.flush();
-            store.close();
-            store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-          }
-          page.setTitle(new Utf8(String.valueOf(start + i)));
-          store.put(String.valueOf(start + i), page);
+    int max = 1000;
+    for (int i = 0; i < max; i++) {
+      // store a page with title
+      String key = "key-" + id + "-" + i;
+      String title = "title" + i;
+      page.setTitle(new Utf8(title));
+      store.put(key, page);
+      store.flush();
+
+      // retrieve page and check title
+      page = store.get(key);
+      assertNotNull(page);
+      assertEquals(title, page.getTitle().toString());
+    }
+
+    // scan over the rows
+    Result<String, WebPage> result = store.execute(store.newQuery());
+    int count = 0;
+    while (result.next()) {
+      // only count keys in the store for the current id
+      if (result.getKey().contains(id))
+        count++;
+    }
+    // check amount
+    assertEquals(max, count);
+  }
+
+  /**
+   * Tests multiple thread reading and writing to the same store, this should be
+   * no problem because {@link DataStore} implementations claim to be thread
+   * safe.
+   * 
+   * @throws Exception
+   */
+  public void testMultithreaded() throws Exception {
+    // create a fixed thread pool
+    int numThreads = 8;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+    // define a list of tasks
+    Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(new Callable<Integer>() {
+        @Override
+        public Integer call() {
           try {
-            sleep(10);
-          } catch (Exception e) {};
+            // run a sequence
+            readWrite(Thread.currentThread().getName(), webPageStore);
+            // everything ok, return 0
+            return 0;
+          } catch (Exception e) {
+            e.printStackTrace();
+            // this will fail the test
+            return 1;
+          }
         }
-        store.flush();
-        store.close();
-      } catch (Exception e) {
-        fail(e.getMessage());
-      }
-      threadCount.decrementAndGet();
+      });
     }
-  }
-  
-  private AtomicInteger threadCount = new AtomicInteger(0);
-  
-  public void testMultithread() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
-    DataStore<String,WebPage> store;
-    
-    for (int i = 0; i < NUM; i++) {
-      Worker w = new Worker(i, i * COUNT, COUNT, true);
-      w.start();
-    }
-    while (threadCount.get() > 0) {
-      try {
-        Thread.sleep(5000);
-        System.out.println("-threads " + threadCount.get() + "/" + NUM);
-      } catch (Exception e) {};
-    }
-    System.out.println("Verifying...");
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    Result<String,WebPage> res = store.execute(store.newQuery());
-    int size = COUNT * NUM;
-    BitSet keys = new BitSet(size);
-    while (res.next()) {
-      String key = res.getKey();
-      WebPage p = res.get();
-      assertEquals(key, p.getTitle().toString());
-      int pos = Integer.parseInt(key);
-      assertTrue(pos < size && pos >= 0);
-      if (keys.get(pos)) {
-        fail("key " + key + " already set!");
-      }
-      keys.set(pos);
+
+    // submit them at once
+    List<Future<Integer>> results = pool.invokeAll(tasks);
+
+    // check results
+    for (Future<Integer> result : results) {
+      assertEquals(0, (int) result.get());
     }
-    assertEquals(size, keys.cardinality());
   }
   
+  /**
+   * Tests multiple processes reading and writing to the same store backend, 
+   * this is to simulate a multi process Nutch environment (i.e. MapReduce).
+   * 
+   * @throws Exception
+   */
   public void testMultiProcess() throws Exception {
-    int COUNT = 1000;
-    int NUM = 100;
-    DataStore<String,WebPage> store;
-    List<Process> procs = new ArrayList<Process>();
-    
-    for (int i = 0; i < NUM; i++) {
-      Process p = launch(i, i * COUNT, COUNT);
-      procs.add(p);
-    }
-    
-    while (procs.size() > 0) {
-      try {
-        Thread.sleep(5000);
-      } catch (Exception e) {};
-      Iterator<Process> it = procs.iterator();
-      while (it.hasNext()) {
-        Process p = it.next();
-        int code = 1;
-        try {
-          code = p.exitValue();
-          assertEquals(0, code);
-          it.remove();
-          p.destroy();
-        } catch (IllegalThreadStateException e) {
-          // not ready yet
+    // create and start a hsql server, a stand-alone (memory backed) db
+    // (important: a stand-alone server should be used because simple
+    //  file based access i.e. jdbc:hsqldb:file is NOT process-safe.)
+    Server server = new Server();
+    server.setDaemon(true);
+    server.setSilent(true); // disables LOTS of trace
+    final String className = getClass().getName();
+    server.setDatabasePath(0, "mem:" + className);
+    server.setDatabaseName(0, className);
+    server.start();
+    
+    // create a fixed thread pool
+    int numThreads = 4;
+    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+    
+    // spawn multiple processes, each thread spawns own process
+    Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+    for (int i = 0; i < numThreads; i++) {
+      tasks.add(new Callable<Integer>() {
+        @Override
+        public Integer call() {
+          try {
+            String separator = System.getProperty("file.separator");
+            String classpath = System.getProperty("java.class.path");
+            String path = System.getProperty("java.home") + separator + "bin"
+                + separator + "java";
+            ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp", 
+                classpath, className);
+            processBuilder.redirectErrorStream(true);
+            Process process = processBuilder.start();
+            InputStream in = process.getInputStream();
+            int exit = process.waitFor();
+            //print the output of the process
+            System.out.println("===Process stream for " + Thread.currentThread() 
+                + "\n" + IOUtils.toString(in) + "===End of process stream.");
+            in.close();
+            // process should exit with zero code
+            return exit;
+          } catch (Exception e) {
+            e.printStackTrace();
+            // this will fail the test
+            return 1;
+          }
         }
-      }
-      System.out.println("* running " + procs.size() + "/" + NUM);
+      });
     }
-    System.out.println("Verifying...");
-    store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
-    Result<String,WebPage> res = store.execute(store.newQuery());
-    int size = COUNT * NUM;
-    BitSet keys = new BitSet(size);
-    while (res.next()) {
-      String key = res.getKey();
-      WebPage p = res.get();
-      assertEquals(key, p.getTitle().toString());
-      int pos = Integer.parseInt(key);
-      assertTrue(pos < size && pos >= 0);
-      if (keys.get(pos)) {
-        fail("key " + key + " already set!");
-      }
-      keys.set(pos);
-    }
-    if (size != keys.cardinality()) {
-      System.out.println("ERROR Missing keys:");
-      for (int i = 0; i < size; i++) {
-        if (keys.get(i)) continue;
-        System.out.println(" " + i);
-      }
-      fail("key count should be " + size + " but is " + keys.cardinality());
+
+    // submit them at once
+    List<Future<Integer>> results = pool.invokeAll(tasks);
+
+    // check results
+    for (Future<Integer> result : results) {
+      assertEquals(0, (int) result.get());
     }
-  }
-  
-  private Process launch(int id, int start, int count) throws Exception {
-    //  Build exec child jmv args.
-    Vector<String> vargs = new Vector<String>(8);
-    File jvm =                                  // use same jvm as parent
-      new File(new File(System.getProperty("java.home"), "bin"), "java");
-
-    vargs.add(jvm.toString());
-
-    // Add child (task) java-vm options.
-    // tmp dir
-    String prop = System.getProperty("java.io.tmpdir");
-    vargs.add("-Djava.io.tmpdir=" + prop);
-    // library path
-    prop = System.getProperty("java.library.path");
-    if (prop != null) {
-      vargs.add("-Djava.library.path=" + prop);      
-    }
-    // working dir
-    prop = System.getProperty("user.dir");
-    vargs.add("-Duser.dir=" + prop);    
-    // combat the stupid Xerces issue
-    vargs.add("-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
-    // prepare classpath
-    String sep = System.getProperty("path.separator");
-    StringBuffer classPath = new StringBuffer();
-    // start with same classpath as parent process
-    classPath.append(System.getProperty("java.class.path"));
-    //classPath.append(sep);
-    // Add classpath.
-    vargs.add("-classpath");
-    vargs.add(classPath.toString());
     
-    // append class name and args
-    vargs.add(TestGoraStorage.class.getName());
-    vargs.add(String.valueOf(id));
-    vargs.add(String.valueOf(start));
-    vargs.add(String.valueOf(count));
-    ProcessBuilder builder = new ProcessBuilder(vargs);
-    return builder.start();
+    //stop db
+    server.stop();
   }
-  
+
   public static void main(String[] args) throws Exception {
-    if (args.length < 3) {
-      System.err.println("Usage: TestGoraStore <id> <startKey> <numRecords>");
-      System.exit(-1);
-    }
-    TestGoraStorage test = new TestGoraStorage();
-    test.init();
-    int id = Integer.parseInt(args[0]);
-    int start = Integer.parseInt(args[1]);
-    int count = Integer.parseInt(args[2]);
-    Worker w = test.new Worker(id, start, count, true);
-    w.run();
-    System.exit(0);
+    // entry point for the multiprocess test
+    System.out.println("Starting!");
+
+    Configuration localConf = CrawlTestUtil.createConfiguration();
+    localConf.set("storage.data.store.class", "org.apache.gora.sql.store.SqlStore");
+
+    //connect to local sql service
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url",
+        "jdbc:hsqldb:hsql://localhost/"+TestGoraStorage.class.getName());
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
+    DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
+
+    DataStore<String, WebPage> store = StorageUtils.createWebStore(localConf,
+        String.class, WebPage.class);
+    readWrite("single_id", store);
+    System.out.println("Done.");
   }
 }