You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@nutch.apache.org by le...@apache.org on 2011/10/20 14:44:40 UTC
svn commit: r1186766 - in /nutch/branches/nutchgora: CHANGES.txt
src/test/org/apache/nutch/storage/TestGoraStorage.java
Author: lewismc
Date: Thu Oct 20 12:44:39 2011
New Revision: 1186766
URL: http://svn.apache.org/viewvc?rev=1186766&view=rev
Log:
Commit to address NUTCH-1135 and update CHANGES.txt.
Modified:
nutch/branches/nutchgora/CHANGES.txt
nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java
Modified: nutch/branches/nutchgora/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/CHANGES.txt?rev=1186766&r1=1186765&r2=1186766&view=diff
==============================================================================
--- nutch/branches/nutchgora/CHANGES.txt (original)
+++ nutch/branches/nutchgora/CHANGES.txt Thu Oct 20 12:44:39 2011
@@ -2,6 +2,8 @@ Nutch Change Log
Release nutchgora - Current Development
+* NUTCH-1081 & 1135 ant tests fail & Fix TestGoraStorage for Nutchgora (Ferdy via lewismc)
+
* NUTCH-1156 building errors with gora-hbase as a backend; update ivy.xml to use correct dependancies (Ferdy via lewismc)
* NUTCH-1109 Add Sonar targets to Ant build.xml (lewismc)
Modified: nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java
URL: http://svn.apache.org/viewvc/nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java?rev=1186766&r1=1186765&r2=1186766&view=diff
==============================================================================
--- nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java (original)
+++ nutch/branches/nutchgora/src/test/org/apache/nutch/storage/TestGoraStorage.java Thu Oct 20 12:44:39 2011
@@ -16,228 +16,195 @@
******************************************************************************/
package org.apache.nutch.storage;
-import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.List;
-import java.util.Random;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.nutch.util.NutchConfiguration;
+import org.apache.commons.io.IOUtils;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
-
-import junit.framework.TestCase;
-
-public class TestGoraStorage extends TestCase {
- Configuration conf;
-
- public void init() throws Exception {
- conf = NutchConfiguration.create();
- }
-
- public void setUp() throws Exception {
- conf = NutchConfiguration.create();
- DataStore<String,WebPage> store;
-
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
- store.deleteByQuery(store.newQuery());
- store.close();
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.AbstractNutchTest;
+import org.apache.nutch.util.CrawlTestUtil;
+import org.hsqldb.Server;
+
+/**
+ * Tests basic Gora functionality by writing and reading webpages.
+ */
+public class TestGoraStorage extends AbstractNutchTest {
+
+ /**
+ * Sequentially read and write pages to a store.
+ *
+ * @throws Exception
+ */
+ public void testSinglethreaded() throws Exception {
+ String id = "singlethread";
+ readWrite(id, webPageStore);
}
-
- private class Worker extends Thread {
- DataStore<String,WebPage> store;
+
+ private static void readWrite(String id, DataStore<String, WebPage> store)
+ throws IOException {
WebPage page = new WebPage();
- int start, count, id;
- int reopenMark, reopenCount = 0;
- boolean reopens = false;
-
- public Worker(int id, int start, int count, boolean reopens) {
- this.id = id;
- this.start = start;
- this.count = count;
- reopenMark = new Random().nextInt(count / 4) + count / 4;
- this.reopens = reopens;
- }
-
- public void run() {
- threadCount.incrementAndGet();
- try {
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
- for (int i = 0; i < count; i++) {
- if (i > 0 && ((count % 10) == 0)) {
- System.out.println(" -W" + id + "(" + i + "/" + count + ")");
- }
- if (reopens && (i > 0) && (i % reopenMark) == 0) {
- System.out.println(" -W" + id + " reopen " + (++reopenCount));
- store.flush();
- store.close();
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
- }
- page.setTitle(new Utf8(String.valueOf(start + i)));
- store.put(String.valueOf(start + i), page);
+ int max = 1000;
+ for (int i = 0; i < max; i++) {
+ // store a page with title
+ String key = "key-" + id + "-" + i;
+ String title = "title" + i;
+ page.setTitle(new Utf8(title));
+ store.put(key, page);
+ store.flush();
+
+ // retrieve page and check title
+ page = store.get(key);
+ assertNotNull(page);
+ assertEquals(title, page.getTitle().toString());
+ }
+
+ // scan over the rows
+ Result<String, WebPage> result = store.execute(store.newQuery());
+ int count = 0;
+ while (result.next()) {
+ // only count keys in the store for the current id
+ if (result.getKey().contains(id))
+ count++;
+ }
+ // check amount
+ assertEquals(max, count);
+ }
+
+ /**
+ * Tests multiple thread reading and writing to the same store, this should be
+ * no problem because {@link DataStore} implementations claim to be thread
+ * safe.
+ *
+ * @throws Exception
+ */
+ public void testMultithreaded() throws Exception {
+ // create a fixed thread pool
+ int numThreads = 8;
+ ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+ // define a list of tasks
+ Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+ for (int i = 0; i < numThreads; i++) {
+ tasks.add(new Callable<Integer>() {
+ @Override
+ public Integer call() {
try {
- sleep(10);
- } catch (Exception e) {};
+ // run a sequence
+ readWrite(Thread.currentThread().getName(), webPageStore);
+ // everything ok, return 0
+ return 0;
+ } catch (Exception e) {
+ e.printStackTrace();
+ // this will fail the test
+ return 1;
+ }
}
- store.flush();
- store.close();
- } catch (Exception e) {
- fail(e.getMessage());
- }
- threadCount.decrementAndGet();
+ });
}
- }
-
- private AtomicInteger threadCount = new AtomicInteger(0);
-
- public void testMultithread() throws Exception {
- int COUNT = 1000;
- int NUM = 100;
- DataStore<String,WebPage> store;
-
- for (int i = 0; i < NUM; i++) {
- Worker w = new Worker(i, i * COUNT, COUNT, true);
- w.start();
- }
- while (threadCount.get() > 0) {
- try {
- Thread.sleep(5000);
- System.out.println("-threads " + threadCount.get() + "/" + NUM);
- } catch (Exception e) {};
- }
- System.out.println("Verifying...");
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
- Result<String,WebPage> res = store.execute(store.newQuery());
- int size = COUNT * NUM;
- BitSet keys = new BitSet(size);
- while (res.next()) {
- String key = res.getKey();
- WebPage p = res.get();
- assertEquals(key, p.getTitle().toString());
- int pos = Integer.parseInt(key);
- assertTrue(pos < size && pos >= 0);
- if (keys.get(pos)) {
- fail("key " + key + " already set!");
- }
- keys.set(pos);
+
+ // submit them at once
+ List<Future<Integer>> results = pool.invokeAll(tasks);
+
+ // check results
+ for (Future<Integer> result : results) {
+ assertEquals(0, (int) result.get());
}
- assertEquals(size, keys.cardinality());
}
+ /**
+ * Tests multiple processes reading and writing to the same store backend,
+ * this is to simulate a multi process Nutch environment (i.e. MapReduce).
+ *
+ * @throws Exception
+ */
public void testMultiProcess() throws Exception {
- int COUNT = 1000;
- int NUM = 100;
- DataStore<String,WebPage> store;
- List<Process> procs = new ArrayList<Process>();
-
- for (int i = 0; i < NUM; i++) {
- Process p = launch(i, i * COUNT, COUNT);
- procs.add(p);
- }
-
- while (procs.size() > 0) {
- try {
- Thread.sleep(5000);
- } catch (Exception e) {};
- Iterator<Process> it = procs.iterator();
- while (it.hasNext()) {
- Process p = it.next();
- int code = 1;
- try {
- code = p.exitValue();
- assertEquals(0, code);
- it.remove();
- p.destroy();
- } catch (IllegalThreadStateException e) {
- // not ready yet
+ // create and start a hsql server, a stand-alone (memory backed) db
+ // (important: a stand-alone server should be used because simple
+ // file based access i.e. jdbc:hsqldb:file is NOT process-safe.)
+ Server server = new Server();
+ server.setDaemon(true);
+ server.setSilent(true); // disables LOTS of trace
+ final String className = getClass().getName();
+ server.setDatabasePath(0, "mem:" + className);
+ server.setDatabaseName(0, className);
+ server.start();
+
+ // create a fixed thread pool
+ int numThreads = 4;
+ ExecutorService pool = Executors.newFixedThreadPool(numThreads);
+
+ // spawn multiple processes, each thread spawns own process
+ Collection<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
+ for (int i = 0; i < numThreads; i++) {
+ tasks.add(new Callable<Integer>() {
+ @Override
+ public Integer call() {
+ try {
+ String separator = System.getProperty("file.separator");
+ String classpath = System.getProperty("java.class.path");
+ String path = System.getProperty("java.home") + separator + "bin"
+ + separator + "java";
+ ProcessBuilder processBuilder = new ProcessBuilder(path, "-cp",
+ classpath, className);
+ processBuilder.redirectErrorStream(true);
+ Process process = processBuilder.start();
+ InputStream in = process.getInputStream();
+ int exit = process.waitFor();
+ //print the output of the process
+ System.out.println("===Process stream for " + Thread.currentThread()
+ + "\n" + IOUtils.toString(in) + "===End of process stream.");
+ in.close();
+ // process should exit with zero code
+ return exit;
+ } catch (Exception e) {
+ e.printStackTrace();
+ // this will fail the test
+ return 1;
+ }
}
- }
- System.out.println("* running " + procs.size() + "/" + NUM);
+ });
}
- System.out.println("Verifying...");
- store = StorageUtils.createDataStore(conf, String.class, WebPage.class);
- Result<String,WebPage> res = store.execute(store.newQuery());
- int size = COUNT * NUM;
- BitSet keys = new BitSet(size);
- while (res.next()) {
- String key = res.getKey();
- WebPage p = res.get();
- assertEquals(key, p.getTitle().toString());
- int pos = Integer.parseInt(key);
- assertTrue(pos < size && pos >= 0);
- if (keys.get(pos)) {
- fail("key " + key + " already set!");
- }
- keys.set(pos);
- }
- if (size != keys.cardinality()) {
- System.out.println("ERROR Missing keys:");
- for (int i = 0; i < size; i++) {
- if (keys.get(i)) continue;
- System.out.println(" " + i);
- }
- fail("key count should be " + size + " but is " + keys.cardinality());
+
+ // submit them at once
+ List<Future<Integer>> results = pool.invokeAll(tasks);
+
+ // check results
+ for (Future<Integer> result : results) {
+ assertEquals(0, (int) result.get());
}
- }
-
- private Process launch(int id, int start, int count) throws Exception {
- // Build exec child jmv args.
- Vector<String> vargs = new Vector<String>(8);
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
-
- vargs.add(jvm.toString());
-
- // Add child (task) java-vm options.
- // tmp dir
- String prop = System.getProperty("java.io.tmpdir");
- vargs.add("-Djava.io.tmpdir=" + prop);
- // library path
- prop = System.getProperty("java.library.path");
- if (prop != null) {
- vargs.add("-Djava.library.path=" + prop);
- }
- // working dir
- prop = System.getProperty("user.dir");
- vargs.add("-Duser.dir=" + prop);
- // combat the stupid Xerces issue
- vargs.add("-Djavax.xml.parsers.DocumentBuilderFactory=com.sun.org.apache.xerces.internal.jaxp.DocumentBuilderFactoryImpl");
- // prepare classpath
- String sep = System.getProperty("path.separator");
- StringBuffer classPath = new StringBuffer();
- // start with same classpath as parent process
- classPath.append(System.getProperty("java.class.path"));
- //classPath.append(sep);
- // Add classpath.
- vargs.add("-classpath");
- vargs.add(classPath.toString());
- // append class name and args
- vargs.add(TestGoraStorage.class.getName());
- vargs.add(String.valueOf(id));
- vargs.add(String.valueOf(start));
- vargs.add(String.valueOf(count));
- ProcessBuilder builder = new ProcessBuilder(vargs);
- return builder.start();
+ //stop db
+ server.stop();
}
-
+
public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- System.err.println("Usage: TestGoraStore <id> <startKey> <numRecords>");
- System.exit(-1);
- }
- TestGoraStorage test = new TestGoraStorage();
- test.init();
- int id = Integer.parseInt(args[0]);
- int start = Integer.parseInt(args[1]);
- int count = Integer.parseInt(args[2]);
- Worker w = test.new Worker(id, start, count, true);
- w.run();
- System.exit(0);
+ // entry point for the multiprocess test
+ System.out.println("Starting!");
+
+ Configuration localConf = CrawlTestUtil.createConfiguration();
+ localConf.set("storage.data.store.class", "org.apache.gora.sql.store.SqlStore");
+
+ //connect to local sql service
+ DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.driver","org.hsqldb.jdbcDriver");
+ DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.url",
+ "jdbc:hsqldb:hsql://localhost/"+TestGoraStorage.class.getName());
+ DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.user","sa");
+ DataStoreFactory.properties.setProperty("gora.sqlstore.jdbc.password","");
+
+ DataStore<String, WebPage> store = StorageUtils.createWebStore(localConf,
+ String.class, WebPage.class);
+ readWrite("single_id", store);
+ System.out.println("Done.");
}
}