You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/07/11 09:40:23 UTC

svn commit: r1801582 - in /jackrabbit/oak/trunk: oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/ oak-core/...

Author: chetanm
Date: Tue Jul 11 09:40:23 2017
New Revision: 1801582

URL: http://svn.apache.org/viewvc?rev=1801582&view=rev
Log:
OAK-6271 - Support for importing index files

MBean support
-- AbortingIndexerLock - Lock meant for SegmentNodeStore setups
   which works by aborting the current run via IndexStatsMBean. This
   is done because, for a non-clustered NodeStore, AsyncIndexUpdate
   does not check for timeout

Added:
    jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java

Added: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java?rev=1801582&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java (added)
+++ jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java Tue Jul 11 09:40:23 2017
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.api.jmx;
+
+
+import java.io.IOException;
+
+import aQute.bnd.annotation.ProviderType;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+
+/**
+ * JMX management interface exposing indexer operations.
+ */
+@ProviderType
+public interface IndexerMBean {
+
+    /** Value passed as the MBean type when an implementation is registered. */
+    String TYPE = "Indexer";
+
+    /**
+     * Imports index content from a directory on the server file system.
+     *
+     * @param indexDirPath path on the server file system where the index
+     *                     content generated by oak-run is present
+     * @return {@code true} when the import ran to completion; failures are
+     *         reported through the declared exceptions
+     * @throws IOException if an I/O error occurs during the import
+     * @throws CommitFailedException if a commit performed by the import fails
+     *         (presumably against the NodeStore; confirm against IndexImporter)
+     */
+    boolean importIndex(
+            @Name("indexDirPath")
+            @Description("Path on server file system where index content generated by oak-run is present")
+            String indexDirPath
+    ) throws IOException, CommitFailedException;
+}

