You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2017/01/11 05:21:13 UTC

svn commit: r1778241 - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ test/ja...

Author: chetanm
Date: Wed Jan 11 05:21:13 2017
New Revision: 1778241

URL: http://svn.apache.org/viewvc?rev=1778241&view=rev
Log:
OAK-4808 - Index external changes as part of NRT indexing

Provide a builder for ExternalIndexObserver which wraps it in a BackgroundObserver and also provides some filtering support, i.e. only those changes which have non-empty index paths are enqueued in the BackgroundObserver.

Also register the observers and journal property service with OSGi in LuceneIndexProviderService

Added:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserver.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexedPaths.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserverTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1778241&r1=1778240&r2=1778241&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Wed Jan 11 05:21:13 2017
@@ -50,11 +50,14 @@ import org.apache.jackrabbit.oak.api.jmx
 import org.apache.jackrabbit.oak.cache.CacheStats;
 import org.apache.jackrabbit.oak.commons.PropertiesUtil;
 import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.document.spi.JournalPropertyService;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.aggregate.NodeAggregator;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.DocumentQueue;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.ExternalObserverBuilder;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LocalIndexObserver;
+import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.LuceneJournalPropertyService;
 import org.apache.jackrabbit.oak.plugins.index.lucene.hybrid.NRTIndexFactory;
 import org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
@@ -229,6 +232,8 @@ public class LuceneIndexProviderService
 
     private BackgroundObserver backgroundObserver;
 
+    private BackgroundObserver externalIndexObserver;
+
     @Reference
     ScorerProviderFactory scorerFactory;
 
@@ -321,6 +326,10 @@ public class LuceneIndexProviderService
             backgroundObserver.close();
         }
 
+        if (externalIndexObserver != null){
+            externalIndexObserver.close();
+        }
+
         if (indexProvider != null) {
             indexProvider.close();
             indexProvider = null;
@@ -510,6 +519,20 @@ public class LuceneIndexProviderService
         documentQueue = new DocumentQueue(queueSize, tracker, getExecutorService(), statisticsProvider);
         LocalIndexObserver localIndexObserver = new LocalIndexObserver(documentQueue, statisticsProvider);
         regs.add(bundleContext.registerService(Observer.class.getName(), localIndexObserver, null));
+
+        regs.add(bundleContext.registerService(JournalPropertyService.class.getName(),
+                new LuceneJournalPropertyService(), null));
+        ExternalObserverBuilder builder = new ExternalObserverBuilder(documentQueue, tracker, statisticsProvider,
+                getExecutorService(), 1000);
+
+        Observer observer = builder.build();
+        externalIndexObserver = builder.getBackgroundObserver();
+        regs.add(bundleContext.registerService(Observer.class.getName(), observer, null));
+        oakRegs.add(registerMBean(whiteboard,
+                BackgroundObserverMBean.class,
+                externalIndexObserver.getMBean(),
+                BackgroundObserverMBean.TYPE,
+                "LuceneExternalIndexObserver queue stats"));
         log.info("Hybrid indexing enabled for configured indexes with queue size of {}", queueSize);
     }
 

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserver.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserver.java?rev=1778241&r1=1778240&r2=1778241&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserver.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserver.java Wed Jan 11 05:21:13 2017
@@ -19,7 +19,6 @@
 
 package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
 
-import java.io.IOException;
 import java.util.Set;
 
 import javax.annotation.Nonnull;
