You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by da...@apache.org on 2015/04/29 14:18:40 UTC

svn commit: r1676729 - in /jackrabbit/oak/trunk: oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/ oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/ oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/

Author: davide
Date: Wed Apr 29 12:18:34 2015
New Revision: 1676729

URL: http://svn.apache.org/r1676729
Log:
OAK-2813 - Create a benchmark for measuring the lag of async index

- benchmark measuring the Lucene property index when it is updated on the same thread as the global full-text index;
- benchmark measuring the Lucene property index when it is updated on a separate thread from the global full-text index;
- hook in the benchmark suite for halting the benchmarks;
- hook in the benchmark suite for halting any separate thread started by a benchmark.

Added:
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFTSeparated.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFullTextTest.java
Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneInitializerHelper.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/AbstractTest.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
    jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/WikipediaImport.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneInitializerHelper.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneInitializerHelper.java?rev=1676729&r1=1676728&r2=1676729&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneInitializerHelper.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/util/LuceneInitializerHelper.java Wed Apr 29 12:18:34 2015
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.lucene.util;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLuceneIndexDefinition;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneIndexHelper.newLuceneFileIndexDefinition;
@@ -72,11 +73,25 @@ public class LuceneInitializerHelper imp
         this.storageEnabled = storageEnabled;
     }
 
+    /**
+     * Sets the {@code async} property to {@code "async"}; same as {@code async("async")}.
+     * @return this helper, to allow call chaining
+     */
     public LuceneInitializerHelper async() {
-        async = "async";
-        return this;
+        return async("async");
     }
 
