You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ca...@apache.org on 2017/12/17 23:12:10 UTC

svn commit: r1818521 - in /jackrabbit/oak/trunk: oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/ oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/ oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/ oak-lucene/src/mai...

Author: catholicon
Date: Sun Dec 17 23:12:10 2017
New Revision: 1818521

URL: http://svn.apache.org/viewvc?rev=1818521&view=rev
Log:
OAK-6950: Active deletion can delete blobs from a shared store when a clone setup is created

Approach being used: expose a jmx method which would:
1. set an in-memory flag to indicate that active deletion might be unsafe of index files at head state of the repository.
2. OakDirectory would respect this flag and add a property in all new index files created after the flag is set to indicate that these files must not participate in active deletion.
3. Wait for current indexing cycles to wrap (to avoid any new index files that might have got created in current cycle before the in-mem flag was set)
4. Set the same proerty on index files in current repository state to indicate that these files must not participate in active deletion.

Added:
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBean.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImpl.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectoryTestBase.java

Modified: jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-api/src/main/java/org/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java Sun Dec 17 23:12:10 2017
@@ -143,6 +143,13 @@ public interface IndexStatsMBean {
     String getTemporaryCheckpoints();
 
     /**
+     * Returns the number of executions.
+     *
+     * @return the execution count
+     */
+    long getTotalExecutionCount();
+
+    /**
      * Returns the number of executions as a {@code org.apache.jackrabbit.api.stats.TimeSeries}.
      *
      * @return the execution count time series

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Sun Dec 17 23:12:10 2017
@@ -1150,6 +1150,11 @@ public class AsyncIndexUpdate implements
         }
 
         @Override
+        public long getTotalExecutionCount() {
+            return execStats.getExecutionCounter().getCount();
+        }
+
+        @Override
         public CompositeData getExecutionCount() {
             return execStats.getExecutionCount();
         }

Modified: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdateTest.java Sun Dec 17 23:12:10 2017
@@ -1133,6 +1133,23 @@ public class AsyncIndexUpdateTest {
         assertEquals(0, lastExecutionStats(async.getIndexStats().getExecutionCount()));
     }
 
+    @Test
+    public void executionCountUpdatesOnRunWithoutAnyChangeInRepo() throws Exception {
+        AsyncIndexUpdate async = new AsyncIndexUpdate("async",
+                new MemoryNodeStore(),
+                new PropertyIndexEditorProvider(),
+                statsProvider, false);
+
+        long execCnt1 = async.getIndexStats().getTotalExecutionCount();
+        runOneCycle(async);
+        long execCnt2 = async.getIndexStats().getTotalExecutionCount();
+        runOneCycle(async);
+        long execCnt3 = async.getIndexStats().getTotalExecutionCount();
+
+        assertNotEquals("execCnt1 " + execCnt1 + " and execCnt2 " + execCnt2 + " are same", execCnt1, execCnt2);
+        assertNotEquals("execCnt2 " + execCnt2 + " and execCnt3 " + execCnt3 + " are same", execCnt2, execCnt3);
+    }
+
 
     private static long lastExecutionStats(CompositeData cd){
         //Last stat is the last entry in the array

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBean.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBean.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBean.java Sun Dec 17 23:12:10 2017
@@ -21,6 +21,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import javax.annotation.Nonnull;
 import javax.management.openmbean.CompositeData;
+import java.util.Map;
 
 /**
  * MBean for starting and monitoring the progress of
@@ -56,4 +57,25 @@ public interface ActiveDeletedBlobCollec
      */
     @Nonnull
     CompositeData getActiveCollectionStatus();
+
+    /**
+     * @return true: if recording deleted blob for active deletion is unsafe; false: otherwise
+     */
+    @Nonnull
+    boolean isActiveDeletionUnsafe();
+
+    /**
+     * Flag current blobs (head state) referred by all indexes so that they won't
+     * be marked to be collected by active deletion later. It would also set an
+     * in-memory flag so that new blobs also are flagged to be not marked for deletion
+     * by active deletion
+     */
+    @Nonnull
+    void flagActiveDeletionUnsafeForCurrentState();
+
+    /**
+     * Resets the in-memory flag so that new blobs are not flagged anymore and hence
+     * would get marked for active deletion when active deletion is active.
+     */
+    void flagActiveDeletionSafe();
 }

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImpl.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImpl.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImpl.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImpl.java Sun Dec 17 23:12:10 2017
@@ -19,10 +19,21 @@
 
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
 import org.apache.jackrabbit.oak.commons.jmx.ManagementOperation;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
+import org.apache.jackrabbit.oak.plugins.index.IndexPathService;
 import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollector;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
 import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
 import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
 import org.apache.jackrabbit.oak.stats.Clock;