@@ -60,29 +59,41 @@ public class ExternalIndexObserver imple
 
     @Override
     public boolean excludes(@Nonnull NodeState root, @Nonnull CommitInfo info) {
-        return !info.isExternal();
-    }
-
-    @Override
-    public void contentChanged(@Nonnull NodeState after, @Nonnull CommitInfo info) {
         //Only interested in external changes
         if (!info.isExternal()) {
-            return;
+            return true;
         }
 
         CommitContext commitContext = (CommitContext) info.getInfo().get(CommitContext.NAME);
         //Commit done internally i.e. one not using Root/Tree API
         if (commitContext == null) {
-            return;
+            return true;
         }
 
         IndexedPaths indexedPaths = (IndexedPaths) commitContext.get(LuceneDocumentHolder.NAME);
         //Nothing to be indexed
         if (indexedPaths == null) {
             log.debug("IndexPaths not found. Journal support missing");
+            return true;
+        }
+
+        if (indexedPaths.isEmpty()){
+            return true;
+        }
+
+        return false;
+    }
+
+    @Override
+    public void contentChanged(@Nonnull NodeState after, @Nonnull CommitInfo info) {
+        //Only interested in external changes
+        if (excludes(after, info)) {
             return;
         }
 
+        CommitContext commitContext = (CommitContext) info.getInfo().get(CommitContext.NAME);
+        IndexedPaths indexedPaths = (IndexedPaths) commitContext.get(LuceneDocumentHolder.NAME);
+
         commitContext.remove(LuceneDocumentHolder.NAME);
 
         log.trace("Received indexed paths {}", indexedPaths);
@@ -131,7 +142,7 @@ public class ExternalIndexObserver imple
                             droppedCount++;
                         }
                     }
-                } catch (IOException e) {
+                } catch (Exception e) {
                     log.warn("Ignoring making LuceneDocument for path {} for index {} due to exception", path, indexPath, e);
                 }
             }

