You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by da...@apache.org on 2015/06/05 12:07:57 UTC

svn commit: r1683700 - in /jackrabbit/oak/trunk: oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java oak-lucene/pom.xml oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java

Author: davide
Date: Fri Jun  5 10:07:56 2015
New Revision: 1683700

URL: http://svn.apache.org/r1683700
Log:
OAK-2961 - Async index fails with OakState0001: Unresolved conflicts in /:async

- ignored the test
- generic class for extending document cluster tests. DocumentClusterIT.
- test dependencies for running the IT

Added:
    jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
Modified:
    jackrabbit/oak/trunk/oak-lucene/pom.xml

Added: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java?rev=1683700&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java (added)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java Fri Jun  5 10:07:56 2015
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.jcr;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest.dispose;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jcr.Credentials;
+import javax.jcr.Repository;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+
+/**
+ * Abstract base class for integration tests (ITs) that have to run against a cluster of
+ * DocumentMKs. It provides utility methods for setting up and tearing down the cluster.
+ */
+public abstract class DocumentClusterIT {
+    List<Repository> repos = new ArrayList<Repository>();
+    List<DocumentMK> mks = new ArrayList<DocumentMK>();
+
+    /**
+     * the number of nodes we'd like to run against
+     */
+    static final int NUM_CLUSTER_NODES = Integer.getInteger("it.documentmk.cluster.nodes", 5);
+    
+    /**
+     * credentials for logging in as {@code admin}
+     */
+    static final Credentials ADMIN = new SimpleCredentials("admin", "admin".toCharArray());
+    
+    static final int NOT_PROVIDED = Integer.MIN_VALUE;
+    
+    /**
+     * Drops the test database, then creates a single repository with cluster id 1,
+     * logs in once as {@code admin} and disposes everything again.
+     * NOTE(review): presumably this initialises the repository content before the
+     * cluster nodes are started - confirm.
+     *
+     * @throws Exception if the database cannot be dropped or the repository cannot be set up
+     */
+    @Before
+    public void before() throws Exception {
+        dropDB(this.getClass());
+
+        List<Repository> rs = new ArrayList<Repository>();
+        List<DocumentMK> ds = new ArrayList<DocumentMK>();
+        initRepository(this.getClass(), rs, ds, 1, NOT_PROVIDED);
+
+        Repository repository = rs.get(0);
+        DocumentMK mk = ds.get(0);
+        try {
+            Session session = repository.login(ADMIN);
+            session.logout();
+        } finally {
+            // release the temporary instances even when the login fails
+            try {
+                dispose(repository);
+            } finally {
+                mk.dispose(); // closes connection as well
+            }
+        }
+    }
+
+    /**
+     * Disposes all the repositories in {@code repos} and all the DocumentMKs in
+     * {@code mks}, then drops the test database. The database is dropped even when
+     * disposing one of the instances fails.
+     *
+     * @throws Exception if disposing an instance or dropping the database fails
+     */
+    @After
+    public void after() throws Exception {
+        try {
+            for (Repository repo : repos) {
+                dispose(repo);
+            }
+            for (DocumentMK mk : mks) {
+                mk.dispose();
+            }
+        } finally {
+            // never leave the test database behind for the next test
+            dropDB(this.getClass());
+        }
+    }
+
+    /**
+     * Logs every exception of the provided Map and then rethrows the first one in the
+     * Map's iteration order. Does nothing if the Map is {@code null} or empty.
+     *
+     * @param exceptions the collected exceptions; the key is reported as the thread name.
+     * @param log may be null. If a valid Logger, all the exceptions are logged as errors.
+     * @throws Exception the first exception found in {@code exceptions}
+     */
+    static void raiseExceptions(@Nonnull final Map<String, Exception> exceptions,
+                                @Nullable final Logger log) throws Exception {
+        if (exceptions == null) {
+            return;
+        }
+
+        Exception first = null;
+        for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
+            // log all of them: only one can be thrown and the others would go unnoticed
+            if (log != null) {
+                log.error("Exception in thread {}", entry.getKey(), entry.getValue());
+            }
+            if (first == null) {
+                first = entry.getValue();
+            }
+        }
+
+        if (first != null) {
+            throw first;
+        }
+    }
+
+    /**
+     * <p>
+     * Aligns the cluster by running the background operations of every
+     * {@link DocumentMK} in the provided list, going over the whole list twice.
+     * </p>
+     *
+     * <p>
+     * In order to use this you have to initialise the cluster with {@code setAsyncDelay(0)}.
+     * </p>
+     *
+     * @param mks the list of {@link DocumentMK} composing the cluster. Cannot be null.
+     */
+    static void alignCluster(@Nonnull final List<DocumentMK> mks) {
+        int remainingPasses = 2;
+        while (remainingPasses-- > 0) {
+            for (DocumentMK node : mks) {
+                node.getNodeStore().runBackgroundOperations();
+            }
+        }
+    }
+    
+    /**
+     * Sets up the cluster connections using the default async delay. Delegates to
+     * {@link #setUpCluster(Class, List, List, int)} passing {@link #NOT_PROVIDED}.
+     * 
+     * @param clazz the class forwarded to the repository initialisation.
+     * @param mks the list to which the created {@link DocumentMK}s are added.
+     * @param repos the list to which the {@link Repository} created on each of the
+     *            {@code mks} is added.
+     * @throws Exception if setting up the cluster fails
+     */
+    void setUpCluster(@Nonnull final Class<?> clazz, 
+                             @Nonnull final List<DocumentMK> mks,
+                             @Nonnull final List<Repository> repos) throws Exception {
+        setUpCluster(clazz, mks, repos, NOT_PROVIDED);
+    }
+
+    /**
+     * Sets up the cluster by initialising {@link #NUM_CLUSTER_NODES} repositories, using
+     * the cluster ids from 1 up to {@link #NUM_CLUSTER_NODES}.
+     *
+     * @param clazz the class forwarded to the repository initialisation.
+     * @param mks the list to which the created {@link DocumentMK}s are added.
+     * @param repos the list to which the created {@link Repository}s are added.
+     * @param asyncDelay the async delay to set. For default use {@link #NOT_PROVIDED}
+     * @throws Exception if one of the repositories cannot be initialised
+     */
+    void setUpCluster(@Nonnull final Class<?> clazz,
+                      @Nonnull final List<DocumentMK> mks,
+                      @Nonnull final List<Repository> repos,
+                      final int asyncDelay) throws Exception {
+        int clusterId = 0;
+        while (clusterId < NUM_CLUSTER_NODES) {
+            clusterId++;
+            initRepository(clazz, repos, mks, clusterId, asyncDelay);
+        }
+    }
+
+    static MongoConnection createConnection(@Nonnull final Class<?> clazz) throws Exception {
+        return OakMongoNSRepositoryStub.createConnection(
+                checkNotNull(clazz).getSimpleName());
+    }
+
+    static void dropDB(@Nonnull final Class<?> clazz) throws Exception {
+        MongoConnection con = createConnection(checkNotNull(clazz));
+        try {
+            con.getDB().dropDatabase();
+        } finally {
+            con.close();
+        }
+    }
+
+    /**
+     * Initialises a repository on top of a newly opened {@link DocumentMK} and adds both
+     * to the provided lists. The repository is created with the providers returned by
+     * {@link #additionalIndexEditorProviders()} and, if {@link #isAsyncIndexing()} is
+     * {@code true}, with async indexing enabled.
+     * 
+     * @param clazz the current class. Used for creating the Mongo connection. Cannot be null.
+     * @param repos list to which add the created repository. Cannot be null.
+     * @param mks list to which add the created MK. Cannot be null.
+     * @param clusterId the cluster ID to use. Must be greater than 0.
+     * @param asyncDelay the async delay to set. For default use {@link #NOT_PROVIDED}
+     * @throws Exception if the connection or the repository cannot be created
+     */
+    protected void initRepository(@Nonnull final Class<?> clazz, 
+                                  @Nonnull final List<Repository> repos, 
+                                  @Nonnull final List<DocumentMK> mks,
+                                  final int clusterId,
+                                  final int asyncDelay) throws Exception {
+        // validate up-front: failing after the DocumentMK has been opened would leak it
+        checkNotNull(clazz);
+        checkNotNull(repos);
+        checkNotNull(mks);
+
+        DocumentMK.Builder builder = new DocumentMK.Builder(); 
+        builder.setMongoDB(createConnection(clazz).getDB());
+        if (asyncDelay != NOT_PROVIDED) {
+            builder.setAsyncDelay(asyncDelay);
+        }
+        builder.setClusterId(clusterId);
+
+        DocumentMK mk = builder.open();
+        // hand the MK over right away, so the caller can still dispose it if the
+        // repository creation below fails
+        mks.add(mk);
+
+        Jcr j = new Jcr(mk.getNodeStore());
+
+        Set<IndexEditorProvider> ieps = additionalIndexEditorProviders();
+        if (ieps != null) {
+            for (IndexEditorProvider p : ieps) {
+                j = j.with(p);
+            }
+        }
+
+        if (isAsyncIndexing()) {
+            j = j.withAsyncIndexing();
+        }
+
+        repos.add(j.createRepository());
+    }
+    
+    /**
+     * <p>
+     * The default {@link #initRepository(Class, List, List, int, int)} uses this for
+     * registering any additional {@link IndexEditorProvider}. Override and return all the
+     * providers you'd like to have running in addition to the out-of-the-box ones.
+     * </p>
+     * 
+     * <p>
+     * The default implementation returns {@code null}, which
+     * {@link #initRepository(Class, List, List, int, int)} treats as "no additional
+     * providers".
+     * </p>
+     * 
+     * @return the additional providers to register, or {@code null} if there are none
+     */
+    protected Set<IndexEditorProvider> additionalIndexEditorProviders() {
+        return null;
+    }
+    
+    /**
+     * Override to change the default behaviour. If {@code true},
+     * {@link #initRepository(Class, List, List, int, int)} enables the async indexing on
+     * the repositories it creates. Default is {@code false}.
+     * 
+     * @return {@code true} if async indexing has to be enabled, {@code false} otherwise
+     */
+    protected boolean isAsyncIndexing() {
+        return false;
+    }
+}