+    /**
+     * Sets the {@code async} property to the provided value.
+     *
+     * @param async the value to use, e.g. the name of an async indexing lane; must not be null
+     * @return this helper, to allow call chaining
+     */
+    public LuceneInitializerHelper async(@Nonnull final String async) {
+        this.async = checkNotNull(async);
+        return this;
+    }
+    
     @Override
     public void initialize(@Nonnull NodeBuilder builder) {
         if (builder.hasChildNode(INDEX_DEFINITIONS_NAME)

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/AbstractTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/AbstractTest.java?rev=1676729&r1=1676728&r2=1676729&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/AbstractTest.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/AbstractTest.java Wed Apr 29 12:18:34 2015
@@ -24,6 +24,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.annotation.Nullable;
 import javax.jcr.Credentials;
 import javax.jcr.GuestCredentials;
 import javax.jcr.Repository;
@@ -31,10 +32,12 @@ import javax.jcr.RepositoryException;
 import javax.jcr.Session;
 import javax.jcr.SimpleCredentials;
 
 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
 import org.apache.commons.math.stat.descriptive.SynchronizedDescriptiveStatistics;
 import org.apache.jackrabbit.oak.benchmark.util.Profiler;
 import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract base class for individual performance benchmarks.
@@ -67,6 +70,8 @@ abstract class AbstractTest<T> extends B
     
     private static final boolean PROFILE = Boolean.getBoolean("profile");
     
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
+
     private Repository repository;
 
     private Credentials credentials;
@@ -80,7 +85,56 @@ abstract class AbstractTest<T> extends B
     private Profiler profiler;
 
     private PrintStream out;
+
+    /**
+     * <p>
+     * Signals {@link #runTest(int)} to skip any further planned test iterations. Once set to
+     * true, the loop exits without performing any more tests.
+     * </p>
+     *
+     * <p>
+     * Useful when running the benchmark only makes sense for as long as some other process has
+     * not completed.
+     * </p>
+     *
+     * <p>
+     * Set it from within the benchmark itself by using {@link #issueHaltRequest(String)}. It is
+     * {@code volatile} because the request may come from a thread other than the one running
+     * the test loop.
+     * </p>
+     *
+     * <p>
+     * <strong>It works only with a concurrency level of 1 ({@code --concurrency 1}, the
+     * default).</strong>
+     * </p>
+     */
+    private volatile boolean haltRequested;
 
+    /**
+     * Requests that no further iterations of this benchmark be run. Only honoured with a
+     * concurrency level of 1 ({@code --concurrency 1}, the default). Useful when the benchmark
+     * only makes sense while some other parallel operation is still in progress.
+     *
+     * @param message optional message, logged at info level together with the request
+     */
+    protected void issueHaltRequest(@Nullable final String message) {
+        LOG.info("halt requested. {}", message == null ? "" : message);
+        haltRequested = true;
+    }
+
+    /**
+     * <p>
+     * Called at the start of {@link #tearDown()}, before {@link #afterSuite()}. Does nothing by
+     * default; override it to stop any background processes the benchmark has started.
+     * </p>
+     * <p>
+     * For example, a large import could otherwise keep the suite running for as long as the
+     * import lasts, even though no more tests are being executed.
+     * </p>
+     */
+    protected void issueHaltChildThreads() {
+    }
+
     @Override
     public void setPrintStream(PrintStream out) {
         this.out = out;
@@ -110,6 +164,9 @@ abstract class AbstractTest<T> extends B
 
         this.running = true;
 
+        // a halt requested during an earlier run must not stop this one
+        haltRequested = false;
+
         beforeSuite();
         if (PROFILE) {
             profiler = new Profiler().startCollecting();
@@ -153,7 +210,11 @@ abstract class AbstractTest<T> extends B
             
             // Run a few iterations to warm up the system
             long warmupEnd = System.currentTimeMillis() + WARMUP;
-            while (System.currentTimeMillis() < warmupEnd) {
+            boolean stop = false;
+            while (!stop && System.currentTimeMillis() < warmupEnd) {
+                // Sample the halt flag before executing: the iteration that observes a halt
+                // request still calls execute(), and the loop exits right after it.
+                stop = haltRequested;
                 execute();
             }
 
@@ -228,7 +289,11 @@ abstract class AbstractTest<T> extends B
         if (concurrencyLevel == 1) {
             // Run test iterations, and capture the execution times
             long runtimeEnd = System.currentTimeMillis() + RUNTIME;
-            while (System.currentTimeMillis() < runtimeEnd) {
+            boolean stop = false;
+            while (!stop && System.currentTimeMillis() < runtimeEnd) {
+                // Sample the halt flag before executing: the iteration that observes a halt
+                // request is still executed and recorded, and the loop exits right after it.
+                stop = haltRequested;
                 statistics.addValue(execute());
             }
 
@@ -305,6 +370,8 @@ abstract class AbstractTest<T> extends B
      * @throws Exception if the benchmark can not be cleaned up
      */
     public void tearDown() throws Exception {
+        // let the benchmark stop any background work it started before shutting down
+        issueHaltChildThreads();
         this.running = false;
         for (Thread thread : threads) {
             thread.join();

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java?rev=1676729&r1=1676728&r2=1676729&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/BenchmarkRunner.java Wed Apr 29 12:18:34 2015
@@ -290,7 +290,15 @@ public class BenchmarkRunner {
                     wikipedia.value(options),
                     flatStructure.value(options),
                     report.value(options), withStorage.value(options), withServer.value(options)),
-            new FindAuthorizableWithScopeTest(numberOfUsers.value(options), setScope.value(options))
+            new FindAuthorizableWithScopeTest(numberOfUsers.value(options), setScope.value(options)),
+            // Lucene property index sharing the "async" lane with the global full-text index
+            new LucenePropertyFullTextTest(
+                wikipedia.value(options), flatStructure.value(options),
+                report.value(options), withStorage.value(options)),
+            // same benchmark, with the global full-text index on its own "async-slow" lane
+            new LucenePropertyFTSeparated(
+                wikipedia.value(options), flatStructure.value(options),
+                report.value(options), withStorage.value(options))
         };
 
         Set<String> argset = Sets.newHashSet(nonOption.values(options));
 
         Set<String> argset = Sets.newHashSet(nonOption.values(options));

Added: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFTSeparated.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFTSeparated.java?rev=1676729&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFTSeparated.java (added)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFTSeparated.java Wed Apr 29 12:18:34 2015
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.benchmark;
+
+import static com.google.common.collect.ImmutableSet.of;
+
+import java.io.File;
+
+import javax.jcr.Repository;
+
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
+import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneInitializerHelper;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+
+/**
+ * Same as {@link LucenePropertyFullTextTest}, but initialises a repository where the global
+ * full-text index runs on a separate async indexing lane ({@code async-slow}), and therefore a
+ * separate thread, from the Lucene property index ({@code async}).
+ */
+public class LucenePropertyFTSeparated extends LucenePropertyFullTextTest {
+
+    public LucenePropertyFTSeparated(final File dump,
+                                     final boolean flat,
+                                     final boolean doReport,
+                                     final Boolean storageEnabled) {
+        super(dump, flat, doReport, storageEnabled);
+        // currentTest needs no override: the superclass sets it from getClass().getSimpleName()
+    }
+
+    @Override
+    protected Repository[] createRepository(RepositoryFixture fixture) throws Exception {
+        if (fixture instanceof OakRepositoryFixture) {
+            currentFixture = fixture.toString();
+            return ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+                @Override
+                public Jcr customize(Oak oak) {
+                    LuceneIndexProvider provider = new LuceneIndexProvider();
+                    oak.with((QueryIndexProvider) provider)
+                       .with((Observer) provider)
+                       .with(new LuceneIndexEditorProvider())
+                       // global full-text index on its own async lane
+                       .with(new LuceneInitializerHelper("luceneGlobal", storageEnabled)
+                                 .async("async-slow"))
+                       // the WikipediaImport sets a `title` property
+                       .with(new LucenePropertyInitialiser("luceneTitle", of("title")))
+                       .withAsyncIndexing("async", 5)
+                       .withAsyncIndexing("async-slow", 5);
+                    return new Jcr(oak);
+                }
+            });
+        }
+        return super.createRepository(fixture);
+    }
+}

Added: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFullTextTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFullTextTest.java?rev=1676729&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFullTextTest.java (added)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/LucenePropertyFullTextTest.java Wed Apr 29 12:18:34 2015
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.benchmark;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.ImmutableSet.of;
+import static org.apache.jackrabbit.oak.api.Type.BOOLEAN;
+import static org.apache.jackrabbit.oak.api.Type.LONG;
+import static org.apache.jackrabbit.oak.api.Type.NAME;
+import static org.apache.jackrabbit.oak.api.Type.STRING;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NODE_TYPE;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.COMPAT_MODE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_RULES;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_NODE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.PROP_PROPERTY_INDEX;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
+
+import java.io.File;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.ValueFactory;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.RowIterator;
+
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.benchmark.wikipedia.WikipediaImport;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.fixture.JcrCreator;
+import org.apache.jackrabbit.oak.fixture.OakRepositoryFixture;
+import org.apache.jackrabbit.oak.fixture.RepositoryFixture;
+import org.apache.jackrabbit.oak.jcr.Jcr;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.util.LuceneInitializerHelper;
+import org.apache.jackrabbit.oak.plugins.tree.TreeFactory;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.lifecycle.RepositoryInitializer;
+import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Benchmarks how long it takes for an ingested item to become available in a Lucene property
+ * index when that index is updated together with a global full-text Lucene index (same async
+ * lane, same thread). It uses {@link WikipediaImport} to ingest content from a Wikipedia dump.
+ * </p>
+ * <p>
+ * Suggested dump:
+ * <a href="https://dumps.wikimedia.org/enwiki/20150403/enwiki-20150403-pages-articles.xml.bz2">
+ * enwiki-20150403-pages-articles.xml.bz2</a>
+ * </p>
+ * <p>
+ * Usage example:
+ * </p>
+ *
+ * <pre>
+ * java -Druntime=900 -Dlogback.configurationFile=logback-benchmark.xml \
+ *      -jar ~/.m2/repository/org/apache/jackrabbit/oak-run/1.4-SNAPSHOT/oak-run-1.4-SNAPSHOT.jar \
+ *      benchmark --wikipedia enwiki-20150403-pages-articles.xml.bz2 \
+ *      --base ~/tmp/oak/ LucenePropertyFullTextTest Oak-Tar Oak-Mongo
+ * </pre>
+ * <p>
+ * This runs the benchmark for 15 minutes ({@code -Druntime=900}) against the Oak-Tar and
+ * Oak-Mongo fixtures.
+ * </p>
+ */
+public class LucenePropertyFullTextTest extends AbstractTest<LucenePropertyFullTextTest.TestContext> {
+    private static final Logger LOG = LoggerFactory.getLogger(LucenePropertyFullTextTest.class);
+    private final WikipediaImport importer;
+    private Thread asyncImporter;
+    // shared between the importer thread and the benchmark thread, hence volatile
+    private volatile boolean benchmarkCompleted, importerCompleted;
+    Boolean storageEnabled;
+    String currentFixture, currentTest;
+
+    /**
+     * Context used across the tests: a freshly logged-in writer session and the title to look up.
+     */
+    class TestContext {
+        final Session session = loginWriter();
+        final String title;
+
+        public TestContext(@Nonnull final String title) {
+            this.title = checkNotNull(title);
+        }
+    }
+
+    /**
+     * Repository initialiser creating a Lucene property index definition called {@code name}
+     * below {@code INDEX_DEFINITIONS_NAME}, indexing the given properties on the {@code async}
+     * lane. Does nothing if a definition with that name already exists.
+     */
+    static class LucenePropertyInitialiser implements RepositoryInitializer {
+        private final String name;
+        private final Set<String> properties;
+
+        public LucenePropertyInitialiser(@Nonnull final String name,
+                                         @Nonnull final Set<String> properties) {
+            this.name = checkNotNull(name);
+            this.properties = checkNotNull(properties);
+        }
+
+        private boolean isAlreadyThere(@Nonnull final NodeBuilder root) {
+            return checkNotNull(root).hasChildNode(INDEX_DEFINITIONS_NAME) &&
+                root.getChildNode(INDEX_DEFINITIONS_NAME).hasChildNode(name);
+        }
+
+        @Override
+        public void initialize(final NodeBuilder builder) {
+            if (isAlreadyThere(builder)) {
+                return;
+            }
+            Tree t = TreeFactory.createTree(builder.child(INDEX_DEFINITIONS_NAME));
+            t = t.addChild(name);
+            t.setProperty("jcr:primaryType", INDEX_DEFINITIONS_NODE_TYPE, NAME);
+            t.setProperty(COMPAT_MODE, 2L, LONG);
+            t.setProperty(TYPE_PROPERTY_NAME, TYPE_LUCENE, STRING);
+            t.setProperty(ASYNC_PROPERTY_NAME, "async", STRING);
+            t.setProperty(REINDEX_PROPERTY_NAME, true);
+
+            t = t.addChild(INDEX_RULES);
+            t.setOrderableChildren(true);
+            t.setProperty("jcr:primaryType", "nt:unstructured", NAME);
+
+            t = t.addChild("nt:base");
+
+            Tree propnode = t.addChild(PROP_NODE);
+            propnode.setOrderableChildren(true);
+            propnode.setProperty("jcr:primaryType", "nt:unstructured", NAME);
+
+            for (String p : properties) {
+                Tree t1 = propnode.addChild(PathUtils.getName(p));
+                t1.setProperty(PROP_PROPERTY_INDEX, true, BOOLEAN);
+                t1.setProperty(PROP_NAME, p);
+            }
+        }
+    }
+
+    /**
+     * Reference to the last added title. Used for looking up with queries.
+     */
+    private final AtomicReference<String> lastTitle = new AtomicReference<String>();
+
+    public LucenePropertyFullTextTest(final File dump,
+                                      final boolean flat,
+                                      final boolean doReport,
+                                      final Boolean storageEnabled) {
+        this.importer = new WikipediaImport(dump, flat, doReport) {
+
+            @Override
+            protected void pageAdded(String title, String text) {
+                LOG.trace("Setting title: {}", title);
+                lastTitle.set(title);
+            }
+        };
+        this.storageEnabled = storageEnabled;
+        this.currentTest = this.getClass().getSimpleName();
+    }
+
+    @Override
+    protected Repository[] createRepository(RepositoryFixture fixture) throws Exception {
+        if (fixture instanceof OakRepositoryFixture) {
+            currentFixture = fixture.toString();
+            return ((OakRepositoryFixture) fixture).setUpCluster(1, new JcrCreator() {
+                @Override
+                public Jcr customize(Oak oak) {
+                    LuceneIndexProvider provider = new LuceneIndexProvider();
+                    oak.with((QueryIndexProvider) provider)
+                       .with((Observer) provider)
+                       .with(new LuceneIndexEditorProvider())
+                       .with((new LuceneInitializerHelper("luceneGlobal", storageEnabled)).async())
+                       // the WikipediaImport sets a `title` property
+                       .with(new LucenePropertyInitialiser("luceneTitle", of("title")))
+                       .withAsyncIndexing("async", 5);
+                    return new Jcr(oak);
+                }
+            });
+        }
+        return super.createRepository(fixture);
+    }
+
+    @Override
+    protected void beforeSuite() throws Exception {
+        LOG.debug("beforeSuite() - {} - {}", currentFixture, currentTest);
+        benchmarkCompleted = false;
+        importerCompleted = false;
+        asyncImporter = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                Session session = null;
+                try {
+                    session = loginWriter();
+                    importer.importWikipedia(session);
+                } catch (Exception e) {
+                    LOG.error("Error while importing the dump. Trying to halt everything.", e);
+                    importerCompleted = true;
+                } finally {
+                    if (!benchmarkCompleted) {
+                        importerCompleted = true;
+                        issueHaltRequest("Wikipedia import completed.");
+                    }
+                    if (session != null) {
+                        session.logout();
+                    }
+                }
+            }
+        }, "wikipedia-importer");
+        asyncImporter.start();
+
+        // allowing the async index to catch up.
+        TimeUnit.SECONDS.sleep(10);
+    }
+
+    @Override
+    protected void afterSuite() throws Exception {
+        LOG.debug("afterSuite() - {} - {}", currentFixture, currentTest);
+        if (asyncImporter == null) {
+            return;
+        }
+        // WikipediaImport clears its halt flag when an import starts, so a halt issued before the
+        // importer thread got that far would be lost: keep re-issuing it while waiting.
+        while (asyncImporter.isAlive()) {
+            if (benchmarkCompleted) {
+                importer.issueHaltImport();
+            }
+            asyncImporter.join(TimeUnit.SECONDS.toMillis(1));
+        }
+    }
+
+    @Override
+    protected void runTest() throws Exception {
+        final String title = lastTitle.get();
+        if (title == null) {
+            // nothing has been imported yet
+            return;
+        }
+        final TestContext ec = new TestContext(title);
+        try {
+            runTest(ec);
+        } finally {
+            // every iteration logs in a new session; release it instead of piling sessions up
+            ec.session.logout();
+        }
+    }
+
+    @Override
+    protected void runTest(final TestContext ec) throws Exception {
+        if (importerCompleted) {
+            return;
+        }
+        final long maxWait = TimeUnit.MINUTES.toMillis(5);
+        final long waitUnit = 50;
+        long sleptSoFar = 0;
+
+        // Track the outcome explicitly: a hit on the very last retry leaves sleptSoFar == maxWait
+        // and must still be reported as found rather than as a time-out.
+        boolean found = performQuery(ec);
+        while (!found && sleptSoFar < maxWait) {
+            LOG.trace("title '{}' not found. Waiting and retry. sleptSoFar: {}ms", ec.title,
+                sleptSoFar);
+            sleptSoFar += waitUnit;
+            TimeUnit.MILLISECONDS.sleep(waitUnit);
+            found = performQuery(ec);
+        }
+
+        if (found) {
+            LOG.info("{} - {} - title '{}' found with a wait/try of {}ms", currentFixture,
+                currentTest, ec.title, sleptSoFar);
+        } else {
+            LOG.warn("{} - {} - title '{}' timed out with a wait/try of {}ms.", currentFixture,
+                currentTest, ec.title, sleptSoFar);
+        }
+    }
+
+    /**
+     * Queries for nodes whose {@code title} property equals the context's title.
+     *
+     * @return {@code true} if at least one matching row is returned
+     */
+    private boolean performQuery(@Nonnull final TestContext ec) throws RepositoryException {
+        QueryManager qm = ec.session.getWorkspace().getQueryManager();
+        ValueFactory vf = ec.session.getValueFactory();
+        Query q = qm.createQuery("SELECT * FROM [nt:base] WHERE [title] = $title", Query.JCR_SQL2);
+        q.bindValue("title", vf.createValue(ec.title));
+        LOG.trace("statement: {} - title: {}", q.getStatement(), ec.title);
+        RowIterator rows = q.execute().getRows();
+        if (!rows.hasNext()) {
+            return false;
+        }
+        // NOTE(review): the path is discarded; presumably read to force resolution of the row
+        rows.nextRow().getPath();
+        return true;
+    }
+
+    @Override
+    protected void issueHaltChildThreads() {
+        if (!importerCompleted) {
+            LOG.info("benchmark completed. Issuing an halt for the importer");
+            benchmarkCompleted = true;
+            this.importer.issueHaltImport();
+        }
+    }
+}

Modified: jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/WikipediaImport.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/WikipediaImport.java?rev=1676729&r1=1676728&r2=1676729&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/WikipediaImport.java (original)
+++ jackrabbit/oak/trunk/oak-run/src/main/java/org/apache/jackrabbit/oak/benchmark/wikipedia/WikipediaImport.java Wed Apr 29 12:18:34 2015
@@ -48,6 +48,12 @@ public class WikipediaImport extends Ben
     private final boolean doReport;
 
     private final boolean flat;
+
+    /**
+     * Stops the loop in {@link #importWikipedia(Session)} once set to true. Set it through
+     * {@link #issueHaltImport()}; volatile because that may be called from another thread.
+     */
+    private volatile boolean haltImport;
 
     public WikipediaImport(File dump, boolean flat, boolean doReport) {
         this.dump = dump;
     public WikipediaImport(File dump, boolean flat, boolean doReport) {
         this.dump = dump;
@@ -103,6 +109,14 @@ public class WikipediaImport extends Ben
         }
     }
 
+    /**
+     * Asks {@link #importWikipedia(Session)} to stop importing. Note that method clears the flag
+     * when it starts, so a request made before that point is discarded.
+     */
+    public void issueHaltImport() {
+        haltImport = true;
+    }
+    
     public int importWikipedia(Session session) throws Exception {
         long start = System.currentTimeMillis();
         int count = 0;
     public int importWikipedia(Session session) throws Exception {
         long start = System.currentTimeMillis();
         int count = 0;
@@ -138,8 +152,9 @@ public class WikipediaImport extends Ben
             source = new StreamSource(csf.createCompressorInputStream(
                     new BufferedInputStream(new FileInputStream(dump))));
         }
+        haltImport = false; // reset for this run; a halt issued before this line is lost
         XMLStreamReader reader = factory.createXMLStreamReader(source);
-        while (reader.hasNext()) {
+        while (reader.hasNext() && !haltImport) {
             switch (reader.next()) {
             case XMLStreamConstants.START_ELEMENT:
                 if ("title".equals(reader.getLocalName())) {