Propchange: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexerMBean.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java?rev=1801582&r1=1801581&r2=1801582&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/package-info.java Tue Jul 11 09:40:23 2017
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-@Version("4.6.0")
+@Version("4.7.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.api.jmx;
 

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java?rev=1801582&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java Tue Jul 11 09:40:23 2017
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.jmx.IndexerMBean;
+import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.index.importer.AbortingIndexerLock;
+import org.apache.jackrabbit.oak.plugins.index.importer.AsyncIndexerLock;
+import org.apache.jackrabbit.oak.plugins.index.importer.ClusterNodeStoreLock;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporter;
+import org.apache.jackrabbit.oak.plugins.index.importer.IndexImporterProvider;
+import org.apache.jackrabbit.oak.spi.state.Clusterable;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.osgi.framework.BundleContext;
+
+import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
+
+/**
+ * OSGi component that implements {@link IndexerMBean} and registers itself
+ * as an MBean on activation.
+ *
+ * <p>An import is delegated to {@link IndexImporter}, configured with the
+ * injected {@link NodeStore}, a whiteboard backed index editor provider, the
+ * currently tracked {@link IndexImporterProvider} services and a lock chosen
+ * by {@link #createLock()}.
+ */
+@Component
+public class IndexerMBeanImpl extends AnnotatedStandardMBean implements IndexerMBean {
+    @Reference
+    private NodeStore nodeStore;
+
+    @Reference
+    private AsyncIndexInfoService asyncIndexInfoService;
+
+    private final WhiteboardIndexEditorProvider editorProvider = new WhiteboardIndexEditorProvider();
+    private Registration mbeanReg;
+    private Tracker<IndexImporterProvider> providerTracker;
+
+    public IndexerMBeanImpl() {
+        super(IndexerMBean.class);
+    }
+
+    /**
+     * Imports the index content found under the given directory.
+     *
+     * @param indexDirPath path of the directory handed to the importer
+     * @return always {@code true}; failures are reported via exceptions
+     * @throws IOException propagated from the importer
+     * @throws CommitFailedException propagated from the importer
+     */
+    @Override
+    public boolean importIndex(String indexDirPath) throws IOException, CommitFailedException {
+        IndexImporter importer = new IndexImporter(nodeStore, new File(indexDirPath), editorProvider, createLock());
+        // Hand every currently registered provider to the importer before running it
+        providerTracker.getServices().forEach(importer::addImporterProvider);
+        importer.importIndex();
+        return true;
+    }
+
+    /**
+     * Selects the lock used to keep the async indexer out of the way during import:
+     * a {@link ClusterNodeStoreLock} when the NodeStore is {@link Clusterable},
+     * otherwise an {@link AbortingIndexerLock}.
+     */
+    private AsyncIndexerLock createLock() {
+        if (nodeStore instanceof Clusterable) {
+            return new ClusterNodeStoreLock(nodeStore);
+        }
+        return new AbortingIndexerLock(asyncIndexInfoService);
+    }
+
+
+    //~---------------------------------------< OSGi >
+
+    @Activate
+    private void activate(BundleContext context) {
+        Whiteboard wb = new OsgiWhiteboard(context);
+        editorProvider.start(wb);
+        // Start tracking BEFORE the MBean is published: importIndex() dereferences
+        // providerTracker, so registering first would leave a window in which a
+        // JMX call fails with a NullPointerException.
+        providerTracker = wb.track(IndexImporterProvider.class);
+        mbeanReg = registerMBean(wb,
+                IndexerMBean.class,
+                this,
+                IndexerMBean.TYPE,
+                "Indexer operations related MBean");
+    }
+
+    @Deactivate
+    private void deactivate() {
+        // Unpublish the MBean first so no new import can start while tearing down
+        if (mbeanReg != null) {
+            mbeanReg.unregister();
+        }
+        editorProvider.stop();
+        if (providerTracker != null) {
+            providerTracker.stop();
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/IndexerMBeanImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java?rev=1801582&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java Tue Jul 11 09:40:23 2017
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.importer;
+
+
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfo;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Lock implementation for single node setups, such as SegmentNodeStore.
+ * It works by checking the async indexer status via IndexStatsMBean,
+ * requesting an abort-and-pause and then waiting until the indexer is
+ * no longer reported as running. Unlocking resumes the indexer.
+ */
+public class AbortingIndexerLock implements AsyncIndexerLock<SimpleToken> {
+    /** Maximum time {@link #lock(String)} waits for a running indexer to stop. */
+    public static final int TIMEOUT_SECONDS = 300;
+    private final Logger log = LoggerFactory.getLogger(getClass());
+    private final AsyncIndexInfoService infoService;
+    private final Clock clock;
+
+    /**
+     * Creates a lock which measures the timeout with {@link Clock#SIMPLE}.
+     *
+     * @param infoService service used to look up the indexer MBean of a lane
+     */
+    public AbortingIndexerLock(AsyncIndexInfoService infoService) {
+        this(infoService, Clock.SIMPLE);
+    }
+
+    /**
+     * @param infoService service used to look up the indexer MBean of a lane
+     * @param clock clock used to measure the wait timeout
+     */
+    public AbortingIndexerLock(AsyncIndexInfoService infoService, Clock clock) {
+        this.infoService = infoService;
+        this.clock = clock;
+    }
+
+    /**
+     * Aborts and pauses the async indexer of the given lane and waits for it
+     * to stop running.
+     *
+     * @param asyncIndexerLane name of the async indexer lane
+     * @return token to be passed to {@link #unlock(SimpleToken)}
+     * @throws NullPointerException if no info or stats MBean exists for the lane
+     * @throws IllegalStateException if the indexer is still running after
+     *         {@link #TIMEOUT_SECONDS} seconds
+     */
+    @Override
+    public SimpleToken lock(String asyncIndexerLane) {
+        IndexStatsMBean mbean = getIndexStatsMBean(asyncIndexerLane);
+
+        if (IndexStatsMBean.STATUS_RUNNING.equals(mbean.getStatus())){
+            log.info("Aborting current indexing run of async indexer for lane [{}]", asyncIndexerLane);
+        }
+
+        // Requested unconditionally, whether or not a run is currently in progress
+        mbean.abortAndPause();
+        retry(mbean, TIMEOUT_SECONDS, 1000);
+        log.info("Aborted and paused async indexer for lane [{}]", asyncIndexerLane);
+        return new SimpleToken(asyncIndexerLane);
+    }
+
+    /**
+     * Resumes the async indexer of the lane recorded in the token.
+     *
+     * @param token token obtained from {@link #lock(String)}
+     */
+    @Override
+    public void unlock(SimpleToken token) {
+        getIndexStatsMBean(token.laneName).resume();
+        log.info("Resumed async indexer for lane [{}]", token.laneName);
+    }
+
+    /**
+     * Looks up the stats MBean for the lane, failing with a
+     * NullPointerException when either the info or the MBean is missing.
+     */
+    private IndexStatsMBean getIndexStatsMBean(String asyncIndexerLane) {
+        AsyncIndexInfo info = infoService.getInfo(asyncIndexerLane);
+        checkNotNull(info, "No AsyncIndexInfo found for lane [%s]", asyncIndexerLane);
+        IndexStatsMBean mbean = info.getStatsMBean();
+        return checkNotNull(mbean, "No IndexStatsMBean associated with [%s]", asyncIndexerLane);
+    }
+
+    /**
+     * Polls the MBean status until it is no longer {@code STATUS_RUNNING} or
+     * the timeout elapses.
+     *
+     * @param mbean stats MBean to poll
+     * @param timeoutSeconds maximum time to wait, in seconds
+     * @param intervalBetweenTriesMsec sleep between two polls, in milliseconds
+     * @throws IllegalStateException if the status is still running once the
+     *         timeout has elapsed
+     */
+    private void retry(IndexStatsMBean mbean, int timeoutSeconds, int intervalBetweenTriesMsec) {
+        long timeout = clock.getTime() + timeoutSeconds * 1000L;
+        while (clock.getTime() < timeout) {
+            try {
+                if (!IndexStatsMBean.STATUS_RUNNING.equals(mbean.getStatus())) {
+                    return;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+
+            try {
+                // Subtract first, then convert millis to seconds. Clamp at zero because the
+                // clock may have moved past the deadline since the loop condition was checked.
+                long remainingSeconds = Math.max(0L, (timeout - clock.getTime()) / 1000);
+                log.info("Async indexer for lane [{}] found to be running. Would wait for {} seconds " +
+                        "more for it to stop", mbean.getName(), remainingSeconds);
+                Thread.sleep(intervalBetweenTriesMsec);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag for the caller.
+                // NOTE(review): returning here lets lock() proceed as if the indexer had
+                // stopped, although that was not confirmed - verify this is intended.
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+
+        throw new IllegalStateException("RetryLoop failed, condition is false after " + timeoutSeconds + " seconds");
+    }
+}
+
+/**
+ * Lock token which only records the name of the async indexer lane it
+ * was created for.
+ */
+final class SimpleToken implements AsyncIndexerLock.LockToken {
+
+    /** Name of the async indexer lane this token belongs to. */
+    final String laneName;
+
+    /**
+     * @param laneName name of the async indexer lane
+     */
+    SimpleToken(String laneName) {
+        this.laneName = laneName;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLock.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java?rev=1801582&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java Tue Jul 11 09:40:23 2017
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.importer;
+
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfo;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link AbortingIndexerLock} using a mocked {@link IndexStatsMBean}
+ * whose reported status drives the lock behaviour.
+ */
+public class AbortingIndexerLockTest {
+
+    private static final String LANE = "async";
+
+    private AsyncIndexInfoService infoService;
+    private IndexStatsMBean statsMBean;
+
+    @Before
+    public void setup(){
+        statsMBean = mock(IndexStatsMBean.class);
+        infoService = mock(AsyncIndexInfoService.class);
+
+        // The info service hands out the mocked MBean for the "async" lane
+        AsyncIndexInfo laneInfo = new AsyncIndexInfo(LANE, -1,-1, false, statsMBean);
+        when(infoService.getInfo(LANE)).thenReturn(laneInfo);
+    }
+
+    /** Indexer is idle: lock aborts-and-pauses right away, unlock resumes. */
+    @Test
+    public void lockBasics() throws Exception{
+        when(statsMBean.getStatus()).thenReturn(IndexStatsMBean.STATUS_DONE);
+
+        AbortingIndexerLock indexerLock = new AbortingIndexerLock(infoService);
+
+        SimpleToken token = indexerLock.lock(LANE);
+        assertNotNull(token);
+        verify(statsMBean).abortAndPause();
+
+        indexerLock.unlock(token);
+        verify(statsMBean).resume();
+    }
+
+    /** Indexer reports running twice before it is done: lock keeps polling. */
+    @Test
+    public void lockWithRetry() throws Exception{
+        when(statsMBean.getStatus()).thenReturn(
+                IndexStatsMBean.STATUS_RUNNING,
+                IndexStatsMBean.STATUS_RUNNING,
+                IndexStatsMBean.STATUS_DONE);
+
+        AbortingIndexerLock indexerLock = new AbortingIndexerLock(infoService);
+
+        SimpleToken token = indexerLock.lock(LANE);
+        assertNotNull(token);
+        verify(statsMBean).abortAndPause();
+
+        verify(statsMBean, times(3)).getStatus();
+    }
+
+    /** Indexer never stops and the virtual clock jumps past the timeout: lock fails. */
+    @Test
+    public void lockTimedout() throws Exception{
+        Clock.Virtual virtualClock = new Clock.Virtual();
+
+        when(statsMBean.getStatus())
+                .thenReturn(IndexStatsMBean.STATUS_RUNNING, IndexStatsMBean.STATUS_RUNNING)
+                .then(invocation -> {
+                    // Third status check moves the clock beyond the lock timeout
+                    virtualClock.waitUntil(AbortingIndexerLock.TIMEOUT_SECONDS * 1000 + 1);
+                    return IndexStatsMBean.STATUS_RUNNING;
+                });
+
+        AbortingIndexerLock indexerLock = new AbortingIndexerLock(infoService, virtualClock);
+
+        try {
+            indexerLock.lock(LANE);
+            fail();
+        } catch (IllegalStateException expected) {
+            // timeout is reported via IllegalStateException
+        }
+
+        verify(statsMBean).abortAndPause();
+
+        verify(statsMBean, times(3)).getStatus();
+    }
+
+}
\ No newline at end of file

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/importer/AbortingIndexerLockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native