@@ -32,20 +43,30 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.management.openmbean.CompositeData;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+import static org.apache.jackrabbit.oak.api.Type.STRING;
+import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
 import static org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.Status.failed;
 import static org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.Status.initiated;
 import static org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.done;
 import static org.apache.jackrabbit.oak.commons.jmx.ManagementOperation.newManagementOperation;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.PROP_UNSAFE_FOR_ACTIVE_DELETION;
 
 public class ActiveDeletedBlobCollectorMBeanImpl implements ActiveDeletedBlobCollectorMBean {
     private static final Logger LOG = LoggerFactory.getLogger(ActiveDeletedBlobCollectorMBeanImpl.class);
 
-    public static final String OP_NAME = "Active lucene index blobs collection";
+    private static final String OP_NAME = "Active lucene index blobs collection";
 
     /**
      * Actively deleted blob must be deleted for at least this long (in seconds)
@@ -53,7 +74,7 @@ public class ActiveDeletedBlobCollectorM
     private final long MIN_BLOB_AGE_TO_ACTIVELY_DELETE = Long.getLong("oak.active.deletion.minAge",
             TimeUnit.HOURS.toSeconds(24));
 
-    private final Clock clock = Clock.SIMPLE;
+    Clock clock = Clock.SIMPLE; // package private for tests
 
     @Nonnull
     private final ActiveDeletedBlobCollector activeDeletedBlobCollector;
@@ -67,20 +88,40 @@ public class ActiveDeletedBlobCollectorM
     @Nonnull
     private final Executor executor;
 
+    private final NodeStore store;
+
+    private final IndexPathService indexPathService;
+
+    private final AsyncIndexInfoService asyncIndexInfoService;
 
     private ManagementOperation<Void> gcOp = done(OP_NAME, null);
 
     /**
      * @param activeDeletedBlobCollector    deleted index blobs collector
+     * @param whiteboard                    An instance of {@link Whiteboard}. It will be
+     *                                      used to get checkpoing manager mbean.
+     * @param store                         {@link NodeStore} instance to access repository state
+     * @param indexPathService              {@link IndexPathService} instance to collect indexes available in
+     *                                                              the repository
+     * @param asyncIndexInfoService         {@link AsyncIndexInfoService} instance to acess state of async
+     *                                                                   indexer lanes
+     * @param blobStore                     An instance of {@link GarbageCollectableBlobStore}. It will be
+     *                                      used to purge blobs which have been deleted from lucene indexes.
      * @param executor                      executor for running the collection task
      */
-    public ActiveDeletedBlobCollectorMBeanImpl(
+    ActiveDeletedBlobCollectorMBeanImpl(
             @Nonnull ActiveDeletedBlobCollector activeDeletedBlobCollector,
             @Nonnull Whiteboard whiteboard,
+            @Nonnull NodeStore store,
+            @Nonnull IndexPathService indexPathService,
+            @Nonnull AsyncIndexInfoService asyncIndexInfoService,
             @Nonnull GarbageCollectableBlobStore blobStore,
             @Nonnull Executor executor) {
         this.activeDeletedBlobCollector = checkNotNull(activeDeletedBlobCollector);
         this.whiteboard = checkNotNull(whiteboard);
+        this.store = store;
+        this.indexPathService = indexPathService;
+        this.asyncIndexInfoService = asyncIndexInfoService;
         this.blobStore = checkNotNull(blobStore);
         this.executor = checkNotNull(executor);
 
@@ -128,6 +169,133 @@ public class ActiveDeletedBlobCollectorM
         return gcOp.getStatus().toCompositeData();
     }
 
+    @Override
+    public boolean isActiveDeletionUnsafe() {
+        return activeDeletedBlobCollector.isActiveDeletionUnsafe();
+    }
+
+    @Override
+    public void flagActiveDeletionUnsafeForCurrentState() {
+        activeDeletedBlobCollector.flagActiveDeletionUnsafe(true);
+
+        if (!waitForRunningIndexCycles()) {
+            LOG.warn("Some indexers were still found running. Resume and quit gracefully");
+            activeDeletedBlobCollector.flagActiveDeletionUnsafe(false);
+        }
+
+        try {
+            markCurrentIndexFilesUnsafeForActiveDeletion();
+        } catch (CommitFailedException e) {
+            LOG.warn("Could not set current index files unsafe for active deletion. Resume and quit gracefully", e);
+            activeDeletedBlobCollector.flagActiveDeletionUnsafe(false);
+        }
+    }
+
+    @Override
+    public void flagActiveDeletionSafe() {
+        activeDeletedBlobCollector.flagActiveDeletionUnsafe(false);
+    }
+
+    /**
+     * Wait for running index cycles for 2 minutes.
+     *
+     * @return true if all running index cycles have been through; false otherwise
+     */
+    private boolean waitForRunningIndexCycles() {
+        Map<IndexStatsMBean, Long> origIndexLaneToExecutinoCountMap = Maps.asMap(
+                Sets.newHashSet(StreamSupport.stream(asyncIndexInfoService.getAsyncLanes().spliterator(), false)
+                        .map(lane -> asyncIndexInfoService.getInfo(lane).getStatsMBean())
+                        .filter(bean -> {
+                            String beanStatus;
+                            try {
+                                if (bean != null) {
+                                    beanStatus = bean.getStatus();
+                                } else {
+                                    return false;
+                                }
+                            } catch (Exception e) {
+                                LOG.warn("Exception during getting status for {}. Ignoring this indexer lane", bean.getName(), e);
+                                return false;
+                            }
+                            return STATUS_RUNNING.equals(beanStatus);
+                        })
+                        .collect(Collectors.toList())),
+                IndexStatsMBean::getTotalExecutionCount);
+
+        if (!origIndexLaneToExecutinoCountMap.isEmpty()) {
+            LOG.info("Found running index lanes ({}). Sleep a bit before continuing.",
+                    transform(origIndexLaneToExecutinoCountMap.keySet(), IndexStatsMBean::getName));
+            try {
+                clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(1));
+            } catch (InterruptedException e) {
+                LOG.info("Thread interrupted during initial wait", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        long start = clock.getTime();
+        while (!origIndexLaneToExecutinoCountMap.isEmpty()) {
+            Map.Entry<IndexStatsMBean, Long> indexLaneEntry = origIndexLaneToExecutinoCountMap.entrySet().iterator().next();
+            IndexStatsMBean indexLaneBean = indexLaneEntry.getKey();
+
+            long oldExecCnt = indexLaneEntry.getValue();
+            long newExecCnt = indexLaneBean.getTotalExecutionCount();
+            String beanStatus = indexLaneBean.getStatus();
+
+            if (!STATUS_RUNNING.equals(beanStatus) || oldExecCnt != newExecCnt) {
+                origIndexLaneToExecutinoCountMap.remove(indexLaneBean);
+                LOG.info("Lane {} has moved - oldExecCnt {}, newExecCnt {}", indexLaneBean.getName(), oldExecCnt, newExecCnt);
+            } else if (clock.getTime() - start > TimeUnit.MINUTES.toMillis(2)) {
+                LOG.warn("Timed out while waiting for running index lane executions");
+                break;
+            } else {
+                LOG.info("Lane {} still has execution count {}. Waiting....", indexLaneBean.getName(), newExecCnt);
+
+                try {
+                    clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(1));
+                } catch (InterruptedException e) {
+                    LOG.info("Thread interrupted", e);
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
+
+        return origIndexLaneToExecutinoCountMap.isEmpty();
+    }
+
+    private void markCurrentIndexFilesUnsafeForActiveDeletion() throws CommitFailedException {
+        NodeBuilder rootBuilder = store.getRoot().builder();
+        for (String indexPath : indexPathService.getIndexPaths()) {
+            markCurrentIndexFilesUnsafeForActiveDeletionFor(rootBuilder, indexPath);
+        }
+
+        store.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+    }
+
+    private void markCurrentIndexFilesUnsafeForActiveDeletionFor(NodeBuilder rootBuilder, String indexPath) {
+        NodeBuilder indexPathBuilder = getBuilderForPath(rootBuilder, indexPath);
+        if (!TYPE_LUCENE.equals(indexPathBuilder.getProperty(TYPE_PROPERTY_NAME).getValue(STRING))) {
+            LOG.debug("Ignoring index {} as it's not a lucene index", indexPath);
+            return;
+        }
+
+        NodeBuilder dataNodeBuilder = indexPathBuilder.getChildNode(INDEX_DATA_CHILD_NAME);
+        for (String indexFileName : dataNodeBuilder.getChildNodeNames()) {
+            NodeBuilder indexFileBuilder = dataNodeBuilder.getChildNode(indexFileName);
+
+            indexFileBuilder.setProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION, true);
+        }
+    }
+
+    private static NodeBuilder getBuilderForPath(NodeBuilder rootBuilder, String path) {
+        NodeBuilder builder = rootBuilder;
+        for (String elem : PathUtils.elements(path)) {
+            builder = builder.getChildNode(elem);
+        }
+        return builder;
+    }
+
     private long getSafeTimestampForDeletedBlobs() {
         long timestamp = clock.getTime() - TimeUnit.SECONDS.toMillis(MIN_BLOB_AGE_TO_ACTIVELY_DELETE);
 

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Sun Dec 17 23:12:10 2017
@@ -757,7 +757,8 @@ public class LuceneIndexProviderService
             File blobCollectorWorkingDir = new File(indexDir, "deleted-blobs");
             activeDeletedBlobCollector = ActiveDeletedBlobCollectorFactory.newInstance(blobCollectorWorkingDir, executorService);
             ActiveDeletedBlobCollectorMBean bean =
-                    new ActiveDeletedBlobCollectorMBeanImpl(activeDeletedBlobCollector, whiteboard, blobStore, executorService);
+                    new ActiveDeletedBlobCollectorMBeanImpl(activeDeletedBlobCollector, whiteboard, nodeStore,
+                            indexPathService, asyncIndexInfoService, blobStore, executorService);
 
             oakRegs.add(registerMBean(whiteboard, ActiveDeletedBlobCollectorMBean.class, bean,
                     ActiveDeletedBlobCollectorMBean.TYPE, "Active lucene files collection"));

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorFactory.java Sun Dec 17 23:12:10 2017
@@ -70,9 +70,15 @@ public class ActiveDeletedBlobCollectorF
         void purgeBlobsDeleted(long before, GarbageCollectableBlobStore blobStore);
 
         void cancelBlobCollection();
+
+        void flagActiveDeletionUnsafe(boolean toFlag);
+
+        boolean isActiveDeletionUnsafe();
     }
 
     public static ActiveDeletedBlobCollector NOOP = new ActiveDeletedBlobCollector() {
+        private volatile boolean activeDeletionUnsafe = false;
+
         @Override
         public BlobDeletionCallback getBlobDeletionCallback() {
             return BlobDeletionCallback.NOOP;
@@ -87,6 +93,16 @@ public class ActiveDeletedBlobCollectorF
         public void cancelBlobCollection() {
 
         }
+
+        @Override
+        public void flagActiveDeletionUnsafe(boolean toFlag) {
+            activeDeletionUnsafe = toFlag;
+        }
+
+        @Override
+        public boolean isActiveDeletionUnsafe() {
+            return activeDeletionUnsafe;
+        }
     };
 
     public interface BlobDeletionCallback extends IndexCommitCallback {
@@ -99,6 +115,9 @@ public class ActiveDeletedBlobCollectorF
          *            blobs.
          */
         void deleted(String blobId, Iterable<String> ids);
+
+        boolean isMarkingForActiveDeletionUnsafe();
+
         BlobDeletionCallback NOOP = new BlobDeletionCallback() {
             @Override
             public void deleted(String blobId, Iterable<String> ids) {
@@ -107,6 +126,11 @@ public class ActiveDeletedBlobCollectorF
             @Override
             public void commitProgress(IndexProgress indexProgress) {
             }
+
+            @Override
+            public boolean isMarkingForActiveDeletionUnsafe() {
+                return ActiveDeletedBlobCollectorFactory.NOOP.isActiveDeletionUnsafe();
+            }
         };
     }
 
@@ -144,6 +168,8 @@ public class ActiveDeletedBlobCollectorF
         private final ExecutorService executorService;
 
         private volatile boolean cancelled;
+        private volatile boolean activeDeletionUnsafe = false;
+
 
         private static final String BLOB_FILE_PATTERN_PREFIX = "blobs-";
         private static final String BLOB_FILE_PATTERN_SUFFIX = ".txt";
@@ -327,6 +353,16 @@ public class ActiveDeletedBlobCollectorF
             cancelled = true;
         }
 
+        @Override
+        public void flagActiveDeletionUnsafe(boolean toFlag) {
+            activeDeletionUnsafe = toFlag;
+        }
+
+        @Override
+        public boolean isActiveDeletionUnsafe() {
+            return activeDeletionUnsafe;
+        }
+
         private long readLastCheckedBlobTimestamp() {
             File blobCollectorInfoFile = new File(rootDirectory, "collection-info.txt");
             if (!blobCollectorInfoFile.exists()) {
@@ -485,6 +521,11 @@ public class ActiveDeletedBlobCollectorF
 
                 deletedBlobs.clear();
             }
+
+            @Override
+            public boolean isMarkingForActiveDeletionUnsafe() {
+                return activeDeletionUnsafe;
+            }
         }
 
         private class BlobIdInfoStruct {

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectory.java Sun Dec 17 23:12:10 2017
@@ -35,6 +35,7 @@ import org.apache.lucene.store.IndexOutp
 import org.apache.lucene.store.Lock;
 import org.apache.lucene.store.LockFactory;
 import org.apache.lucene.store.NoLockFactory;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
@@ -49,6 +50,7 @@ import static com.google.common.base.Pre
 import static org.apache.jackrabbit.JcrConstants.JCR_DATA;
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.api.Type.BINARY;
+import static org.apache.jackrabbit.oak.api.Type.BOOLEAN;
 import static org.apache.jackrabbit.oak.api.Type.STRINGS;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
@@ -60,9 +62,11 @@ import static org.apache.jackrabbit.oak.
  */
 public class OakDirectory extends Directory {
     static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(OakDirectory.class.getName() + ".perf"));
+    static final Logger LOG = LoggerFactory.getLogger(OakDirectory.class.getName());
     public static final String PROP_DIR_LISTING = "dirListing";
     static final String PROP_BLOB_SIZE = "blobSize";
     static final String PROP_UNIQUE_KEY = "uniqueKey";
+    public static final String PROP_UNSAFE_FOR_ACTIVE_DELETION = "unsafeForActiveDeletion";
     static final int UNIQUE_KEY_SIZE = 16;
 
     private final static SecureRandom secureRandom = new SecureRandom();
@@ -147,19 +151,25 @@ public class OakDirectory extends Direct
         checkArgument(!readOnly, "Read only directory");
         fileNames.remove(name);
         NodeBuilder f = directoryBuilder.getChildNode(name);
-        PropertyState property = f.getProperty(JCR_DATA);
-        if (property != null) {
-            if (property.getType() == BINARIES || property.getType() == BINARY) {
-                for (Blob b : property.getValue(BINARIES)) {
-                    //Mark the blob as deleted. Also, post index path, type of directory
-                    //(:suggest, :data, etc) and filename being deleted
-                    String blobId = b.getContentIdentity();
-                    if (blobId != null) {
-                        blobDeletionCallback.deleted(blobId,
-                                Lists.newArrayList(definition.getIndexPath(), dataNodeName, name));
+
+        if (!f.hasProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION)
+                || !f.getProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION).getValue(BOOLEAN)) {
+            PropertyState property = f.getProperty(JCR_DATA);
+            if (property != null) {
+                if (property.getType() == BINARIES || property.getType() == BINARY) {
+                    for (Blob b : property.getValue(BINARIES)) {
+                        //Mark the blob as deleted. Also, post index path, type of directory
+                        //(:suggest, :data, etc) and filename being deleted
+                        String blobId = b.getContentIdentity();
+                        if (blobId != null) {
+                            blobDeletionCallback.deleted(blobId,
+                                    Lists.newArrayList(definition.getIndexPath(), dataNodeName, name));
+                        }
                     }
                 }
             }
+        } else {
+            LOG.debug("Not marking {} under {} for active deletion", name, indexName);
         }
         f.remove();
         markDirty();
@@ -198,6 +208,10 @@ public class OakDirectory extends Direct
         String key = StringUtils.convertBytesToHex(uniqueKey);
         file.setProperty(PROP_UNIQUE_KEY, key);
         file.setProperty(PROP_BLOB_SIZE, definition.getBlobSize());
+        if (blobDeletionCallback.isMarkingForActiveDeletionUnsafe()) {
+            file.setProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION, true);
+            LOG.debug("Setting {} under {} as unsafe for active deletion", name, indexName);
+        }
 
         fileNames.add(name);
         markDirty();

Added: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java?rev=1818521&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java (added)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java Sun Dec 17 23:12:10 2017
@@ -0,0 +1,585 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.jackrabbit.oak.InitialContent;
+import org.apache.jackrabbit.oak.Oak;
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.ContentRepository;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.api.Root;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMKBuilderProvider;
+import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
+import org.apache.jackrabbit.oak.plugins.document.memory.MemoryDocumentStore;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfo;
+import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
+import org.apache.jackrabbit.oak.plugins.index.IndexPathService;
+import org.apache.jackrabbit.oak.plugins.index.IndexPathServiceImpl;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollector;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.IndexConsistencyChecker;
+import org.apache.jackrabbit.oak.plugins.index.nodetype.NodeTypeIndexProvider;
+import org.apache.jackrabbit.oak.plugins.index.property.PropertyIndexEditorProvider;
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.spi.blob.MemoryBlobStore;
+import org.apache.jackrabbit.oak.spi.commit.CommitHook;
+import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
+import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
+import org.apache.jackrabbit.oak.spi.mount.Mounts;
+import org.apache.jackrabbit.oak.spi.security.OpenSecurityProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+import java.io.File;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
+import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_RUNNING;
+import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.TYPE_PROPERTY_NAME;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.TYPE_LUCENE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.PROP_UNSAFE_FOR_ACTIVE_DELETION;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ActiveDeletedBlobCollectorMBeanImplTest {
+
+    @Rule
+    public final TemporaryFolder temporaryFolder = new TemporaryFolder(new File("target"));
+
+    @Rule
+    public final OsgiContext context = new OsgiContext();
+
+    @Rule
+    public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
+
+    private Whiteboard wb;
+
+    private NodeStore nodeStore;
+
+    private List<String> indexPaths = Lists.newArrayList();
+
+    private final Clock clock = new Clock.Virtual();
+
+    @Before
+    public void setUp() {
+        wb = new OsgiWhiteboard(context.bundleContext());
+        nodeStore = new MemoryNodeStore();
+    }
+
+    @After
+    public void after() {
+        indexPaths.clear();
+        ActiveDeletedBlobCollectorFactory.NOOP.flagActiveDeletionUnsafe(false);
+    }
+
+    @Test
+    public void onlyRunningIndexesRequireToBeWaitedOn() {
+        IndexPathService indexPathService = MockRegistrar.getIndexPathsService(indexPaths);
+
+        final StatusSupplier statusSupplier = new StatusSupplier();
+
+        final AtomicLong returnExecCount = new AtomicLong(2L);
+
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", statusSupplier, returnExecCount::get)
+        ));
+
+        ActiveDeletedBlobCollectorMBean bean = getTestBean(indexPathService, asyncIndexInfoService);
+
+        long start = clock.getTime();
+        bean.flagActiveDeletionUnsafeForCurrentState();
+        long elapsed = clock.getTime() - start;
+        assertTrue("Non running index lane was polled for " + TimeUnit.MILLISECONDS.toSeconds(elapsed) + " seconds.",
+                elapsed < TimeUnit.SECONDS.toMillis(5));
+
+        // running index with stalled exec count waits for 2 minutes
+        statusSupplier.status = STATUS_RUNNING;
+        start = clock.getTime();
+        bean.flagActiveDeletionUnsafeForCurrentState();
+        elapsed = clock.getTime() - start;
+        assertTrue("Running index lane without changing execCnt was polled for " + TimeUnit.MILLISECONDS.toSeconds(elapsed) + " seconds.",
+                elapsed > TimeUnit.SECONDS.toMillis(120) && elapsed < TimeUnit.SECONDS.toMillis(125));
+
+        // running index with not stalled exec count doesn't wait
+        statusSupplier.status = STATUS_RUNNING;
+        asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", statusSupplier, returnExecCount::incrementAndGet)
+        ));
+        bean = getTestBean(indexPathService, asyncIndexInfoService);
+
+        start = clock.getTime();
+        bean.flagActiveDeletionUnsafeForCurrentState();
+        elapsed = clock.getTime() - start;
+        assertTrue("Running index lane without changing execCnt was polled for " + TimeUnit.MILLISECONDS.toSeconds(elapsed) + " seconds.",
+                elapsed < TimeUnit.SECONDS.toMillis(5));
+    }
+
+    @Test
+    public void headIndexFilesGetMarkedUnsafe() throws Exception {
+        String indexPath = "/fooIndex";
+        createFakeIndex(indexPath);
+
+        IndexPathService indexPathService = MockRegistrar.getIndexPathsService(indexPaths);
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> STATUS_DONE, () -> 2L)
+        ));
+
+        ActiveDeletedBlobCollectorMBean bean = getTestBean(indexPathService, asyncIndexInfoService);
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+
+        NodeState indexFile = getFakeIndexFile(indexPath);
+
+        assertTrue(indexFile.getBoolean(PROP_UNSAFE_FOR_ACTIVE_DELETION));
+    }
+
+    @Test
+    public void pauseResumeSetsInMemFlag() {
+        IndexPathService indexPathService = MockRegistrar.getIndexPathsService(indexPaths);
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> STATUS_DONE, () -> 2L)
+        ));
+
+        ActiveDeletedBlobCollectorMBean bean = getTestBean(indexPathService, asyncIndexInfoService);
+
+        assertFalse("Bean should delegate the call correctly",
+                bean.isActiveDeletionUnsafe());
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+
+        assertTrue("Active deleted blob collector isn't notified to stop marking",
+                ActiveDeletedBlobCollectorFactory.NOOP.isActiveDeletionUnsafe());
+
+        assertTrue("Bean should delegate the call correctly",
+                bean.isActiveDeletionUnsafe());
+
+        bean.flagActiveDeletionSafe();
+
+        assertFalse("Active deleted blob collector isn't notified to resume marking",
+                ActiveDeletedBlobCollectorFactory.NOOP.isActiveDeletionUnsafe());
+
+        assertFalse("Bean should delegate the call correctly",
+                bean.isActiveDeletionUnsafe());
+    }
+
+    @Test
+    public void timedOutWhileWaitingForIndexerShouldAutoResume() {
+        IndexPathService indexPathService = MockRegistrar.getIndexPathsService(indexPaths);
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> STATUS_RUNNING, () -> 2L)
+        ));
+
+        ActiveDeletedBlobCollectorMBean bean = getTestBean(indexPathService, asyncIndexInfoService);
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+
+        assertFalse("Timing out on running indexer didn't resume marking blobs",
+                bean.isActiveDeletionUnsafe());
+    }
+
+    @Test
+    public void failureToFlagAllIndexFilesShouldAutoResume() {
+        IndexPathService indexPathService = MockRegistrar.getIndexPathsService(indexPaths);
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> STATUS_DONE, () -> 2L)
+        ));
+
+        NodeStore failingNodeStore = new MemoryNodeStore() {
+            @Nonnull
+            @Override
+            public synchronized NodeState merge(@Nonnull NodeBuilder builder, @Nonnull CommitHook commitHook,
+                                                @Nonnull CommitInfo info) throws CommitFailedException {
+                throw new CommitFailedException("TestFail", 1, "We must never merge");
+            }
+        };
+
+        ActiveDeletedBlobCollectorMBeanImpl bean =
+                new ActiveDeletedBlobCollectorMBeanImpl(ActiveDeletedBlobCollectorFactory.NOOP, wb, failingNodeStore,
+                        indexPathService, asyncIndexInfoService,
+                        new MemoryBlobStore(), sameThreadExecutor());
+        bean.clock = clock;
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+
+        assertFalse("Failure to update head index files didn't resume marking blobs",
+                bean.isActiveDeletionUnsafe());
+    }
+
+    @Test
+    public void orderOfFlaggingWaitForIndexersAndUpdateIndexFiles() {
+        final AtomicBoolean isPaused = new AtomicBoolean();
+        final AtomicBoolean hadWaitedForIndex = new AtomicBoolean();
+
+        IndexPathService indexPathService = mock(IndexPathService.class);
+        when(indexPathService.getIndexPaths()).then(mockObj -> {
+            assertTrue("Must wait for indexers before going to update index files", hadWaitedForIndex.get());
+            return indexPaths;
+        });
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> {
+                    assertTrue("Must pause before waiting for indexers", isPaused.get());
+                    hadWaitedForIndex.set(true);
+                    return STATUS_DONE;
+                }, () -> 2L)
+        ));
+
+        ActiveDeletedBlobCollectorMBeanImpl bean =
+                new ActiveDeletedBlobCollectorMBeanImpl(new PauseNotifyingActiveDeletedBlobCollector(() -> {
+                    isPaused.set(true);
+                    return null;
+                }), wb, nodeStore,
+                        indexPathService, asyncIndexInfoService,
+                        new MemoryBlobStore(), sameThreadExecutor());
+        bean.clock = clock;
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+    }
+
+    @Test
+    public void clonedNSWithSharedDS() throws Exception {
+        MemoryBlobStore bs = new MemoryBlobStore();
+        bs.setBlockSizeMin(48);
+
+        MemoryDocumentStore mds1 = new MemoryDocumentStore();
+
+        DocumentNodeStore dns1 = builderProvider.newBuilder()
+                .setDocumentStore(mds1).setBlobStore(bs).build();
+
+        // Create initial repo with InitialContent. It has enough data to create blobs
+        LuceneIndexEditorProvider editorProvider = new LuceneIndexEditorProvider();
+        ContentRepository repository = new Oak(dns1)
+                .with(new InitialContent())
+                .with(new OpenSecurityProvider())
+                .with(editorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .createContentRepository();
+        ContentSession session = repository.login(null, null);
+        Root root = session.getLatestRoot();
+        TestUtil.createFulltextIndex(root.getTree("/"), "testIndex");
+        root.commit();
+
+        // pause active deletion
+        IndexPathService indexPathService = new IndexPathServiceImpl(dns1);
+        AsyncIndexInfoService asyncIndexInfoService = MockRegistrar.getAsyncIndexInfoService(newArrayList(
+                new IndexMBeanInfoSupplier("foo-async", () -> STATUS_DONE, () -> 2L)
+        ));
+        ActiveDeletedBlobCollectorMBeanImpl bean =
+                new ActiveDeletedBlobCollectorMBeanImpl(ActiveDeletedBlobCollectorFactory.NOOP, wb, dns1,
+                        indexPathService, asyncIndexInfoService,
+                        new MemoryBlobStore(), sameThreadExecutor());
+        bean.clock = clock;
+
+        bean.flagActiveDeletionUnsafeForCurrentState();
+
+        // we try here to create some churn and we want some files to get created at dns1
+        // BUT get deleted at dns2. "segments_1" is one such file.
+        // since our "creation" of churn is assumed, we should assert that dns1 has "segments_1"
+        // (and later dns2 doesn't have it)
+        root = session.getLatestRoot();
+        assertTrue("First pass indexing should generate segments_1",
+                root.getTree("/oak:index/testIndex/:data/segments_1").exists());
+
+        // shutdown first instance
+        dns1.dispose();
+
+        // clone
+        MemoryDocumentStore mds2 = mds1.copy();
+        DocumentNodeStore dns2 = builderProvider.newBuilder().setDocumentStore(mds2).setBlobStore(bs).build();
+
+        // create some churn to delete some index files - using clone store
+        // we'd setup lucene editor with active deletion collector
+
+        DeletedFileTrackingADBC deletedFileTrackingADBC = new DeletedFileTrackingADBC(
+                new File(temporaryFolder.getRoot(), "adbc-workdir"));
+        editorProvider = new LuceneIndexEditorProvider(null, null,
+                new ExtractedTextCache(0, 0),
+                null, Mounts.defaultMountInfoProvider(),
+                deletedFileTrackingADBC);
+        repository = new Oak(dns2)
+                .with(new OpenSecurityProvider())
+                .with(editorProvider)
+                .with(new PropertyIndexEditorProvider())
+                .with(new NodeTypeIndexProvider())
+                .createContentRepository();
+        session = repository.login(null, null);
+        root = session.getLatestRoot();
+        Tree rootTree = root.getTree("/");
+        for (int i = 0; i < 20; i++) {
+            Tree child = rootTree.addChild("a" + i);
+            for (int j = 0; j < 20; j++) {
+                child.setProperty("foo" + j, "bar" + j);
+            }
+        }
+
+        root.commit();
+        //since our index is not async, we are unable to track commit progress automatically.
+        // OR, iow, we need to play the rold of AsyncIndexUpdate explicitly
+        deletedFileTrackingADBC.blobDeletionCallback.commitProgress(COMMIT_SUCCEDED);
+
+        deletedFileTrackingADBC.purgeBlobsDeleted(Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(1), bs);
+
+        root = session.getLatestRoot();
+        assertFalse("Churn created via dns2 should delete segments_1",
+                root.getTree("/oak:index/testIndex/:data/segments_1").exists());
+
+        dns2.dispose();
+
+        // validate index using dns1 which should still have valid index data even
+        // after dns2's churn
+        dns1 = builderProvider.newBuilder().setDocumentStore(mds1).setBlobStore(bs).build();
+
+        IndexConsistencyChecker checker = new IndexConsistencyChecker(dns1.getRoot(), "/oak:index/testIndex",
+                new File(temporaryFolder.getRoot(), "checker-workdir"));
+        IndexConsistencyChecker.Result result = checker.check(IndexConsistencyChecker.Level.BLOBS_ONLY);
+        assertFalse("Nodestore1 can't read blobs: " + result.missingBlobIds + " while reading index",
+                result.missingBlobs);
+    }
+
+    private class IndexMBeanInfoSupplier {
+        private final String name;
+        private final Supplier<String> statusSupplier;
+        private final Supplier<Long> execCntSupplier;
+
+        IndexMBeanInfoSupplier(String name, Supplier<String> statusSupplier, Supplier<Long> execCntSupplier) {
+            this.name = name;
+            this.statusSupplier = statusSupplier;
+            this.execCntSupplier = execCntSupplier;
+        }
+
+        String getName() {
+            return name;
+        }
+
+        String getStatus() {
+            return statusSupplier.get();
+        }
+
+        long getExecCnt() {
+            return execCntSupplier.get();
+        }
+    }
+
+    private static class MockRegistrar {
+        static AsyncIndexInfoService getAsyncIndexInfoService(List<IndexMBeanInfoSupplier> infoSuppliers) {
+
+            AsyncIndexInfoService service = mock(AsyncIndexInfoService.class);
+
+            List<String> asyncLanes = Lists.newArrayList();
+
+            for (IndexMBeanInfoSupplier info : infoSuppliers) {
+                String lane = info.getName();
+
+                IndexStatsMBean bean = mock(IndexStatsMBean.class);
+                when(bean.getName()).thenReturn(lane);
+                when(bean.getStatus()).then(mockObj -> info.getStatus());
+                when(bean.getTotalExecutionCount()).then(mockObj -> info.getExecCnt());
+
+                when(service.getInfo(lane)).then(mockObj -> new AsyncIndexInfo(
+                        lane,
+                        1324L,
+                        4567L,
+                        STATUS_RUNNING.equals(info.getStatus()),
+                        bean
+                ));
+
+                asyncLanes.add(lane);
+            }
+            when(service.getAsyncLanes()).thenReturn(asyncLanes);
+
+            return service;
+        }
+
+        static IndexPathService getIndexPathsService(List<String> indexPaths) {
+            IndexPathService service = mock(IndexPathService.class);
+            when(service.getIndexPaths()).thenReturn(indexPaths);
+            return service;
+        }
+    }
+
+    private static class StatusSupplier implements Supplier<String> {
+        String status = STATUS_DONE;
+
+        @Override
+        public String get() {
+            return status;
+        }
+    }
+
+    private static class PauseNotifyingActiveDeletedBlobCollector
+            implements ActiveDeletedBlobCollector {
+        private final Callable callback;
+
+        PauseNotifyingActiveDeletedBlobCollector (@Nonnull Callable callback) {
+            this.callback = callback;
+        }
+        @Override
+        public BlobDeletionCallback getBlobDeletionCallback() {
+            return ActiveDeletedBlobCollectorFactory.NOOP.getBlobDeletionCallback();
+        }
+
+        @Override
+        public void purgeBlobsDeleted(long before, GarbageCollectableBlobStore blobStore) {
+            ActiveDeletedBlobCollectorFactory.NOOP.purgeBlobsDeleted(before, blobStore);
+        }
+
+        @Override
+        public void cancelBlobCollection() {
+            ActiveDeletedBlobCollectorFactory.NOOP.cancelBlobCollection();
+        }
+
+        @Override
+        public void flagActiveDeletionUnsafe(boolean toFlag) {
+            try {
+                callback.call();
+            } catch (Exception e) {
+                // ignored
+            }
+            ActiveDeletedBlobCollectorFactory.NOOP.flagActiveDeletionUnsafe(toFlag);
+        }
+
+        @Override
+        public boolean isActiveDeletionUnsafe() {
+            return ActiveDeletedBlobCollectorFactory.NOOP.isActiveDeletionUnsafe();
+        }
+    }
+
+    private static class DeletedFileTrackingADBC implements ActiveDeletedBlobCollector {
+        final List<String> deletedFiles = newArrayList();
+        BlobDeletionCallback blobDeletionCallback = null;
+
+        private final ActiveDeletedBlobCollector delegate;
+
+        DeletedFileTrackingADBC(File tempFolder) {
+            delegate = ActiveDeletedBlobCollectorFactory.newInstance(tempFolder, sameThreadExecutor());
+        }
+
+        @Override
+        public BlobDeletionCallback getBlobDeletionCallback() {
+            final BlobDeletionCallback deletionCallback = delegate.getBlobDeletionCallback();
+            blobDeletionCallback = new BlobDeletionCallback() {
+                @Override
+                public void deleted(String blobId, Iterable<String> ids) {
+                    deletedFiles.add(Iterables.getLast(ids));
+                    deletionCallback.deleted(blobId, ids);
+                }
+
+                @Override
+                public boolean isMarkingForActiveDeletionUnsafe() {
+                    return deletionCallback.isMarkingForActiveDeletionUnsafe();
+                }
+
+                @Override
+                public void commitProgress(IndexProgress indexProgress) {
+                    deletionCallback.commitProgress(indexProgress);
+                }
+            };
+            return blobDeletionCallback;
+        }
+
+        @Override
+        public void purgeBlobsDeleted(long before, GarbageCollectableBlobStore blobStore) {
+            delegate.purgeBlobsDeleted(before, blobStore);
+        }
+
+        @Override
+        public void cancelBlobCollection() {
+            delegate.cancelBlobCollection();
+        }
+
+        @Override
+        public void flagActiveDeletionUnsafe(boolean toFlag) {
+            delegate.flagActiveDeletionUnsafe(toFlag);
+        }
+
+        @Override
+        public boolean isActiveDeletionUnsafe() {
+            return delegate.isActiveDeletionUnsafe();
+        }
+    }
+
+    private void createFakeIndex(String indexPath) throws CommitFailedException {
+        String indexFileName = "fakeIndexFile";
+
+        // create index in node store
+        NodeBuilder rootBuilder = nodeStore.getRoot().builder();
+        NodeBuilder nodeBuilder = rootBuilder;
+        for (String elem : PathUtils.elements(indexPath)) {
+            nodeBuilder = nodeBuilder.child(elem);
+        }
+        nodeBuilder.setProperty(TYPE_PROPERTY_NAME, TYPE_LUCENE);
+
+        nodeBuilder = nodeBuilder.child(":data");
+        nodeBuilder.child(indexFileName);
+
+        nodeStore.merge(rootBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
+
+        indexPaths.add(indexPath);
+    }
+
+    private NodeState getFakeIndexFile(String indexPath) {
+        String indexFileName = "fakeIndexFile";
+
+        NodeState state = nodeStore.getRoot();
+        for (String elem : PathUtils.elements(indexPath)) {
+            state = state.getChildNode(elem);
+        }
+        state = state.getChildNode(":data").getChildNode(indexFileName);
+        return state;
+    }
+
+    private ActiveDeletedBlobCollectorMBean getTestBean(IndexPathService indexPathService, AsyncIndexInfoService asyncIndexInfoService) {
+        ActiveDeletedBlobCollectorMBeanImpl bean =
+                new ActiveDeletedBlobCollectorMBeanImpl(ActiveDeletedBlobCollectorFactory.NOOP, wb, nodeStore,
+                        indexPathService, asyncIndexInfoService,
+                        new MemoryBlobStore(), sameThreadExecutor());
+        bean.clock = clock;
+
+        return bean;
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/ActiveDeletedBlobCollectorMBeanImplTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java Sun Dec 17 23:12:10 2017
@@ -464,6 +464,37 @@ public class ActiveDeletedBlobCollectorT
         warnLogCustomizer.finished();
     }
 
+    // OAK-6950
+    @Test
+    public void pauseMarkingDeletedBlobs() {
+        BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+        assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
+
+        adbc.flagActiveDeletionUnsafe(true);
+        bdc = adbc.getBlobDeletionCallback();
+        assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
+
+        adbc.flagActiveDeletionUnsafe(false);
+        bdc = adbc.getBlobDeletionCallback();
+        assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
+    }
+
+    // OAK-6950
+    @Test
+    public void pauseMarkingDeletedBlobsNOOP() {
+        adbc = ActiveDeletedBlobCollectorFactory.NOOP;
+        BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+        assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
+
+        adbc.flagActiveDeletionUnsafe(true);
+        bdc = adbc.getBlobDeletionCallback();
+        assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
+
+        adbc.flagActiveDeletionUnsafe(false);
+        bdc = adbc.getBlobDeletionCallback();
+        assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
+    }
+
     private void verifyBlobsDeleted(String ... blobIds) throws IOException {
         List<String> chunkIds = new ArrayList<>();
         for (String blobId : blobIds) {

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectoryTestBase.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectoryTestBase.java?rev=1818521&r1=1818520&r2=1818521&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectoryTestBase.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/OakDirectoryTestBase.java Sun Dec 17 23:12:10 2017
@@ -27,10 +27,13 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.PROP_BLOB_SIZE;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.PROP_UNIQUE_KEY;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.PROP_UNSAFE_FOR_ACTIVE_DELETION;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.directory.OakDirectory.UNIQUE_KEY_SIZE;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -48,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.NullInputStream;
 import org.apache.jackrabbit.oak.api.Blob;
@@ -505,6 +509,11 @@ abstract public class OakDirectoryTestBa
                         @Override
                         public void commitProgress(IndexProgress indexProgress) {
                         }
+
+                        @Override
+                        public boolean isMarkingForActiveDeletionUnsafe() {
+                            return false;
+                        }
                     })
                 .build();
 
@@ -519,6 +528,116 @@ abstract public class OakDirectoryTestBa
         dir.close();
     }
 
+    // OAK-6950
+    @Test
+    public void blobsCreatedWhenActiveDeletionIsUnsafe() throws Exception {
+        final int fileSize = 1;
+
+        IndexDefinition def = new IndexDefinition(root, builder.getNodeState(), "/foo");
+        BlobFactory factory = in -> {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            IOUtils.copy(in, out);
+            byte[] data = out.toByteArray();
+            return new ArrayBasedBlob(data);
+        };
+
+        final AtomicBoolean markingForceActiveDeletionUnsafe = new AtomicBoolean();
+        OakDirectory dir = getOakDirectoryBuilder(builder, def).setReadOnly(false)
+                .with(factory).
+                        with(
+                                new ActiveDeletedBlobCollectorFactory.BlobDeletionCallback() {
+                                    @Override
+                                    public void deleted(String blobId, Iterable<String> ids) {
+                                    }
+
+                                    @Override
+                                    public void commitProgress(IndexProgress indexProgress) {
+                                    }
+
+                                    @Override
+                                    public boolean isMarkingForActiveDeletionUnsafe() {
+                                        return markingForceActiveDeletionUnsafe.get();
+                                    }
+                                })
+                .build();
+
+        // file1 created before marking was flagged as unsafe
+        writeFile(dir, "file1", fileSize);
+
+        markingForceActiveDeletionUnsafe.set(true);
+
+        // file2 created after marking was flagged as unsafe
+        writeFile(dir, "file2", fileSize);
+        dir.close();
+
+        NodeBuilder dataBuilder = builder.getChildNode(INDEX_DATA_CHILD_NAME);
+
+        assertNull("file1 must not get flagged to be unsafe to be actively deleted",
+                dataBuilder.getChildNode("file1").getProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION));
+        assertTrue("file2 must get flagged to be unsafe to be actively deleted",
+                dataBuilder.getChildNode("file2").getProperty(PROP_UNSAFE_FOR_ACTIVE_DELETION).getValue(Type.BOOLEAN));
+    }
+
+    // OAK-6950
+    @Test
+    public void dontReportFilesMarkedUnsafeForActiveDeletion() throws Exception {
+        AtomicInteger blobIdSuffix = new AtomicInteger();
+        IndexDefinition def = new IndexDefinition(root, builder.getNodeState(), "/foo");
+        BlobFactory factory = in -> {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            IOUtils.copy(in, out);
+            byte[] data = out.toByteArray();
+            return new ArrayBasedBlob(data) {
+                @Override
+                public String getContentIdentity() {
+                    return Long.toString(length() - UNIQUE_KEY_SIZE) + "-id-" + blobIdSuffix.get();
+                }
+            };
+        };
+
+        final AtomicBoolean markingForceActiveDeletionUnsafe = new AtomicBoolean();
+        final Set<String> deletedBlobs = Sets.newHashSet();
+        OakDirectory dir = getOakDirectoryBuilder(builder, def).setReadOnly(false)
+            .with(factory).
+                    with(
+                        new ActiveDeletedBlobCollectorFactory.BlobDeletionCallback() {
+                            @Override
+                            public void deleted(String blobId, Iterable<String> ids) {
+                                deletedBlobs.add(blobId);
+                            }
+
+                            @Override
+                            public void commitProgress(IndexProgress indexProgress) {
+                            }
+
+                            @Override
+                            public boolean isMarkingForActiveDeletionUnsafe() {
+                                return markingForceActiveDeletionUnsafe.get();
+                            }
+                        })
+                .build();
+
+        // file1 created before marking was flagged as unsafe
+        blobIdSuffix.set(1);
+        writeFile(dir, "file1", fileSize);
+
+        markingForceActiveDeletionUnsafe.set(true);
+
+        // file2 created after marking was flagged as unsafe
+        blobIdSuffix.set(1);
+        writeFile(dir, "file2", fileSize);
+
+        dir.deleteFile("file1");
+        dir.deleteFile("file2");
+
+        dir.close();
+
+        deletedBlobs.forEach(deletedBlob -> {
+            assertTrue("Deleted blob id " + deletedBlob + " must belong to file1",
+                    deletedBlob.endsWith("-id-1"));
+        });
+    }
+
     @Test
     public void blobFactory() throws Exception {
         final AtomicInteger numBlobs = new AtomicInteger();