You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/07/11 05:16:26 UTC

svn commit: r1801561 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java

Author: chetanm
Date: Tue Jul 11 05:16:26 2017
New Revision: 1801561

URL: http://svn.apache.org/viewvc?rev=1801561&view=rev
Log:
OAK-6271 - Support for importing index files

Ensure the lock on the async indexer lane is released if any exception occurs
after the lock has been taken

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java?rev=1801561&r1=1801560&r2=1801561&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporter.java Tue Jul 11 05:16:26 2017
@@ -139,37 +139,49 @@ public class IndexImporter {
 
     private void bringAsyncIndexUpToDate(String laneName, List<IndexInfo> indexInfos) throws CommitFailedException {
         LockToken lockToken = interruptCurrentIndexing(laneName);
-
-        String checkpoint = getAsync().getString(laneName);
-        checkNotNull(checkpoint, "No current checkpoint found for lane [%s]", laneName);
-
-        //TODO Support case where checkpoint got lost or complete reindexing is done
-
-        NodeState after = nodeStore.retrieve(checkpoint);
-        checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]",checkpoint, laneName);
-        NodeState before = indexedState;
-
-        NodeBuilder builder = nodeStore.getRoot().builder();
-
-        IndexUpdate indexUpdate = new IndexUpdate(
-                indexEditorProvider,
-                AsyncLaneSwitcher.getTempLaneName(laneName),
-                nodeStore.getRoot(),
-                builder,
-                IndexUpdateCallback.NOOP
-        );
-
-        CommitFailedException exception =
-                EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
-
-        if (exception != null) {
-            throw exception;
+        boolean success = false;
+        try {
+            String checkpoint = getAsync().getString(laneName);
+            checkNotNull(checkpoint, "No current checkpoint found for lane [%s]", laneName);
+
+            //TODO Support case where checkpoint got lost or complete reindexing is done
+
+            NodeState after = nodeStore.retrieve(checkpoint);
+            checkNotNull(after, "No state found for checkpoint [%s] for lane [%s]",checkpoint, laneName);
+            NodeState before = indexedState;
+
+            NodeBuilder builder = nodeStore.getRoot().builder();
+
+            IndexUpdate indexUpdate = new IndexUpdate(
+                    indexEditorProvider,
+                    AsyncLaneSwitcher.getTempLaneName(laneName),
+                    nodeStore.getRoot(),
+                    builder,
+                    IndexUpdateCallback.NOOP
+            );
+
+            CommitFailedException exception =
+                    EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
+
+            if (exception != null) {
+                throw exception;
+            }
+
+            revertLaneChange(builder, indexInfos);
+
+            mergeWithConcurrentCheck(nodeStore, builder);
+            success = true;
+        } finally {
+            try {
+                resumeCurrentIndexing(lockToken);
+            } catch (CommitFailedException | RuntimeException e) {
+                log.warn("Error occurred while releasing indexer lock", e);
+                if (success) {
+                    throw e;
+                }
+            }
         }
 
-        revertLaneChange(builder, indexInfos);
-
-        mergeWithConcurrentCheck(nodeStore, builder);
-        resumeCurrentIndexing(lockToken);
         log.info("Import done for indexes {}", indexInfos);
     }
 

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java?rev=1801561&r1=1801560&r2=1801561&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/IndexImporterTest.java Tue Jul 11 05:16:26 2017
@@ -24,12 +24,17 @@ import java.io.IOException;
 import java.util.Properties;
 import java.util.Set;
 
+import javax.annotation.Nonnull;
+
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexUpdate;
 import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexLookup;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
@@ -40,6 +45,7 @@ import org.apache.jackrabbit.oak.query.a
 import org.apache.jackrabbit.oak.query.ast.SelectorImpl;
 import org.apache.jackrabbit.oak.query.index.FilterImpl;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Editor;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.query.Filter;
 import org.apache.jackrabbit.oak.spi.query.QueryEngineSettings;
@@ -52,11 +58,13 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.ImmutableSet.of;
+import static java.util.Arrays.asList;
 import static org.apache.jackrabbit.JcrConstants.NT_BASE;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.INDEX_DEFINITIONS_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.IndexUtils.createIndexDefinition;
 import static org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock.NOOP_LOCK;
+import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
@@ -207,6 +215,104 @@ public class IndexImporterTest {
         assertNull(store.retrieve(checkpoint));
     }
 
+    @Test
+    public void laneName() throws Exception{
+        NodeBuilder builder = EMPTY_NODE.builder();
+        builder.setProperty(IndexConstants.ASYNC_PROPERTY_NAME, "async");
+        assertEquals("async", IndexImporter.getAsyncLaneName("foo", builder.getNodeState()));
+
+        builder = EMPTY_NODE.builder();
+        builder.setProperty(IndexConstants.ASYNC_PROPERTY_NAME, asList("async", "nrt"), Type.STRINGS);
+        assertEquals("async", IndexImporter.getAsyncLaneName("foo", builder.getNodeState()));
+
+        builder = EMPTY_NODE.builder();
+        builder.setProperty(IndexConstants.ASYNC_PROPERTY_NAME, asList("async", "nrt"), Type.STRINGS);
+        AsyncLaneSwitcher.switchLane(builder, "tmp-async");
+        assertEquals("async", IndexImporter.getAsyncLaneName("foo", builder.getNodeState()));
+
+        builder = EMPTY_NODE.builder();
+        builder.setProperty(IndexConstants.ASYNC_PROPERTY_NAME, asList("async", "nrt"), Type.STRINGS);
+        AsyncLaneSwitcher.switchLane(builder, "tmp-async");
+        assertEquals("async", IndexImporter.getAsyncLaneName("foo", builder.getNodeState()));
+    }
+
+    @Test
+    public void laneUnlockedInCaseOfFailure() throws Exception{
+        NodeBuilder builder = store.getRoot().builder();
+        createIndexDefinition(builder.child(INDEX_DEFINITIONS_NAME),
+                "fooIndex", true, false, ImmutableSet.of("foo"), null)
+                .setProperty(ASYNC_PROPERTY_NAME, "async");
+        builder.child("a").setProperty("foo", "abc");
+        builder.child("b").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        new AsyncIndexUpdate("async", store, provider).run();
+
+        String checkpoint = createIndexDirs("/oak:index/fooIndex");
+
+        builder = store.getRoot().builder();
+        builder.child("c").setProperty("foo", "abc");
+        builder.child("d").setProperty("foo", "abc");
+        store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        new AsyncIndexUpdate("async", store, provider).run();
+
+        IndexImporterProvider importerProvider = new IndexImporterProvider() {
+            @Override
+            public void importIndex(NodeState root, NodeBuilder defn, File indexDir) {
+
+            }
+
+            @Override
+            public String getType() {
+                return "property";
+            }
+        };
+
+        final String exceptionMessage = "TEST MESSAGE";
+        ClusterNodeStoreLock lock = new ClusterNodeStoreLock(store);
+        provider = new PropertyIndexEditorProvider() {
+            @Override
+            public Editor getIndexEditor(@Nonnull String type, @Nonnull NodeBuilder definition,
+                                         @Nonnull NodeState root, @Nonnull IndexUpdateCallback callback) {
+                throw new RuntimeException(exceptionMessage);
+            }
+        };
+
+        IndexImporter importer = new IndexImporter(store, temporaryFolder.getRoot(), provider, lock);
+        importer.addImporterProvider(importerProvider);
+
+        try {
+            importer.importIndex();
+            fail();
+        } catch (RuntimeException ignore) {
+
+        }
+
+        assertFalse(lock.isLocked("async"));
+
+        AsyncIndexerLock lock2 = new AsyncIndexerLock() {
+            @Override
+            public LockToken lock(String asyncIndexerLane) throws CommitFailedException {
+                return mock(LockToken.class);
+            }
+
+            @Override
+            public void unlock(LockToken token) throws CommitFailedException {
+                throw new IllegalStateException("Exception in unlock");
+            }
+        };
+        IndexImporter importer2 = new IndexImporter(store, temporaryFolder.getRoot(), provider, lock2);
+        importer2.addImporterProvider(importerProvider);
+
+        try {
+            importer2.importIndex();
+            fail();
+        } catch (RuntimeException ignore) {
+            assertEquals(exceptionMessage, ignore.getMessage());
+        }
+    }
+
     private static FilterImpl createFilter(NodeState root, String nodeTypeName) {
         NodeTypeInfoProvider nodeTypes = new NodeStateNodeTypeInfoProvider(root);
         NodeTypeInfo type = nodeTypes.getNodeTypeInfo(nodeTypeName);