You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by al...@apache.org on 2016/08/16 09:20:52 UTC

svn commit: r1756506 - in /jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index: AsyncIndexUpdateClusterTestIT.java AsyncIndexUpdateLeaseTest.java

Author: alexparvulescu
Date: Tue Aug 16 09:20:52 2016
New Revision: 1756506

URL: http://svn.apache.org/viewvc?rev=1756506&view=rev
Log:
OAK-4668 Make async index more resilient on documentmk
 - it tests

Added:
    jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java

Added: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java?rev=1756506&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java (added)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java Tue Aug 16 09:20:52 2016
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index;
+
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.PropertyState;
+import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class AsyncIndexUpdateClusterTestIT {
+
+    private DocumentNodeStore ns1;
+    private DocumentNodeStore ns2;
+
+    private MemoryDocumentStore ds;
+    private MemoryBlobStore bs;
+
+    private Random random = new Random();
+    private final List<String> values = ImmutableList.of("a", "b", "c", "d",
+            "e");
+
+    private Closer closer = Closer.create();
+    private final AtomicBoolean illegalReindex = new AtomicBoolean(false);
+
+    @Before
+    public void before() throws Exception {
+        ns1 = create(0);
+        ns2 = create(1);
+    }
+
+    @After
+    public void after() {
+        shutdown();
+        ns1.dispose();
+        ns2.dispose();
+        assertFalse("Reindexing should not happen", illegalReindex.get());
+    }
+
+    private void shutdown() {
+        try {
+            closer.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void missingCheckpointDueToEventualConsistency() throws Exception {
+        IndexStatusListener l = new IndexStatusListener();
+
+        AsyncIndexUpdate async1 = createAsync(ns1, l);
+        closer.register(async1);
+        AsyncIndexUpdate async2 = createAsync(ns2, l);
+        closer.register(async2);
+
+        // Phase 1 - Base setup - Index definition creation and
+        // performing initial indexing
+        // Create index definition on NS1
+        NodeBuilder b1 = ns1.getRoot().builder();
+        createIndexDefinition(b1);
+        ns1.merge(b1, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        // Trigger indexing on NS1
+        async1.run();
+        l.initDone();
+
+        ScheduledExecutorService executorService = Executors
+                .newScheduledThreadPool(5);
+        closer.register(new ExecutorCloser(executorService));
+
+        executorService.scheduleWithFixedDelay(async1, 1, 3, TimeUnit.SECONDS);
+        executorService.scheduleWithFixedDelay(async2, 1, 2, TimeUnit.SECONDS);
+        executorService.scheduleWithFixedDelay(
+                new PropertyMutator(ns1, "node1"), 500, 500,
+                TimeUnit.MILLISECONDS);
+        executorService.scheduleWithFixedDelay(
+                new PropertyMutator(ns2, "node2"), 500, 500,
+                TimeUnit.MILLISECONDS);
+
+        for (int i = 0; i < 4 && !illegalReindex.get(); i++) {
+            TimeUnit.SECONDS.sleep(5);
+        }
+        shutdown();
+    }
+
+    private static AsyncIndexUpdate createAsync(DocumentNodeStore ns,
+            final IndexStatusListener l) {
+        IndexEditorProvider p = new TestEditorProvider(
+                new PropertyIndexEditorProvider(), l);
+        AsyncIndexUpdate aiu = new AsyncIndexUpdate("async", ns, p) {
+            protected boolean updateIndex(NodeState before,
+                    String beforeCheckpoint, NodeState after,
+                    String afterCheckpoint, String afterTime,
+                    AsyncUpdateCallback callback) throws CommitFailedException {
+                if (MISSING_NODE == before) {
+                    l.reindexing();
+                }
+                return super.updateIndex(before, beforeCheckpoint, after,
+                        afterCheckpoint, afterTime, callback);
+            }
+        };
+        aiu.setCloseTimeOut(1);
+        return aiu;
+    }
+
+    private static void createIndexDefinition(NodeBuilder builder) {
+        IndexUtils.createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "rootIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+    }
+
+    private DocumentNodeStore create(int clusterId) {
+        DocumentMK.Builder builder = new DocumentMK.Builder();
+        if (ds == null) {
+            ds = new MemoryDocumentStore();
+        }
+        if (bs == null) {
+            bs = new MemoryBlobStore();
+        }
+        builder.setDocumentStore(ds).setBlobStore(bs);
+
+        DocumentNodeStore store = builder.setClusterId(++clusterId)
+                .setLeaseCheck(false).open().getNodeStore();
+        return store;
+    }
+
+    private class PropertyMutator implements Runnable {
+        private final NodeStore nodeStore;
+        private final String nodeName;
+
+        public PropertyMutator(NodeStore nodeStore, String nodeName) {
+            this.nodeStore = nodeStore;
+            this.nodeName = nodeName;
+        }
+
+        @Override
+        public void run() {
+            NodeBuilder b = nodeStore.getRoot().builder();
+            b.child(nodeName).setProperty("foo",
+                    values.get(random.nextInt(values.size())));
+            try {
+                nodeStore.merge(b, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+            } catch (CommitFailedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private class IndexStatusListener {
+
+        boolean reindexOk = true;
+
+        public void reindexing() {
+            if (!reindexOk) {
+                illegalReindex.set(true);
+                shutdown();
+            }
+        }
+
+        public void initDone() {
+            reindexOk = false;
+        }
+
+        public void waitRandomly() {
+            try {
+                TimeUnit.SECONDS.sleep(random.nextInt(1));
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    private static class TestEditorProvider implements IndexEditorProvider {
+        private final IndexEditorProvider delegate;
+        private final IndexStatusListener listener;
+
+        private TestEditorProvider(IndexEditorProvider delegate,
+                IndexStatusListener listener) {
+            this.delegate = delegate;
+            this.listener = listener;
+        }
+
+        @Override
+        public Editor getIndexEditor(@Nonnull String type,
+                @Nonnull NodeBuilder definition, @Nonnull NodeState root,
+                @Nonnull IndexUpdateCallback callback)
+                throws CommitFailedException {
+            Editor e = delegate
+                    .getIndexEditor(type, definition, root, callback);
+            if (e != null) {
+                e = new TestEditor(e, listener);
+            }
+            return e;
+        }
+    }
+
+    private static class TestEditor implements Editor {
+        private final Editor editor;
+        private final TestEditor parent;
+        private final IndexStatusListener listener;
+
+        TestEditor(Editor editor, IndexStatusListener listener) {
+            this(editor, listener, null);
+        }
+
+        TestEditor(Editor editor, IndexStatusListener listener,
+                TestEditor parent) {
+            this.editor = editor;
+            this.listener = listener;
+            this.parent = parent;
+        }
+
+        @Override
+        public void enter(NodeState before, NodeState after)
+                throws CommitFailedException {
+            if (MISSING_NODE == before && parent == null) {
+                listener.reindexing();
+            }
+            editor.enter(before, after);
+        }
+
+        @Override
+        public void leave(NodeState before, NodeState after)
+                throws CommitFailedException {
+            listener.waitRandomly();
+            editor.leave(before, after);
+        }
+
+        @Override
+        public void propertyAdded(PropertyState after)
+                throws CommitFailedException {
+            editor.propertyAdded(after);
+        }
+
+        @Override
+        public void propertyChanged(PropertyState before, PropertyState after)
+                throws CommitFailedException {
+            editor.propertyChanged(before, after);
+        }
+
+        @Override
+        public void propertyDeleted(PropertyState before)
+                throws CommitFailedException {
+            editor.propertyDeleted(before);
+        }
+
+        @Override
+        public Editor childNodeAdded(String name, NodeState after)
+                throws CommitFailedException {
+            return createChildEditor(editor.childNodeAdded(name, after), name);
+        }
+
+        @Override
+        public Editor childNodeChanged(String name, NodeState before,
+                NodeState after) throws CommitFailedException {
+            return createChildEditor(
+                    editor.childNodeChanged(name, before, after), name);
+        }
+
+        @Override
+        public Editor childNodeDeleted(String name, NodeState before)
+                throws CommitFailedException {
+            return createChildEditor(editor.childNodeDeleted(name, before),
+                    name);
+        }
+
+        private TestEditor createChildEditor(Editor editor, String name) {
+            if (editor == null) {
+                return null;
+            } else {
+                return new TestEditor(editor, listener, this);
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateClusterTestIT.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java?rev=1756506&r1=1756505&r2=1756506&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java (original)
+++ jackrabbit/oak/trunk/oak-it/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateLeaseTest.java Tue Aug 16 09:20:52 2016
@@ -400,10 +400,10 @@ public class AsyncIndexUpdateLeaseTest e
         @Override
         protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
                                                              String name, long leaseTimeOut, String checkpoint,
-                                                             String afterCheckpoint, AsyncIndexStats indexStats,
+                                                             AsyncIndexStats indexStats,
                                                              AtomicBoolean stopFlag) {
             return new SpecialAsyncUpdateCallback(store, name, leaseTimeOut,
-                    checkpoint, afterCheckpoint, indexStats, stopFlag, listener);
+                    checkpoint, indexStats, stopFlag, listener);
         }
     }
 
@@ -412,16 +412,16 @@ public class AsyncIndexUpdateLeaseTest e
         private IndexStatusListener listener;
 
         public SpecialAsyncUpdateCallback(NodeStore store, String name,
-                                          long leaseTimeOut, String checkpoint, String afterCheckpoint,
+                                          long leaseTimeOut, String checkpoint,
                                           AsyncIndexStats indexStats, AtomicBoolean stopFlag, IndexStatusListener listener) {
-            super(store, name, leaseTimeOut, checkpoint, afterCheckpoint, indexStats, stopFlag);
+            super(store, name, leaseTimeOut, checkpoint, indexStats, stopFlag);
             this.listener = listener;
         }
 
         @Override
-        protected void prepare() throws CommitFailedException {
+        protected void prepare(String afterCheckpoint) throws CommitFailedException {
             listener.prePrepare();
-            super.prepare();
+            super.prepare(afterCheckpoint);
             listener.postPrepare();
         }