You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by da...@apache.org on 2015/06/05 12:07:57 UTC
svn commit: r1683700 - in /jackrabbit/oak/trunk:
oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
oak-lucene/pom.xml
oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
Author: davide
Date: Fri Jun 5 10:07:56 2015
New Revision: 1683700
URL: http://svn.apache.org/r1683700
Log:
OAK-2961 - Async index fails with OakState0001: Unresolved conflicts in /:async
- ignored the test
- generic class for extending document cluster tests. DocumentClusterIT.
- test dependencies for running the IT
Added:
jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
Modified:
jackrabbit/oak/trunk/oak-lucene/pom.xml
Added: jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java?rev=1683700&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java (added)
+++ jackrabbit/oak/trunk/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/DocumentClusterIT.java Fri Jun 5 10:07:56 2015
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.jcr;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest.dispose;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.jcr.Credentials;
+import javax.jcr.Repository;
+import javax.jcr.Session;
+import javax.jcr.SimpleCredentials;
+
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+
+/**
+ * abstract class that can be inherited by an IT who has to run tests against a cluster of
+ * DocumentMKs for having some utility methods available.
+ */
+public abstract class DocumentClusterIT {
+ List<Repository> repos = new ArrayList<Repository>();
+ List<DocumentMK> mks = new ArrayList<DocumentMK>();
+
+ /**
+ * the number of nodes we'd like to run against
+ */
+ static final int NUM_CLUSTER_NODES = Integer.getInteger("it.documentmk.cluster.nodes", 5);
+
+ /**
+ * credentials for logging in as {@code admin}
+ */
+ static final Credentials ADMIN = new SimpleCredentials("admin", "admin".toCharArray());
+
+ static final int NOT_PROVIDED = Integer.MIN_VALUE;
+
+ @Before
+ public void before() throws Exception {
+ dropDB(this.getClass());
+
+ List<Repository> rs = new ArrayList<Repository>();
+ List<DocumentMK> ds = new ArrayList<DocumentMK>();
+
+ initRepository(this.getClass(), rs, ds, 1, NOT_PROVIDED);
+
+ Repository repository = rs.get(0);
+ DocumentMK mk = ds.get(0);
+
+ Session session = repository.login(ADMIN);
+ session.logout();
+ dispose(repository);
+ mk.dispose(); // closes connection as well
+ }
+
+ @After
+ public void after() throws Exception {
+ for (Repository repo : repos) {
+ dispose(repo);
+ }
+ for (DocumentMK mk : mks) {
+ mk.dispose();
+ }
+ dropDB(this.getClass());
+ }
+
+ /**
+ * raise the exception passed into the provided Map
+ *
+ * @param exceptions
+ * @param log may be null. If valid Logger it will be logged
+ * @throws Exception
+ */
+ static void raiseExceptions(@Nonnull final Map<String, Exception> exceptions,
+ @Nullable final Logger log) throws Exception {
+ if (exceptions != null) {
+ for (Map.Entry<String, Exception> entry : exceptions.entrySet()) {
+ if (log != null) {
+ log.error("Exception in thread {}", entry.getKey(), entry.getValue());
+ }
+ throw entry.getValue();
+ }
+ }
+ }
+
+ /**
+ * <p>
+ * ensures that the cluster is aligned by running all the background operations
+ * </p>
+ *
+ * <p>
+ * In order to use this you have to initialise the cluster with {@code setAsyncDelay(0)}.
+ * </p>
+ *
+ * @param mks the list of {@link DocumentMK} composing the cluster. Cannot be null.
+ */
+ static void alignCluster(@Nonnull final List<DocumentMK> mks) {
+ for (int i = 0; i < 2; i++) {
+ for (DocumentMK mk : mks) {
+ mk.getNodeStore().runBackgroundOperations();
+ }
+ }
+ }
+
+ /**
+ * set up the cluster connections
+ *
+ * @param clazz class used for logging into Mongo itself
+ * @param mks the list of mks to work on.
+ * @param repos list of {@link Repository} created on each {@code mks}
+ * @throws Exception
+ */
+ void setUpCluster(@Nonnull final Class<?> clazz,
+ @Nonnull final List<DocumentMK> mks,
+ @Nonnull final List<Repository> repos) throws Exception {
+ setUpCluster(clazz, mks, repos, NOT_PROVIDED);
+ }
+
+ void setUpCluster(@Nonnull final Class<?> clazz,
+ @Nonnull final List<DocumentMK> mks,
+ @Nonnull final List<Repository> repos,
+ final int asyncDelay) throws Exception {
+ for (int i = 0; i < NUM_CLUSTER_NODES; i++) {
+ initRepository(clazz, repos, mks, i + 1, asyncDelay);
+ }
+ }
+
+ static MongoConnection createConnection(@Nonnull final Class<?> clazz) throws Exception {
+ return OakMongoNSRepositoryStub.createConnection(
+ checkNotNull(clazz).getSimpleName());
+ }
+
+ static void dropDB(@Nonnull final Class<?> clazz) throws Exception {
+ MongoConnection con = createConnection(checkNotNull(clazz));
+ try {
+ con.getDB().dropDatabase();
+ } finally {
+ con.close();
+ }
+ }
+
+ /**
+ * initialise the repository
+ *
+ * @param clazz the current class. Used for logging. Cannot be null.
+ * @param repos list to which add the created repository. Cannot be null.
+ * @param mks list to which add the created MK. Cannot be null.
+ * @param clusterId the cluster ID to use. Must be greater than 0.
+ * @param asyncDelay the async delay to set. For default use {@link #NOT_PROVIDED}
+ * @throws Exception
+ */
+ protected void initRepository(@Nonnull final Class<?> clazz,
+ @Nonnull final List<Repository> repos,
+ @Nonnull final List<DocumentMK> mks,
+ final int clusterId,
+ final int asyncDelay) throws Exception {
+ DocumentMK.Builder builder = new DocumentMK.Builder();
+ builder.setMongoDB(createConnection(checkNotNull(clazz)).getDB());
+ if (asyncDelay != NOT_PROVIDED) {
+ builder.setAsyncDelay(asyncDelay);
+ }
+ builder.setClusterId(clusterId);
+
+ DocumentMK mk = builder.open();
+ Jcr j = new Jcr(mk.getNodeStore());
+
+ Set<IndexEditorProvider> ieps = additionalIndexEditorProviders();
+ if (ieps != null) {
+ for (IndexEditorProvider p : ieps) {
+ j = j.with(p);
+ }
+ }
+
+ if (isAsyncIndexing()) {
+ j = j.withAsyncIndexing();
+ }
+
+ Repository repository = j.createRepository();
+
+ checkNotNull(repos).add(repository);
+ checkNotNull(mks).add(mk);
+ }
+
+ /**
+ * <p>
+ * the default {@link #initRepository(Class, List, List, int, int)} uses this for registering
+ * any additional {@link IndexEditorProvider}. Override and return all the provider you'd like
+ * to have running other than the OOTB one.
+ * </p>
+ *
+ * <p>
+ * the default implementation returns {@code null}
+ * </p>
+ * @return
+ */
+ protected Set<IndexEditorProvider> additionalIndexEditorProviders() {
+ return null;
+ }
+
+ /**
+ * override to change default behaviour. If {@code true} will enable the async indexing in the
+ * cluster. Default is {@code false}
+ *
+ * @return
+ */
+ protected boolean isAsyncIndexing() {
+ return false;
+ }
+}
Modified: jackrabbit/oak/trunk/oak-lucene/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/pom.xml?rev=1683700&r1=1683699&r2=1683700&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/pom.xml (original)
+++ jackrabbit/oak/trunk/oak-lucene/pom.xml Fri Jun 5 10:07:56 2015
@@ -222,11 +222,21 @@
<!-- Test Dependencies -->
<dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongo-java-driver</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.jackrabbit</groupId>
<artifactId>oak-core</artifactId>
<version>${project.version}</version>
@@ -245,6 +255,13 @@
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-commons</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.jackrabbit</groupId>
Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java?rev=1683700&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/jcr/AsyncConflictsIT.java Fri Jun 5 10:07:56 2015
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.jcr;
+
+import static org.apache.jackrabbit.JcrConstants.NT_UNSTRUCTURED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.nodetype.NodeTypeConstants.NT_OAK_UNSTRUCTURED;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.annotation.Nonnull;
+import javax.jcr.Node;
+import javax.jcr.Repository;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+
+import org.apache.jackrabbit.oak.commons.FixturesHelper;
+import org.apache.jackrabbit.oak.commons.FixturesHelper.Fixture;
+import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
+import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import ch.qos.logback.core.Appender;
+import ch.qos.logback.core.filter.Filter;
+import ch.qos.logback.core.read.ListAppender;
+import ch.qos.logback.core.spi.FilterReply;
+
+import com.google.common.collect.ImmutableSet;
+
+public class AsyncConflictsIT extends DocumentClusterIT {
+ private static final Set<Fixture> FIXTURES = FixturesHelper.getFixtures();
+ private static final String INDEX_DEF_NODE = "asyncconflict";
+ private static final String INDEX_PROPERTY = "number";
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncConflictsIT.class);
+
+ @BeforeClass
+ public static void assumptions() {
+ assumeTrue(FIXTURES.contains(Fixture.DOCUMENT_NS));
+ assumeTrue(OakMongoNSRepositoryStub.isMongoDBAvailable());
+ }
+
+ @Test @Ignore("OAK-2961")
+ public void updates() throws Exception {
+ final Map<String, Exception> exceptions = Collections.synchronizedMap(new HashMap<String, Exception>());
+ final Random generator = new Random(3);
+ final ListAppender<ILoggingEvent> logAppender = subscribeAppender();
+
+ setUpCluster(this.getClass(), mks, repos, NOT_PROVIDED);
+ defineIndex(repos.get(0));
+
+ final int numberNodes = 10000;
+
+ LOG.info("adding {} nodes", numberNodes);
+ Session s = repos.get(0).login(ADMIN);
+ Node test = s.getRootNode().addNode("test");
+ test.setPrimaryType(NT_OAK_UNSTRUCTURED);
+
+ try {
+ for (int i = 0; i < numberNodes; i++) {
+ test.addNode("node" + i);
+ test.setProperty(INDEX_PROPERTY, generator.nextInt(numberNodes/3));
+ if (i % 1024 == 0) {
+ s.save();
+ }
+ }
+
+ s.save();
+ } catch (Exception e) {
+ exceptions.put(Thread.currentThread().getName(), e);
+ } finally {
+ s.logout();
+ }
+
+ LOG.info("Nodes added.");
+
+ // issuing re-index
+ LOG.info("issuing re-index and wait for finish");
+ s = repos.get(0).login(ADMIN);
+ try {
+ Node index = s.getNode("/oak:index/" + INDEX_DEF_NODE);
+ index.setProperty(REINDEX_PROPERTY_NAME, true);
+ s.save();
+ } catch (Exception e) {
+ exceptions.put(Thread.currentThread().getName(), e);
+ } finally {
+ s.logout();
+ }
+ while (!isReindexFinished()) {
+ Thread.sleep(5000);
+ }
+
+ raiseExceptions(exceptions, LOG);
+
+ // if following fails it means the Async index failed at least once.
+ assertTrue(
+ String.format("We should have not any '%s' in the logs", AsyncLogFilter.MESSAGE),
+ logAppender.list.isEmpty());
+
+ unsubscribe(logAppender);
+ }
+
+ private boolean isReindexFinished() throws RepositoryException {
+ Session s = repos.get(0).login(ADMIN);
+ try {
+ boolean reindex = s.getNode("/oak:index/" + INDEX_DEF_NODE)
+ .getProperty(REINDEX_PROPERTY_NAME).getBoolean();
+ return !reindex;
+ } finally {
+ s.logout();
+ }
+ }
+
+ private void defineIndex(@Nonnull final Repository repo) throws RepositoryException {
+ Session session = repo.login(ADMIN);
+ try {
+ Node n = session.getRootNode().getNode("oak:index");
+
+ n = n.addNode(INDEX_DEF_NODE);
+ n.setPrimaryType(IndexConstants.INDEX_DEFINITIONS_NODE_TYPE);
+ n.setProperty("compatVersion", 2);
+ n.setProperty(TYPE_PROPERTY_NAME, "lucene");
+ n.setProperty(ASYNC_PROPERTY_NAME, "async");
+ n = n.addNode("indexRules");
+ n.setPrimaryType(NT_UNSTRUCTURED);
+ n = n.addNode("nt:unstructured");
+ n = n.addNode("properties");
+ n.setPrimaryType(NT_UNSTRUCTURED);
+ n = n.addNode("number");
+ n.setPrimaryType(NT_UNSTRUCTURED);
+ n.setProperty("propertyIndex", true);
+ n.setProperty("name", INDEX_PROPERTY);
+
+ session.save();
+ } finally {
+ session.logout();
+ }
+ }
+
+ @Override
+ protected Set<IndexEditorProvider> additionalIndexEditorProviders() {
+ return ImmutableSet.of((IndexEditorProvider) new LuceneIndexEditorProvider());
+ }
+
+ @Override
+ protected boolean isAsyncIndexing() {
+ return true;
+ }
+
+ private ListAppender<ILoggingEvent> subscribeAppender() {
+ Filter<ILoggingEvent> filter = new AsyncLogFilter();
+ filter.start();
+ ListAppender<ILoggingEvent> appender = new ListAppender<ILoggingEvent>();
+ appender.setContext((LoggerContext) LoggerFactory.getILoggerFactory());
+ appender.setName("asynclogcollector");
+ appender.addFilter(filter);
+ appender.start();
+ ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
+ ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(appender);
+ return appender;
+
+ }
+
+ private void unsubscribe(@Nonnull final Appender<ILoggingEvent> appender) {
+ ((LoggerContext) LoggerFactory.getILoggerFactory()).getLogger(
+ ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).detachAppender(appender);
+ }
+
+ private static class AsyncLogFilter extends Filter<ILoggingEvent> {
+ public static final String MESSAGE = "Unresolved conflicts in /:async";
+
+ @Override
+ public FilterReply decide(ILoggingEvent event) {
+ final IThrowableProxy tp = event.getThrowableProxy();
+
+ if (event.getLevel().isGreaterOrEqual(Level.WARN) &&
+ tp != null &&
+ tp.getMessage().contains(MESSAGE)) {
+ return FilterReply.ACCEPT;
+ } else {
+ return FilterReply.DENY;
+ }
+ }
+
+ }
+}