Modified: jackrabbit/oak/trunk/oak-lucene/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/pom.xml?rev=1683700&r1=1683699&r2=1683700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-lucene/pom.xml Fri Jun  5 10:07:56 2015
@@ -222,11 +222,21 @@
 
     <!-- Test Dependencies -->
     <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.jackrabbit</groupId>
       <artifactId>oak-core</artifactId>
       <version>${project.version}</version>
@@ -245,6 +255,13 @@
       <version>${project.version}</version>
       <type>test-jar</type>
       <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.jackrabbit</groupId>
+      <artifactId>oak-commons</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.jackrabbit</groupId>

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java?rev=1683700&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java Fri Jun  5 10:07:56 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.jcr;
+
+import static org.apache.jackrabbit.JcrConstants.NT_UNSTRUCTURED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.NT_OAK_UNSTRUCTURED;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import org.apache.jackrabbit.oak.commons.FixturesHelper;
+import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.read.ListAppender;
+import ch.qos.logback.core.spi.FilterReply;
+
+import com.google.common.collect.ImmutableSet;
+
+public class AsyncConflictsIT extends DocumentClusterIT {
+    private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures();
+    private static final String INDEX_DEF_NODE = "asyncconflict";
+    private static final String INDEX_PROPERTY = "number";
+    private static final Logger LOG = LoggerFactory.getLogger(AsyncConflictsIT.class);
+    
+    /**
+     * Assumes that the {@code DOCUMENT_NS} fixture is enabled and that MongoDB is
+     * available.
+     */
+    @BeforeClass
+    public static void assumptions() {
+        final boolean documentNsEnabled = FIXTURES.contains(Fixture.DOCUMENT_NS);
+        assumeTrue(documentNsEnabled);
+
+        final boolean mongoAvailable = OakMongoNSRepositoryStub.isMongoDBAvailable();
+        assumeTrue(mongoAvailable);
+    }
+    
+    /**
+     * Adds a batch of nodes on the first cluster node, requests a re-index of the async
+     * index definition and waits for the re-index flag to be reset. It then verifies
+     * that no exception containing {@link AsyncLogFilter#MESSAGE} has been logged in
+     * the meantime.
+     *
+     * @throws Exception if the cluster cannot be set up or any of the phases fails
+     */
+    @Test @Ignore("OAK-2961")
+    public void updates() throws Exception {
+        final Map<String, Exception> exceptions = Collections.synchronizedMap(new HashMap<String, Exception>());
+        final Random generator = new Random(3);
+        final ListAppender<ILoggingEvent> logAppender = subscribeAppender();
+
+        // detach the appender however the test ends, otherwise it stays attached to the
+        // root logger for the rest of the JVM's life
+        try {
+            setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED);
+            defineIndex(repos.get(0));
+
+            final int numberNodes = 10000;
+
+            LOG.info("adding {} nodes", numberNodes);
+            Session s = repos.get(0).login(ADMIN);
+            try {
+                Node test = s.getRootNode().addNode("test");
+                test.setPrimaryType(NT_OAK_UNSTRUCTURED);
+
+                for (int i = 0; i < numberNodes; i++) {
+                    test.addNode("node" + i);
+                    // NOTE(review): the property is set on the parent "test" node at every
+                    // iteration, not on the node just added - confirm this is intended
+                    test.setProperty(INDEX_PROPERTY, generator.nextInt(numberNodes/3));
+                    if (i % 1024 == 0) {
+                        s.save();
+                    }
+                }
+
+                s.save();
+            } catch (Exception e) {
+                exceptions.put(Thread.currentThread().getName(), e);
+            } finally {
+                s.logout();
+            }
+
+            // fail fast: both phases store under the same key, so a failure collected
+            // here would be overwritten by a failure of the next phase
+            raiseExceptions(exceptions, LOG);
+            LOG.info("Nodes added.");
+
+            // issuing re-index
+            LOG.info("issuing re-index and wait for finish");
+            s = repos.get(0).login(ADMIN);
+            try {
+                Node index = s.getNode("/oak:index/" + INDEX_DEF_NODE);
+                index.setProperty(REINDEX_PROPERTY_NAME, true);
+                s.save();
+            } catch (Exception e) {
+                exceptions.put(Thread.currentThread().getName(), e);
+            } finally {
+                s.logout();
+            }
+
+            // no point in waiting for a re-index that could not be requested
+            raiseExceptions(exceptions, LOG);
+
+            while (!isReindexFinished()) {
+                Thread.sleep(5000);
+            }
+
+            // if following fails it means the Async index failed at least once.
+            assertTrue(
+                String.format("We should have not any '%s' in the logs", AsyncLogFilter.MESSAGE),
+                logAppender.list.isEmpty());
+        } finally {
+            unsubscribe(logAppender);
+        }
+    }
+    
+    private boolean isReindexFinished() throws RepositoryException {
+        Session s = repos.get(0).login(ADMIN);
+        try {
+            boolean reindex = s.getNode("/oak:index/" + INDEX_DEF_NODE)
+                .getProperty(REINDEX_PROPERTY_NAME).getBoolean();
+            return !reindex;
+        } finally {
+            s.logout();
+        }
+    }
+    
+    private void defineIndex(@Nonnull final Repository repo) throws RepositoryException {
+        Session session = repo.login(ADMIN);
+        try {
+            Node n = session.getRootNode().getNode("oak:index");
+            
+            n = n.addNode(INDEX_DEF_NODE);
+            n.setPrimaryType(IndexConstants.INDEX_DEFINITIONS_NODE_TYPE);
+            n.setProperty("compatVersion", 2);
+            n.setProperty(TYPE_PROPERTY_NAME, "lucene");
+            n.setProperty(ASYNC_PROPERTY_NAME, "async");
+            n = n.addNode("indexRules");
+            n.setPrimaryType(NT_UNSTRUCTURED);
+            n = n.addNode("nt:unstructured");
+            n = n.addNode("properties");
+            n.setPrimaryType(NT_UNSTRUCTURED);
+            n = n.addNode("number");
+            n.setPrimaryType(NT_UNSTRUCTURED);
+            n.setProperty("propertyIndex", true);
+            n.setProperty("name", INDEX_PROPERTY);
+            
+            session.save();
+        } finally {
+            session.logout();
+        }
+    }
+
+    @Override
+    protected Set<IndexEditorProvider> additionalIndexEditorProviders() {
+        return ImmutableSet.of((IndexEditorProvider) new LuceneIndexEditorProvider());
+    }
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    protected boolean isAsyncIndexing() {
+        return true;
+    }
+    
+    private ListAppender<ILoggingEvent> subscribeAppender() {
+        Filter<ILoggingEvent> filter = new AsyncLogFilter();
+        filter.start();
+        ListAppender<ILoggingEvent> appender = new ListAppender<ILoggingEvent>();
+        appender.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
+        appender.setName("asynclogcollector");
+        appender.addFilter(filter);
+        appender.start();
+        ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
+            ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(appender);
+        return appender;
+        
+    }
+    
+    private void unsubscribe(@Nonnull final Appender<ILoggingEvent> appender) {
+        ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
+            ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).detachAppender(appender);
+    }
+    
+    /**
+     * Filter accepting only the events logged at {@code WARN} or above which carry a
+     * throwable whose message contains {@link #MESSAGE}. Anything else is denied.
+     */
+    private static class AsyncLogFilter extends Filter<ILoggingEvent> {
+        public static final String MESSAGE = "Unresolved conflicts in /:async";
+
+        /**
+         * @param event the logging event to evaluate.
+         * @return {@link FilterReply#ACCEPT} for the events described on the class,
+         *         {@link FilterReply#DENY} otherwise
+         */
+        @Override
+        public FilterReply decide(ILoggingEvent event) {
+            if (!event.getLevel().isGreaterOrEqual(Level.WARN)) {
+                return FilterReply.DENY;
+            }
+
+            final IThrowableProxy tp = event.getThrowableProxy();
+            // a throwable may carry no message at all: don't fail with a NPE while logging
+            final String message = tp == null ? null : tp.getMessage();
+            if (message != null && message.contains(MESSAGE)) {
+                return FilterReply.ACCEPT;
+            }
+            return FilterReply.DENY;
+        }
+
+    }
+}