Added: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java?rev=1778241&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java Wed Jan 11 05:21:13 2017
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene.hybrid;
+
+import java.util.concurrent.Executor;
+
+import javax.annotation.Nonnull;
+
+import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import org.apache.jackrabbit.oak.plugins.observation.Filter;
+import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ExternalObserverBuilder {
+    private static final Logger log = LoggerFactory.getLogger(ExternalIndexObserver.class);
+    private final IndexingQueue indexingQueue;
+    private final IndexTracker indexTracker;
+    private final StatisticsProvider statisticsProvider;
+    private final Executor executor;
+    private final int queueSize;
+    private BackgroundObserver backgroundObserver;
+    private FilteringObserver filteringObserver;
+
+    public ExternalObserverBuilder(IndexingQueue indexingQueue, IndexTracker indexTracker,
+                                   StatisticsProvider statisticsProvider,
+                                   Executor executor, int queueSize) {
+        this.indexingQueue = checkNotNull(indexingQueue);
+        this.indexTracker = checkNotNull(indexTracker);
+        this.statisticsProvider = checkNotNull(statisticsProvider);
+        this.executor = checkNotNull(executor);
+        this.queueSize = queueSize;
+    }
+
+    public Observer build() {
+        if (filteringObserver != null) {
+            return filteringObserver;
+        }
+        ExternalIndexObserver externalObserver = new ExternalIndexObserver(indexingQueue, indexTracker, statisticsProvider);
+        backgroundObserver = new WarningObserver(externalObserver, executor, queueSize);
+        filteringObserver = new FilteringObserver(backgroundObserver, externalObserver);
+        return filteringObserver;
+    }
+
+    public BackgroundObserver getBackgroundObserver() {
+        return backgroundObserver;
+    }
+
+    private static class WarningObserver extends BackgroundObserver {
+        private final int queueLength;
+
+        public WarningObserver(@Nonnull Observer observer, @Nonnull Executor executor, int queueLength) {
+            super(observer, executor, queueLength);
+            this.queueLength = queueLength;
+        }
+
+        @Override
+        protected void added(int queueSize) {
+            //TODO Have a variant of BackgroundObserver which drops elements from the tail
+            //as for indexing case its fine to drop older stuff
+            if (queueSize >= queueLength) {
+                log.warn("External observer queue is full");
+            }
+        }
+    }
+
+    private static class FilteringObserver implements Observer {
+        private final Observer delegate;
+        private final Filter filter;
+
+        private FilteringObserver(Observer delegate, Filter filter) {
+            this.delegate = delegate;
+            this.filter = filter;
+        }
+
+        @Override
+        public void contentChanged(@Nonnull NodeState root, @Nonnull CommitInfo info) {
+            //TODO Optimize for the case where new async index update is detected. Then
+            //existing items in queue should not be processed
+
+            //We need to only pass on included changes. Not using FilteringAwareObserver
+            //As here the filtering logic only relies on CommitContext and not concerned
+            //with before state
+            if (!filter.excludes(root, info)) {
+                delegate.contentChanged(root, info);
+            }
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalObserverBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexedPaths.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexedPaths.java?rev=1778241&r1=1778240&r2=1778241&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexedPaths.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/IndexedPaths.java Wed Jan 11 05:21:13 2017
@@ -63,6 +63,10 @@ class IndexedPaths implements JournalPro
         return indexedPaths.toString();
     }
 
+    public boolean isEmpty(){
+        return indexedPaths.isEmpty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java?rev=1778241&r1=1778240&r2=1778241&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java Wed Jan 11 05:21:13 2017
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.oak.api.jmx
 import org.apache.jackrabbit.oak.plugins.blob.datastore.CachingFileDataStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreBlobStore;
 import org.apache.jackrabbit.oak.plugins.blob.datastore.DataStoreUtils;
+import org.apache.jackrabbit.oak.plugins.document.spi.JournalPropertyService;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.ExtractedText;
 import org.apache.jackrabbit.oak.plugins.index.fulltext.PreExtractedTextProvider;
@@ -116,6 +117,8 @@ public class LuceneIndexProviderServiceT
 
         assertNotNull(FieldUtils.readDeclaredField(service, "documentQueue", true));
 
+        assertNotNull(context.getService(JournalPropertyService.class));
+
         MockOsgi.deactivate(service);
     }
 

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserverTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserverTest.java?rev=1778241&r1=1778240&r2=1778241&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserverTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ExternalIndexObserverTest.java Wed Jan 11 05:21:13 2017
@@ -22,12 +22,14 @@ package org.apache.jackrabbit.oak.plugin
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.jackrabbit.oak.core.SimpleCommitContext;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexDefinition;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
 import org.apache.jackrabbit.oak.plugins.index.lucene.util.IndexDefinitionBuilder;
 import org.apache.jackrabbit.oak.spi.commit.CommitContext;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.Observer;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.junit.Before;
 import org.junit.Rule;
@@ -147,6 +149,10 @@ public class ExternalIndexObserverTest {
 
     @Test
     public void docAddedToQueue() throws Exception {
+        assertIndexing(observer);
+    }
+
+    private void assertIndexing(Observer observer){
         Multimap<String, String> indexedPaths = HashMultimap.create();
         indexedPaths.put("/a", "/oak:index/foo");
 
@@ -166,6 +172,22 @@ public class ExternalIndexObserverTest {
         assertEquals("/oak:index/foo", doc.getValue().getIndexPath());
     }
 
+    @Test
+    public void builder() throws Exception{
+        ExternalObserverBuilder builder =
+                new ExternalObserverBuilder(queue, tracker,NOOP, MoreExecutors.sameThreadExecutor(), 10);
+        Observer o = builder.build();
+        o.contentChanged(INITIAL_CONTENT, CommitInfo.EMPTY_EXTERNAL);
+        verifyZeroInteractions(queue);
+    }
+
+    @Test
+    public void builder_NonFiltered() throws Exception{
+        ExternalObserverBuilder builder =
+                new ExternalObserverBuilder(queue, tracker,NOOP, MoreExecutors.sameThreadExecutor(), 10);
+        assertIndexing(builder.build());
+    }
+
     private CommitInfo newCommitInfo() {
         return new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
                 ImmutableMap.<String, Object>of(CommitContext.NAME, commitContext), true);