Posted to oak-commits@jackrabbit.apache.org by re...@apache.org on 2018/04/26 12:13:22 UTC
svn commit: r1830204 - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/
Author: reschke
Date: Thu Apr 26 12:13:22 2018
New Revision: 1830204
URL: http://svn.apache.org/viewvc?rev=1830204&view=rev
Log:
OAK-7454: oak-lucene: fix broken line ends in repo
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/FulltextQueryTermsProvider.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/IndexFieldProvider.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexAugmentorFactoryTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/FulltextQueryTermsProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/FulltextQueryTermsProvider.java?rev=1830204&r1=1830203&r2=1830204&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/FulltextQueryTermsProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/FulltextQueryTermsProvider.java Thu Apr 26 12:13:22 2018
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index.lucene.spi;
-
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.search.Query;
-
-import javax.annotation.CheckForNull;
-import javax.annotation.Nonnull;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * Implementations of this interface get callbacks while Lucene full-text queries are being formed.
- */
-public interface FulltextQueryTermsProvider {
- /**
- * Implementation which doesn't do anything useful, yet abides by the contract.
- */
- FulltextQueryTermsProvider DEFAULT = new FulltextQueryTermsProvider() {
- @Override
- public Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition) {
- return null;
- }
-
- @Override
- public Set<String> getSupportedTypes() {
- return Collections.EMPTY_SET;
- }
- };
- /**
- * This method gets called while forming the full-text clause for a full-text constraint that is not
- * bound to a particular field.
- * @param text full text term
- * @param analyzer {@link Analyzer} being used while forming the query. Can be used to analyze text consistently.
- * @param indexDefinition {@link NodeState} of index definition
- * @return {@link Query} object to be OR'ed with query being prepared. {@code null}, if nothing is to be added.
- */
- @CheckForNull
- Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition);
-
- /**
- * This method is used to find which node types are supported by the implementation. Based on the index
- * definition being used to query the document, only those implementations which declare a matching node
- * type get a callback to {@link FulltextQueryTermsProvider#getQueryTerm}. Note that node types are
- * matched exactly and do not support inheritance.
- * @return {@link Set} of types supported by the implementation
- */
- @Nonnull
- Set<String> getSupportedTypes();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.spi;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.search.Query;
+
+import javax.annotation.CheckForNull;
+import javax.annotation.Nonnull;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Implementations of this interface get callbacks while Lucene full-text queries are being formed.
+ */
+public interface FulltextQueryTermsProvider {
+ /**
+ * Implementation which doesn't do anything useful, yet abides by the contract.
+ */
+ FulltextQueryTermsProvider DEFAULT = new FulltextQueryTermsProvider() {
+ @Override
+ public Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition) {
+ return null;
+ }
+
+ @Override
+ public Set<String> getSupportedTypes() {
+ return Collections.EMPTY_SET;
+ }
+ };
+ /**
+ * This method gets called while forming the full-text clause for a full-text constraint that is not
+ * bound to a particular field.
+ * @param text full text term
+ * @param analyzer {@link Analyzer} being used while forming the query. Can be used to analyze text consistently.
+ * @param indexDefinition {@link NodeState} of index definition
+ * @return {@link Query} object to be OR'ed with query being prepared. {@code null}, if nothing is to be added.
+ */
+ @CheckForNull
+ Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition);
+
+ /**
+ * This method is used to find which node types are supported by the implementation. Based on the index
+ * definition being used to query the document, only those implementations which declare a matching node
+ * type get a callback to {@link FulltextQueryTermsProvider#getQueryTerm}. Note that node types are
+ * matched exactly and do not support inheritance.
+ * @return {@link Set} of types supported by the implementation
+ */
+ @Nonnull
+ Set<String> getSupportedTypes();
+}
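
For reference, the SPI above is small enough that a complete provider fits in a few lines.
The sketch below is not part of this commit: the node type "myapp:product" and the "catalog"
field name are hypothetical, and returning a TermQuery is just one way to contribute a clause
that gets OR'ed into the query being prepared.

import org.apache.jackrabbit.oak.plugins.index.lucene.spi.FulltextQueryTermsProvider;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.Set;

public class CatalogQueryTermsProvider implements FulltextQueryTermsProvider {
    @CheckForNull
    @Override
    public Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition) {
        if (text == null || text.isEmpty()) {
            // Returning null tells the caller to add nothing for this provider.
            return null;
        }
        // Contribute an extra term against a custom field; the caller OR's it
        // with the full-text query being prepared.
        return new TermQuery(new Term("catalog", text));
    }

    @Nonnull
    @Override
    public Set<String> getSupportedTypes() {
        // Types are matched exactly; no node type inheritance is applied.
        return Collections.singleton("myapp:product");
    }
}
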
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/IndexFieldProvider.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/IndexFieldProvider.java?rev=1830204&r1=1830203&r2=1830204&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/IndexFieldProvider.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/spi/IndexFieldProvider.java Thu Apr 26 12:13:22 2018
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index.lucene.spi;
-
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.lucene.document.Field;
-
-import javax.annotation.Nonnull;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * Implementations of this interface get callbacks while documents are indexed. It's the responsibility
- * of the implementation to exit as early as possible if it doesn't care about the document being indexed.
- */
-public interface IndexFieldProvider {
- /**
- * Implementation which doesn't do anything useful, yet abides by the contract.
- */
- IndexFieldProvider DEFAULT = new IndexFieldProvider() {
- @Override
- public Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition) {
- return Collections.EMPTY_LIST;
- }
-
- @Override
- public Set<String> getSupportedTypes() {
- return Collections.EMPTY_SET;
- }
- };
-
- /**
- * This method gets called while indexing a document.
- *
- * @param path path of the document being indexed
- * @param document {@link NodeState} of the document being indexed
- * @param indexDefinition {@link NodeState} of index definition
- * @return {@link Iterable} of fields that are to be added to {@link org.apache.lucene.document.Document} being prepared
- */
- @Nonnull
- Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition);
-
- /**
- * This method is used to find which node types are supported by the implementation. Based on the index
- * definition being used to index the document, only those implementations which declare a matching node
- * type get a callback to {@link IndexFieldProvider#getAugmentedFields}. Note that node types are
- * matched exactly and do not support inheritance.
- * @return {@link Set} of types supported by the implementation
- */
- @Nonnull
- Set<String> getSupportedTypes();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.spi;
+
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.lucene.document.Field;
+
+import javax.annotation.Nonnull;
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Implementations of this interface get callbacks while documents are indexed. It's the responsibility
+ * of the implementation to exit as early as possible if it doesn't care about the document being indexed.
+ */
+public interface IndexFieldProvider {
+ /**
+ * Implementation which doesn't do anything useful, yet abides by the contract.
+ */
+ IndexFieldProvider DEFAULT = new IndexFieldProvider() {
+ @Override
+ public Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition) {
+ return Collections.EMPTY_LIST;
+ }
+
+ @Override
+ public Set<String> getSupportedTypes() {
+ return Collections.EMPTY_SET;
+ }
+ };
+
+ /**
+ * This method gets called while indexing a document.
+ *
+ * @param path path of the document being indexed
+ * @param document {@link NodeState} of the document being indexed
+ * @param indexDefinition {@link NodeState} of index definition
+ * @return {@link Iterable} of fields that are to be added to {@link org.apache.lucene.document.Document} being prepared
+ */
+ @Nonnull
+ Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition);
+
+ /**
+ * This method is used to find which node types are supported by the implementation. Based on the index
+ * definition being used to index the document, only those implementations which declare a matching node
+ * type get a callback to {@link IndexFieldProvider#getAugmentedFields}. Note that node types are
+ * matched exactly and do not support inheritance.
+ * @return {@link Set} of types supported by the implementation
+ */
+ @Nonnull
+ Set<String> getSupportedTypes();
+}
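
Similarly, a minimal IndexFieldProvider can add one extra field per document and bail out
early otherwise, as the interface javadoc asks. Again a sketch under assumed names, not part
of this commit: the "color" property and the "myapp:asset" node type are made up.

import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.plugins.index.lucene.spi.IndexFieldProvider;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;

import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.Set;

public class ColorFieldProvider implements IndexFieldProvider {
    @Nonnull
    @Override
    public Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition) {
        // Exit as early as possible when the document is of no interest.
        if (!document.hasProperty("color")) {
            return Collections.emptyList();
        }
        String color = document.getProperty("color").getValue(Type.STRING);
        // The returned fields are added to the Lucene Document being prepared.
        return Collections.<Field>singletonList(new StringField("color", color, Field.Store.NO));
    }

    @Nonnull
    @Override
    public Set<String> getSupportedTypes() {
        return Collections.singleton("myapp:asset");
    }
}
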
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexAugmentorFactoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexAugmentorFactoryTest.java?rev=1830204&r1=1830203&r2=1830204&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexAugmentorFactoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexAugmentorFactoryTest.java Thu Apr 26 12:13:22 2018
@@ -1,214 +1,214 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.jackrabbit.oak.plugins.index.lucene;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.jackrabbit.oak.plugins.index.lucene.spi.FulltextQueryTermsProvider;
-import org.apache.jackrabbit.oak.plugins.index.lucene.spi.IndexFieldProvider;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.StringField;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.BooleanClause;
-import org.apache.lucene.search.BooleanQuery;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.TermQuery;
-import org.apache.sling.testing.mock.osgi.MockOsgi;
-import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
-import org.hamcrest.CoreMatchers;
-import org.junit.Rule;
-import org.junit.Test;
-
-import javax.annotation.Nonnull;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public class IndexAugmentorFactoryTest {
- private IndexAugmentorFactory indexAugmentorFactory = new IndexAugmentorFactory();
-
- @Rule
- public final OsgiContext context = new OsgiContext();
-
- @Test
- public void compositeIndexProvider()
- {
- final String typeA = "type:A";
- final String typeB = "type:B";
- final String typeC = "type:C";
- final String typeD = "type:D";
-
- context.registerInjectActivateService(indexAugmentorFactory);
-
- new IdentifiableIndexFiledProvider("1", Sets.newHashSet(typeA, typeB));
- new IdentifiableIndexFiledProvider("2", Sets.newHashSet(typeC));
- new IdentifiableIndexFiledProvider("3", Sets.newHashSet(typeA, typeB));
-
- //register an instance which would be unregistered before validation
- IndexFieldProvider unreg = new IdentifiableIndexFiledProvider("4", Sets.newHashSet(typeD));
- indexAugmentorFactory.unbindIndexFieldProvider(unreg);
-
- validateComposedFields(typeA, "1", "3");
- validateComposedFields(typeC, "2");
- validateComposedFields(typeD);
-
- MockOsgi.deactivate(indexAugmentorFactory, context.bundleContext(), Collections.EMPTY_MAP);
-
- validateDeactivatedService();
- }
-
- @Test
- public void compositeQueryTermsProvider()
- {
- final String typeA = "type:A";
- final String typeB = "type:B";
- final String typeC = "type:C";
- final String typeD = "type:D";
- final String typeE = "type:E";
-
- context.registerInjectActivateService(indexAugmentorFactory);
-
- new IdentifiableQueryTermsProvider("1", Sets.newHashSet(typeA, typeB));
- new IdentifiableQueryTermsProvider("2", Sets.newHashSet(typeC));
- new IdentifiableQueryTermsProvider("3", Sets.newHashSet(typeA, typeB));
- new IdentifiableQueryTermsProvider(null, Sets.newHashSet(typeE));
-
- //register an instance which would be unregistered before validation
- FulltextQueryTermsProvider unreg = new IdentifiableQueryTermsProvider("4", Sets.newHashSet(typeD));
- indexAugmentorFactory.unbindFulltextQueryTermsProvider(unreg);
-
- validateComposedQueryTerms(typeA, "1", "3");
- validateComposedQueryTerms(typeC, "2");
- validateComposedQueryTerms(typeD);
- validateComposedQueryTerms(typeE);
-
- MockOsgi.deactivate(indexAugmentorFactory, context.bundleContext(), Collections.EMPTY_MAP);
-
- validateDeactivatedService();
- }
-
- void validateComposedFields(String type, String ... expected) {
- IndexFieldProvider compositeIndexProvider = indexAugmentorFactory.getIndexFieldProvider(type);
-
- if (expected.length > 0) {
- assertTrue("Composed index field provider doesn't declare correct supported type",
- compositeIndexProvider.getSupportedTypes().contains(type));
- }
-
- Iterable<Field> fields = compositeIndexProvider.getAugmentedFields(null, null, null);
- Set<String> ids = Sets.newHashSet();
- for (Field f : fields) {
- ids.add(f.stringValue());
- }
-
- assertEquals(expected.length, Iterables.size(ids));
- assertThat(ids, CoreMatchers.hasItems(expected));
- }
-
- void validateComposedQueryTerms(String type, String ... expected) {
- FulltextQueryTermsProvider compositeQueryTermsProvider = indexAugmentorFactory.getFulltextQueryTermsProvider(type);
-
- if (expected.length > 0) {
- assertTrue("Composed query terms provider doesn't declare correct supported type",
- compositeQueryTermsProvider.getSupportedTypes().contains(type));
- }
-
- Query q = compositeQueryTermsProvider.getQueryTerm(null, null, null);
- if (q == null) {
- assertEquals("No query terms generated for " + type + ".", 0, expected.length);
- } else {
- Set<String> ids = Sets.newHashSet();
- if (q instanceof BooleanQuery) {
- BooleanQuery query = (BooleanQuery) q;
- List<BooleanClause> clauses = query.clauses();
- for (BooleanClause clause : clauses) {
- assertEquals(SHOULD, clause.getOccur());
-
- Query subQuery = clause.getQuery();
- String subQueryStr = subQuery.toString();
- ids.add(subQueryStr.substring(0, subQueryStr.indexOf(":1")));
- }
- } else {
- String subQueryStr = q.toString();
- ids.add(subQueryStr.substring(0, subQueryStr.indexOf(":1")));
- }
-
- assertEquals(expected.length, Iterables.size(ids));
- assertThat(ids, CoreMatchers.hasItems(expected));
- }
- }
-
- private void validateDeactivatedService() {
- assertTrue("All data structures must be empty after deactivate", indexAugmentorFactory.isStateEmpty());
- }
-
- class IdentifiableIndexFiledProvider implements IndexFieldProvider {
- private final Field id;
- private final Set<String> nodeTypes;
-
- IdentifiableIndexFiledProvider(String id, Set<String> nodeTypes) {
- this.id = new StringField("id", id, Field.Store.NO);
- this.nodeTypes = nodeTypes;
- context.registerService(IndexFieldProvider.class, this);
- }
-
- @Nonnull
- @Override
- public Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition) {
- return Lists.newArrayList(id);
- }
-
- @Nonnull
- @Override
- public Set<String> getSupportedTypes() {
- return nodeTypes;
- }
- }
-
- class IdentifiableQueryTermsProvider implements FulltextQueryTermsProvider {
- private final Query id;
- private final Set<String> nodeTypes;
-
- IdentifiableQueryTermsProvider(String id, Set<String> nodeTypes) {
- this.id = (id == null)?null:new TermQuery(new Term(id, "1"));
- this.nodeTypes = nodeTypes;
- context.registerService(FulltextQueryTermsProvider.class, this);
- }
-
- @Override
- public Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition) {
- return id;
- }
-
- @Nonnull
- @Override
- public Set<String> getSupportedTypes() {
- return nodeTypes;
- }
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.index.lucene;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.plugins.index.lucene.spi.FulltextQueryTermsProvider;
+import org.apache.jackrabbit.oak.plugins.index.lucene.spi.IndexFieldProvider;
+import org.apache.jackrabbit.oak.spi.state.NodeState;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.TermQuery;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class IndexAugmentorFactoryTest {
+ private IndexAugmentorFactory indexAugmentorFactory = new IndexAugmentorFactory();
+
+ @Rule
+ public final OsgiContext context = new OsgiContext();
+
+ @Test
+ public void compositeIndexProvider()
+ {
+ final String typeA = "type:A";
+ final String typeB = "type:B";
+ final String typeC = "type:C";
+ final String typeD = "type:D";
+
+ context.registerInjectActivateService(indexAugmentorFactory);
+
+ new IdentifiableIndexFiledProvider("1", Sets.newHashSet(typeA, typeB));
+ new IdentifiableIndexFiledProvider("2", Sets.newHashSet(typeC));
+ new IdentifiableIndexFiledProvider("3", Sets.newHashSet(typeA, typeB));
+
+ //register an instance which would be unregistered before validation
+ IndexFieldProvider unreg = new IdentifiableIndexFiledProvider("4", Sets.newHashSet(typeD));
+ indexAugmentorFactory.unbindIndexFieldProvider(unreg);
+
+ validateComposedFields(typeA, "1", "3");
+ validateComposedFields(typeC, "2");
+ validateComposedFields(typeD);
+
+ MockOsgi.deactivate(indexAugmentorFactory, context.bundleContext(), Collections.EMPTY_MAP);
+
+ validateDeactivatedService();
+ }
+
+ @Test
+ public void compositeQueryTermsProvider()
+ {
+ final String typeA = "type:A";
+ final String typeB = "type:B";
+ final String typeC = "type:C";
+ final String typeD = "type:D";
+ final String typeE = "type:E";
+
+ context.registerInjectActivateService(indexAugmentorFactory);
+
+ new IdentifiableQueryTermsProvider("1", Sets.newHashSet(typeA, typeB));
+ new IdentifiableQueryTermsProvider("2", Sets.newHashSet(typeC));
+ new IdentifiableQueryTermsProvider("3", Sets.newHashSet(typeA, typeB));
+ new IdentifiableQueryTermsProvider(null, Sets.newHashSet(typeE));
+
+ //register an instance which would be unregistered before validation
+ FulltextQueryTermsProvider unreg = new IdentifiableQueryTermsProvider("4", Sets.newHashSet(typeD));
+ indexAugmentorFactory.unbindFulltextQueryTermsProvider(unreg);
+
+ validateComposedQueryTerms(typeA, "1", "3");
+ validateComposedQueryTerms(typeC, "2");
+ validateComposedQueryTerms(typeD);
+ validateComposedQueryTerms(typeE);
+
+ MockOsgi.deactivate(indexAugmentorFactory, context.bundleContext(), Collections.EMPTY_MAP);
+
+ validateDeactivatedService();
+ }
+
+ void validateComposedFields(String type, String ... expected) {
+ IndexFieldProvider compositeIndexProvider = indexAugmentorFactory.getIndexFieldProvider(type);
+
+ if (expected.length > 0) {
+ assertTrue("Composed index field provider doesn't declare correct supported type",
+ compositeIndexProvider.getSupportedTypes().contains(type));
+ }
+
+ Iterable<Field> fields = compositeIndexProvider.getAugmentedFields(null, null, null);
+ Set<String> ids = Sets.newHashSet();
+ for (Field f : fields) {
+ ids.add(f.stringValue());
+ }
+
+ assertEquals(expected.length, Iterables.size(ids));
+ assertThat(ids, CoreMatchers.hasItems(expected));
+ }
+
+ void validateComposedQueryTerms(String type, String ... expected) {
+ FulltextQueryTermsProvider compositeQueryTermsProvider = indexAugmentorFactory.getFulltextQueryTermsProvider(type);
+
+ if (expected.length > 0) {
+ assertTrue("Composed query terms provider doesn't declare correct supported type",
+ compositeQueryTermsProvider.getSupportedTypes().contains(type));
+ }
+
+ Query q = compositeQueryTermsProvider.getQueryTerm(null, null, null);
+ if (q == null) {
+ assertEquals("No query terms generated for " + type + ".", 0, expected.length);
+ } else {
+ Set<String> ids = Sets.newHashSet();
+ if (q instanceof BooleanQuery) {
+ BooleanQuery query = (BooleanQuery) q;
+ List<BooleanClause> clauses = query.clauses();
+ for (BooleanClause clause : clauses) {
+ assertEquals(SHOULD, clause.getOccur());
+
+ Query subQuery = clause.getQuery();
+ String subQueryStr = subQuery.toString();
+ ids.add(subQueryStr.substring(0, subQueryStr.indexOf(":1")));
+ }
+ } else {
+ String subQueryStr = q.toString();
+ ids.add(subQueryStr.substring(0, subQueryStr.indexOf(":1")));
+ }
+
+ assertEquals(expected.length, Iterables.size(ids));
+ assertThat(ids, CoreMatchers.hasItems(expected));
+ }
+ }
+
+ private void validateDeactivatedService() {
+ assertTrue("All data structures must be empty after deactivate", indexAugmentorFactory.isStateEmpty());
+ }
+
+ class IdentifiableIndexFiledProvider implements IndexFieldProvider {
+ private final Field id;
+ private final Set<String> nodeTypes;
+
+ IdentifiableIndexFiledProvider(String id, Set<String> nodeTypes) {
+ this.id = new StringField("id", id, Field.Store.NO);
+ this.nodeTypes = nodeTypes;
+ context.registerService(IndexFieldProvider.class, this);
+ }
+
+ @Nonnull
+ @Override
+ public Iterable<Field> getAugmentedFields(String path, NodeState document, NodeState indexDefinition) {
+ return Lists.newArrayList(id);
+ }
+
+ @Nonnull
+ @Override
+ public Set<String> getSupportedTypes() {
+ return nodeTypes;
+ }
+ }
+
+ class IdentifiableQueryTermsProvider implements FulltextQueryTermsProvider {
+ private final Query id;
+ private final Set<String> nodeTypes;
+
+ IdentifiableQueryTermsProvider(String id, Set<String> nodeTypes) {
+ this.id = (id == null)?null:new TermQuery(new Term(id, "1"));
+ this.nodeTypes = nodeTypes;
+ context.registerService(FulltextQueryTermsProvider.class, this);
+ }
+
+ @Override
+ public Query getQueryTerm(String text, Analyzer analyzer, NodeState indexDefinition) {
+ return id;
+ }
+
+ @Nonnull
+ @Override
+ public Set<String> getSupportedTypes() {
+ return nodeTypes;
+ }
+ }
+}
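
The wiring this test exercises can be summarized in a few lines. The sketch below reuses the
mock-OSGi setup from the test above and the hypothetical ColorFieldProvider from the earlier
sketch; it is illustrative only, not part of this commit.

import org.apache.jackrabbit.oak.plugins.index.lucene.IndexAugmentorFactory;
import org.apache.jackrabbit.oak.plugins.index.lucene.spi.IndexFieldProvider;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
import org.junit.Rule;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

public class AugmentorWiringSketch {
    @Rule
    public final OsgiContext context = new OsgiContext();

    @Test
    public void compositeFansOutToRegisteredProviders() {
        IndexAugmentorFactory factory = new IndexAugmentorFactory();
        context.registerInjectActivateService(factory);

        // ColorFieldProvider declares support for the "myapp:asset" node type.
        context.registerService(IndexFieldProvider.class, new ColorFieldProvider());

        // The factory hands back a composite that forwards to every registered
        // provider whose getSupportedTypes() contains the requested type.
        IndexFieldProvider composite = factory.getIndexFieldProvider("myapp:asset");
        assertTrue(composite.getSupportedTypes().contains("myapp:asset"));
    }
}
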
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java?rev=1830204&r1=1830203&r2=1830204&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/directory/ActiveDeletedBlobCollectorTest.java Thu Apr 26 12:13:22 2018
@@ -1,660 +1,660 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
-
-import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
-import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_FAILED;
-import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeFalse;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.attribute.PosixFilePermission;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.annotation.Nonnull;
-
-import ch.qos.logback.classic.Level;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-import org.apache.commons.io.FileUtils;
-import org.apache.jackrabbit.core.data.DataStoreException;
-import org.apache.jackrabbit.oak.commons.CIHelper;
-import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
-import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollector;
-import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollectorImpl;
-import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
-import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
-import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
-import org.apache.jackrabbit.oak.stats.Clock;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.internal.util.collections.Sets;
-
-public class ActiveDeletedBlobCollectorTest {
- @Rule
- public TemporaryFolder blobCollectionRoot = new TemporaryFolder(new File("target"));
-
- private Clock clock;
- private ChunkDeletionTrackingBlobStore blobStore;
- private ActiveDeletedBlobCollector adbc;
-
- @Before
- public void setup() throws Exception {
- clock = new Clock.Virtual();
- blobStore = new ChunkDeletionTrackingBlobStore();
- createBlobCollector();
- }
-
- private void createBlobCollector() {
- adbc = new ActiveDeletedBlobCollectorImpl(clock,
- new File(blobCollectionRoot.getRoot(), "/a"), sameThreadExecutor());
- }
-
- @Test
- public void simpleCase() throws Exception {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
-
- bdc.deleted("blobId", Collections.singleton("/a"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted("blobId");
- }
-
- @Test
- public void noopDoesNothing() throws Exception {
- adbc = ActiveDeletedBlobCollectorFactory.NOOP;
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
-
- bdc.deleted("blobId", Collections.singleton("/a"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted();
- }
-
- @Test
- public void blobTimestampMustBeBiggerThanFileTimestamp() throws Exception {
- BlobDeletionCallback bdc1 = adbc.getBlobDeletionCallback();
- bdc1.deleted("blobId1", Collections.singleton("/a"));
- bdc1.commitProgress(COMMIT_SUCCEDED);
-
- BlobDeletionCallback bdc2 = adbc.getBlobDeletionCallback();
- bdc2.deleted("blobId2", Collections.singleton("/b"));
-
- BlobDeletionCallback bdc3 = adbc.getBlobDeletionCallback();
- bdc3.deleted("blobId3", Collections.singleton("/c"));
- bdc3.commitProgress(COMMIT_SUCCEDED);
-
- long time = clock.getTimeIncreasing();
- clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
-
- bdc2.commitProgress(COMMIT_SUCCEDED);
-
- adbc.purgeBlobsDeleted(time, blobStore);
-
- //blobId2 is committed later
- verifyBlobsDeleted("blobId1", "blobId3");
- }
-
- @Test
- public void uncommittedDeletionsMustNotBePurged() throws Exception {
- BlobDeletionCallback bdc1 = adbc.getBlobDeletionCallback();
- bdc1.deleted("blobId1", Collections.singleton("/a"));
- bdc1.commitProgress(COMMIT_FAILED);
-
- BlobDeletionCallback bdc2 = adbc.getBlobDeletionCallback();
- bdc2.deleted("blobId2", Collections.singleton("/b"));
- bdc2.commitProgress(COMMIT_SUCCEDED);
-
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- //blobId2 is committed later
- verifyBlobsDeleted("blobId2");
- }
-
- @Test
- public void deleteBlobsDespiteFileExplicitlyPurgedBeforeRestart() throws Exception {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId1", Collections.singleton("/a"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
- createBlobCollector();
- bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId2", Collections.singleton("/b"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
- createBlobCollector();
- bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId3", Collections.singleton("/c"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted("blobId1", "blobId2", "blobId3");
- }
-
- @Test
- public void multiThreadedCommits() throws Exception {
- ExecutorService executorService = Executors.newFixedThreadPool(3);
- File rootDirectory = new File(blobCollectionRoot.getRoot(), "b");
- FileUtils.forceMkdir(rootDirectory);
- adbc = new ActiveDeletedBlobCollectorImpl(clock, rootDirectory, executorService);
-
- int numThreads = 4;
- int numBlobsPerThread = 500;
-
- List<Thread> threads = new ArrayList<>(numThreads);
- final AtomicInteger threadIndex = new AtomicInteger(0);
- for (; threadIndex.get() < numThreads; threadIndex.incrementAndGet()) {
- threads.add(new Thread(new Runnable() {
- private int thisThreadNum = threadIndex.get();
- @Override
- public void run() {
- int blobCnt = 0;
- while (blobCnt < numBlobsPerThread) {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- for (; blobCnt < numBlobsPerThread;) {
- String id = "Thread" + thisThreadNum + "Blob" + blobCnt;
- bdc.deleted(id, Collections.singleton(id));
- blobCnt++;
- if (Math.random() > 0.5) {
- break;
- }
- }
- bdc.commitProgress(COMMIT_SUCCEDED);
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }));
- }
-
- for (Thread t : threads) {
- t.start();
- }
-
- for (Thread t : threads) {
- t.join();
- }
-
- boolean timeout = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
- assertFalse(timeout);
-
- List<String> deletedChunks = new ArrayList<>(numThreads*numBlobsPerThread*2);
- for (int threadNum = 0; threadNum < numThreads; threadNum++) {
- for (int blobCnt = 0; blobCnt < numBlobsPerThread; blobCnt++) {
- String id = "Thread" + threadNum + "Blob" + blobCnt;
- Iterators.addAll(deletedChunks, blobStore.resolveChunks(id));
- }
- }
-
- // Blocking queue doesn't supply all the items immediately.
- // So, we'd push "MARKER*" blob ids and purge until some marker blob
- // gets purged. BUT, we'd time-out this activity in 3 seconds
- long until = Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(3);
- List<String> markerChunks = Lists.newArrayList();
- int i = 0;
- while (Clock.SIMPLE.getTime() < until) {
- // Push commit with a marker blob-id and wait for it to be purged
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- String markerBlobId = "MARKER-" + (i++);
- bdc.deleted(markerBlobId, Lists.newArrayList(markerBlobId));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- Iterators.addAll(markerChunks, blobStore.resolveChunks(markerBlobId));
- clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(5));
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- if (blobStore.markerChunkDeleted) {
- break;
- }
- }
-
- assertTrue("Timed out while waiting for marker chunk to be purged", blobStore.markerChunkDeleted);
-
- // don't care how many marker blobs are purged
- blobStore.deletedChunkIds.removeAll(markerChunks);
-
- HashSet<String> list = new HashSet<>(deletedChunks);
- list.removeAll(blobStore.deletedChunkIds);
- assertTrue("size: " + list.size() + "; list: " + list.toString(), list.isEmpty());
-
- assertThat(blobStore.deletedChunkIds, containsInAnyOrder(deletedChunks.toArray()));
- }
-
- @Test
- public void inaccessibleWorkDirGivesNoop() throws Exception {
- assumeFalse(CIHelper.windows());
-
- File rootDir = blobCollectionRoot.getRoot();
- File unwritableExistingRootFolder = new File(rootDir, "existingRoot");
- FileUtils.forceMkdir(unwritableExistingRootFolder);
- File unwritableNonExistingRootFolder = new File(unwritableExistingRootFolder, "existingRoot");
-
- Path unwritableExistingPath = FileSystems.getDefault().getPath(unwritableExistingRootFolder.getPath());
- Files.setPosixFilePermissions(unwritableExistingPath,
- Sets.newSet(PosixFilePermission.OWNER_READ,
- PosixFilePermission.GROUP_READ,
- PosixFilePermission.OTHERS_READ));
-
- adbc = ActiveDeletedBlobCollectorFactory.newInstance(unwritableExistingRootFolder, sameThreadExecutor());
- assertEquals("Unwritable existing root folder must have NOOP active blob collector",
- ActiveDeletedBlobCollectorFactory.NOOP, adbc);
-
- adbc = ActiveDeletedBlobCollectorFactory.newInstance(unwritableNonExistingRootFolder, sameThreadExecutor());
- assertEquals("Unwritable non-existing root folder must have NOOP active blob collector",
- ActiveDeletedBlobCollectorFactory.NOOP, adbc);
- }
-
- @Test
- public void cancellablePurge() throws Exception {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- for (int i = 0; i < 10; i++) {
- String id = "Blob" + i;
- bdc.deleted(id, Collections.singleton(id));
- }
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- Semaphore purgeBlocker = new Semaphore(0);
- blobStore.callback = () -> purgeBlocker.acquireUninterruptibly();
- Thread purgeThread = new Thread(() -> {
- try {
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- purgeThread.setDaemon(true);
- purgeBlocker.release(10);//allow 5 deletes
- purgeThread.start();
-
- boolean deleted5 = waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 10);
- assertTrue("Deleted " + blobStore.deletedChunkIds.size() + " chunks", deleted5);
-
- adbc.cancelBlobCollection();
- purgeBlocker.release(20);//release all that's there... this is more than needed, btw.
-
- boolean deleted6 = waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 12);
- assertTrue("Haven't deleted another blob which was locked earlier.", deleted6);
-
- boolean cancelWorked = waitFor(5000, () -> !purgeThread.isAlive());
- assertTrue("Cancel didn't let go of purge thread in 2 seconds", cancelWorked);
-
- assertTrue("Cancelling purge must return asap", blobStore.deletedChunkIds.size() == 12);
- }
-
- @Test
- public void resumeCancelledPurge() throws Exception {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- for (int i = 0; i < 10; i++) {
- String id = "Blob" + i;
- bdc.deleted(id, Collections.singleton(id));
- }
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- Semaphore purgeBlocker = new Semaphore(0);
- blobStore.callback = () -> purgeBlocker.acquireUninterruptibly();
- Thread purgeThread = new Thread(() -> {
- try {
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- purgeThread.setDaemon(true);
- purgeBlocker.release(10);//allow 5 deletes
- purgeThread.start();
-
- waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 10);
-
- adbc.cancelBlobCollection();
- purgeBlocker.release(22);//release all that's there... this is more than needed, btw.
-
- waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 12);
-
- waitFor(5000, () -> !purgeThread.isAlive());
-
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- // Resume can re-attempt to delete already deleted blobs. Hence, the need for ">="
- assertEquals("All blobs must get deleted", 20, blobStore.deletedChunkIds.size());
- }
-
- @Test
- public void dontWarnWhileErrorsWhileDeletingBlobs() throws Exception {
- LogCustomizer warnLogCustomizer =
- LogCustomizer.forLogger(ActiveDeletedBlobCollectorFactory.class.getName()).enable(Level.WARN)
- .contains("Exception occurred while ")
- .create();
-
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId1", Collections.singleton("/a"));
- bdc.deleted("blobId2", Collections.singleton("/b"));
- bdc.deleted("blobId3", Collections.singleton("/c"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- List<String> externallyDeletedChunks = Lists.newArrayList(blobStore.resolveChunks("blobId2"));
- blobStore.countDeleteChunks(externallyDeletedChunks, 0);
-
- warnLogCustomizer.starting();
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
- blobStore.deletedChunkIds.removeAll(externallyDeletedChunks);
-
- verifyBlobsDeleted("blobId1", "blobId3");
-
- assertEquals("No warn logs must show up: " + warnLogCustomizer.getLogs(), 0, warnLogCustomizer.getLogs().size());
- warnLogCustomizer.finished();
-
- bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId4", Collections.singleton("/d"));
- bdc.deleted("blobId5", Collections.singleton("/e"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- blobStore.resetLists();
- blobStore.failWithDSEForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId4")));
-
- warnLogCustomizer.starting();
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted("blobId3", "blobId5");
-
- assertEquals("No warn logs must show up", 0, warnLogCustomizer.getLogs().size());
- warnLogCustomizer.finished();
-
- bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId6", Collections.singleton("/f"));
- bdc.deleted("blobId7", Collections.singleton("/g"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- blobStore.resetLists();
- blobStore.failWithExceptionForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId6")));
-
- warnLogCustomizer.starting();
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted("blobId5", "blobId7");
-
- assertEquals("General exception must log a warn", 1, warnLogCustomizer.getLogs().size());
- warnLogCustomizer.finished();
- }
-
- @Test
- public void doDebugLogWhileErrorsWhileDeletingBlobs() throws Exception {
- LogCustomizer warnLogCustomizer =
- LogCustomizer.forLogger(ActiveDeletedBlobCollectorFactory.class.getName()).enable(Level.DEBUG)
- .contains("Exception occurred while ")
- .create();
-
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId1", Collections.singleton("/a"));
- bdc.deleted("blobId2", Collections.singleton("/b"));
- bdc.deleted("blobId3", Collections.singleton("/c"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- List<String> externallyDeletedChunks = Lists.newArrayList(blobStore.resolveChunks("blobId2"));
- blobStore.countDeleteChunks(externallyDeletedChunks, 0);
-
- warnLogCustomizer.starting();
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
- blobStore.deletedChunkIds.removeAll(externallyDeletedChunks);
-
- verifyBlobsDeleted("blobId1", "blobId3");
-
- assertEquals("Should log on debug", 1, warnLogCustomizer.getLogs().size());
- warnLogCustomizer.finished();
-
- bdc = adbc.getBlobDeletionCallback();
- bdc.deleted("blobId4", Collections.singleton("/d"));
- bdc.deleted("blobId5", Collections.singleton("/e"));
- bdc.commitProgress(COMMIT_SUCCEDED);
-
- blobStore.resetLists();
- blobStore.failWithDSEForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId4")));
-
- warnLogCustomizer.starting();
- adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
-
- verifyBlobsDeleted("blobId3", "blobId5");
-
- assertEquals("Should log on debug", 1, warnLogCustomizer.getLogs().size());
- warnLogCustomizer.finished();
- }
-
- // OAK-6950
- @Test
- public void pauseMarkingDeletedBlobs() {
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
-
- adbc.flagActiveDeletionUnsafe(true);
- bdc = adbc.getBlobDeletionCallback();
- assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
-
- adbc.flagActiveDeletionUnsafe(false);
- bdc = adbc.getBlobDeletionCallback();
- assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
- }
-
- // OAK-6950
- @Test
- public void pauseMarkingDeletedBlobsNOOP() {
- adbc = ActiveDeletedBlobCollectorFactory.NOOP;
- BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
- assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
-
- adbc.flagActiveDeletionUnsafe(true);
- bdc = adbc.getBlobDeletionCallback();
- assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
-
- adbc.flagActiveDeletionUnsafe(false);
- bdc = adbc.getBlobDeletionCallback();
- assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
- }
-
- private void verifyBlobsDeleted(String ... blobIds) throws IOException {
- List<String> chunkIds = new ArrayList<>();
- for (String blobId : blobIds) {
- chunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks(blobId)));
- }
-
- assertThat(blobStore.deletedChunkIds, containsInAnyOrder(chunkIds.toArray()));
- }
-
- class ChunkDeletionTrackingBlobStore implements GarbageCollectableBlobStore {
- Set<String> deletedChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
- Set<String> failWithDSEForChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
- Set<String> failWithExceptionForChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
- Runnable callback = null;
- volatile boolean markerChunkDeleted = false;
-
- @Override
- public String writeBlob(InputStream in) throws IOException {
- return null;
- }
-
- @Override
- public String writeBlob(InputStream in, BlobOptions options) throws IOException {
- return null;
- }
-
- @Override
- public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
- return 0;
- }
-
- @Override
- public long getBlobLength(String blobId) throws IOException {
- return 0;
- }
-
- @Override
- public InputStream getInputStream(String blobId) throws IOException {
- return null;
- }
-
- @Override
- public String getBlobId(@Nonnull String reference) {
- return null;
- }
-
- @Override
- public String getReference(@Nonnull String blobId) {
- return null;
- }
-
- @Override
- public void setBlockSize(int x) {
-
- }
-
- @Override
- public String writeBlob(String tempFileName) throws IOException {
- return null;
- }
-
- @Override
- public int sweep() throws IOException {
- return 0;
- }
-
- @Override
- public void startMark() throws IOException {
-
- }
-
- @Override
- public void clearInUse() {
-
- }
-
- @Override
- public void clearCache() {
-
- }
-
- @Override
- public long getBlockSizeMin() {
- return 0;
- }
-
- @Override
- public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
- return null;
- }
-
- @Override
- public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
- setMarkerChunkDeletedFlag(chunkIds);
- deletedChunkIds.addAll(chunkIds);
- return true;
- }
-
- @Override
- public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
- setMarkerChunkDeletedFlag(chunkIds);
-
- long count = 0;
- for (String chunkId : chunkIds) {
- if (deletedChunkIds.contains(chunkId)) {
- throw new DataStoreException("Already deleted chunk: " + chunkId);
- } else if (failWithDSEForChunkIds.contains(chunkId)) {
- throw new DataStoreException("Synthetically failing with DSE for chunk: " + chunkId);
- } else if (failWithExceptionForChunkIds.contains(chunkId)) {
- throw new Exception("Synthetically failing with Exception for chunk: " + chunkId);
- }
- deletedChunkIds.add(chunkId);
- count++;
- }
- return count;
- }
-
- private void setMarkerChunkDeletedFlag(List<String> deletedChunkIds) {
- if (!markerChunkDeleted) {
- for (String chunkId : deletedChunkIds) {
- if (chunkId.startsWith("MARKER")) {
- markerChunkDeleted = true;
- break;
- }
-
- if (callback != null) {
- callback.run();
- }
- }
- }
- }
-
- @Override
- public Iterator<String> resolveChunks(String blobId) throws IOException {
- return Iterators.forArray(blobId + "-1", blobId + "-2");
- }
-
- private void resetLists() {
- deletedChunkIds.clear();
- failWithDSEForChunkIds.clear();
- failWithExceptionForChunkIds.clear();
- }
- }
-
- private interface Condition {
- boolean evaluate();
- }
-
- private boolean waitFor(long timeout, Condition c)
- throws InterruptedException {
- long end = System.currentTimeMillis() + timeout;
- long remaining = end - System.currentTimeMillis();
- while (remaining > 0) {
- if (c.evaluate()) {
- return true;
- }
-
- Thread.sleep(100);//The constant is exaggerated
- remaining = end - System.currentTimeMillis();
- }
- return c.evaluate();
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.lucene.directory;
+
+import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_FAILED;
+import static org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeFalse;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.PosixFilePermission;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nonnull;
+
+import ch.qos.logback.classic.Level;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.jackrabbit.core.data.DataStoreException;
+import org.apache.jackrabbit.oak.commons.CIHelper;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollector;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.ActiveDeletedBlobCollectorImpl;
+import org.apache.jackrabbit.oak.plugins.index.lucene.directory.ActiveDeletedBlobCollectorFactory.BlobDeletionCallback;
+import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
+import org.apache.jackrabbit.oak.stats.Clock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.internal.util.collections.Sets;
+
+public class ActiveDeletedBlobCollectorTest {
+ @Rule
+ public TemporaryFolder blobCollectionRoot = new TemporaryFolder(new File("target"));
+
+ private Clock clock;
+ private ChunkDeletionTrackingBlobStore blobStore;
+ private ActiveDeletedBlobCollector adbc;
+
+ @Before
+ public void setup() throws Exception {
+ clock = new Clock.Virtual();
+ blobStore = new ChunkDeletionTrackingBlobStore();
+ createBlobCollector();
+ }
+
+ private void createBlobCollector() {
+ adbc = new ActiveDeletedBlobCollectorImpl(clock,
+ new File(blobCollectionRoot.getRoot(), "/a"), sameThreadExecutor());
+ }
+
+ @Test
+ public void simpleCase() throws Exception {
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+
+ bdc.deleted("blobId", Collections.singleton("/a"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted("blobId");
+ }
+
+ @Test
+ public void noopDoesNothing() throws Exception {
+ adbc = ActiveDeletedBlobCollectorFactory.NOOP;
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+
+ bdc.deleted("blobId", Collections.singleton("/a"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted();
+ }
+
+ @Test
+ public void blobTimestampMustBeBiggerThanFileTimestamp() throws Exception {
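+ // deletions committed after the purge cutoff must survive the purge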
+ BlobDeletionCallback bdc1 = adbc.getBlobDeletionCallback();
+ bdc1.deleted("blobId1", Collections.singleton("/a"));
+ bdc1.commitProgress(COMMIT_SUCCEDED);
+
+ BlobDeletionCallback bdc2 = adbc.getBlobDeletionCallback();
+ bdc2.deleted("blobId2", Collections.singleton("/b"));
+
+ BlobDeletionCallback bdc3 = adbc.getBlobDeletionCallback();
+ bdc3.deleted("blobId3", Collections.singleton("/c"));
+ bdc3.commitProgress(COMMIT_SUCCEDED);
+
+ long time = clock.getTimeIncreasing();
+ clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
+
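+ // bdc2 commits only now, after the purge cutoff captured above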
+ bdc2.commitProgress(COMMIT_SUCCEDED);
+
+ adbc.purgeBlobsDeleted(time, blobStore);
+
+ // blobId2 was committed only after the purge cutoff
+ verifyBlobsDeleted("blobId1", "blobId3");
+ }
+
+ @Test
+ public void uncommittedDeletionsMustNotBePurged() throws Exception {
+ BlobDeletionCallback bdc1 = adbc.getBlobDeletionCallback();
+ bdc1.deleted("blobId1", Collections.singleton("/a"));
+ bdc1.commitProgress(COMMIT_FAILED);
+
+ BlobDeletionCallback bdc2 = adbc.getBlobDeletionCallback();
+ bdc2.deleted("blobId2", Collections.singleton("/b"));
+ bdc2.commitProgress(COMMIT_SUCCEDED);
+
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ // blobId1's commit failed, so only blobId2 gets purged
+ verifyBlobsDeleted("blobId2");
+ }
+
+ @Test
+ public void deleteBlobsDespiteFileExplicitlyPurgedBeforeRestart() throws Exception {
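+ // each createBlobCollector() below simulates a restart; deletions tracked
+ // before the restart must still be purged afterwards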
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId1", Collections.singleton("/a"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
+ createBlobCollector();
+ bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId2", Collections.singleton("/b"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ clock.waitUntil(clock.getTime() + TimeUnit.MINUTES.toMillis(1));
+ createBlobCollector();
+ bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId3", Collections.singleton("/c"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted("blobId1", "blobId2", "blobId3");
+ }
+
+ @Test
+ public void multiThreadedCommits() throws Exception {
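+ // several threads mark deletions concurrently; every tracked chunk must
+ // eventually be purged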
+ ExecutorService executorService = Executors.newFixedThreadPool(3);
+ File rootDirectory = new File(blobCollectionRoot.getRoot(), "b");
+ FileUtils.forceMkdir(rootDirectory);
+ adbc = new ActiveDeletedBlobCollectorImpl(clock, rootDirectory, executorService);
+
+ int numThreads = 4;
+ int numBlobsPerThread = 500;
+
+ List<Thread> threads = new ArrayList<>(numThreads);
+ final AtomicInteger threadIndex = new AtomicInteger(0);
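+ // an AtomicInteger is used so each anonymous Runnable can capture its own index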
+ for (; threadIndex.get() < numThreads; threadIndex.incrementAndGet()) {
+ threads.add(new Thread(new Runnable() {
+ private int thisThreadNum = threadIndex.get();
+ @Override
+ public void run() {
+ int blobCnt = 0;
+ while (blobCnt < numBlobsPerThread) {
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ while (blobCnt < numBlobsPerThread) { // fill one batch; break out randomly to vary batch sizes
+ String id = "Thread" + thisThreadNum + "Blob" + blobCnt;
+ bdc.deleted(id, Collections.singleton(id));
+ blobCnt++;
+ if (Math.random() > 0.5) {
+ break;
+ }
+ }
+ bdc.commitProgress(COMMIT_SUCCEDED);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }));
+ }
+
+ for (Thread t : threads) {
+ t.start();
+ }
+
+ for (Thread t : threads) {
+ t.join();
+ }
+
+ boolean terminated = executorService.awaitTermination(100, TimeUnit.MILLISECONDS);
+ assertFalse(terminated); // the executor was never shut down, so it must still be running
+
+ List<String> deletedChunks = new ArrayList<>(numThreads * numBlobsPerThread * 2);
+ for (int threadNum = 0; threadNum < numThreads; threadNum++) {
+ for (int blobCnt = 0; blobCnt < numBlobsPerThread; blobCnt++) {
+ String id = "Thread" + threadNum + "Blob" + blobCnt;
+ Iterators.addAll(deletedChunks, blobStore.resolveChunks(id));
+ }
+ }
+
+ // The blocking queue doesn't hand over all items immediately. So, push
+ // "MARKER-*" blob ids and keep purging until some marker blob gets purged,
+ // but time the whole activity out after 3 seconds.
+ long until = Clock.SIMPLE.getTime() + TimeUnit.SECONDS.toMillis(3);
+ List<String> markerChunks = Lists.newArrayList();
+ int i = 0;
+ while (Clock.SIMPLE.getTime() < until) {
+ // Push commit with a marker blob-id and wait for it to be purged
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ String markerBlobId = "MARKER-" + (i++);
+ bdc.deleted(markerBlobId, Lists.newArrayList(markerBlobId));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ Iterators.addAll(markerChunks, blobStore.resolveChunks(markerBlobId));
+ clock.waitUntil(clock.getTime() + TimeUnit.SECONDS.toMillis(5));
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ if (blobStore.markerChunkDeleted) {
+ break;
+ }
+ }
+
+ assertTrue("Timed out while waiting for marker chunk to be purged", blobStore.markerChunkDeleted);
+
+ // don't care how many marker blobs are purged
+ blobStore.deletedChunkIds.removeAll(markerChunks);
+
+ HashSet<String> missingChunks = new HashSet<>(deletedChunks);
+ missingChunks.removeAll(blobStore.deletedChunkIds);
+ assertTrue("size: " + missingChunks.size() + "; missing: " + missingChunks, missingChunks.isEmpty());
+
+ assertThat(blobStore.deletedChunkIds, containsInAnyOrder(deletedChunks.toArray()));
+ }
+
+ @Test
+ public void inaccessibleWorkDirGivesNoop() throws Exception {
+ assumeFalse(CIHelper.windows());
+
+ File rootDir = blobCollectionRoot.getRoot();
+ File unwritableExistingRootFolder = new File(rootDir, "existingRoot");
+ FileUtils.forceMkdir(unwritableExistingRootFolder);
+ File unwritableNonExistingRootFolder = new File(unwritableExistingRootFolder, "nonExistingRoot");
+
+ Path unwritableExistingPath = FileSystems.getDefault().getPath(unwritableExistingRootFolder.getPath());
+ Files.setPosixFilePermissions(unwritableExistingPath,
+ Sets.newSet(PosixFilePermission.OWNER_READ,
+ PosixFilePermission.GROUP_READ,
+ PosixFilePermission.OTHERS_READ));
+
+ adbc = ActiveDeletedBlobCollectorFactory.newInstance(unwritableExistingRootFolder, sameThreadExecutor());
+ assertEquals("Unwritable existing root folder must have NOOP active blob collector",
+ ActiveDeletedBlobCollectorFactory.NOOP, adbc);
+
+ adbc = ActiveDeletedBlobCollectorFactory.newInstance(unwritableNonExistingRootFolder, sameThreadExecutor());
+ assertEquals("Unwritable non-existing root folder must have NOOP active blob collector",
+ ActiveDeletedBlobCollectorFactory.NOOP, adbc);
+ }
+
+ @Test
+ public void cancellablePurge() throws Exception {
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ for (int i = 0; i < 10; i++) {
+ String id = "Blob" + i;
+ bdc.deleted(id, Collections.singleton(id));
+ }
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
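+ // gate each chunk deletion on a semaphore so the purge can be paused mid-way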
+ Semaphore purgeBlocker = new Semaphore(0);
+ blobStore.callback = () -> purgeBlocker.acquireUninterruptibly();
+ Thread purgeThread = new Thread(() -> {
+ try {
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ purgeThread.setDaemon(true);
+ purgeBlocker.release(10); // 10 permits = 10 chunk deletions = 5 of the 10 blobs (2 chunks each)
+ purgeThread.start();
+
+ boolean deleted5 = waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 10);
+ assertTrue("Deleted " + blobStore.deletedChunkIds.size() + " chunks", deleted5);
+
+ adbc.cancelBlobCollection();
+ purgeBlocker.release(20); // more permits than needed, so nothing else blocks
+
+ boolean deleted6 = waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 12);
+ assertTrue("The blob whose deletion was blocked earlier didn't complete", deleted6);
+
+ boolean cancelWorked = waitFor(5000, () -> !purgeThread.isAlive());
+ assertTrue("Cancel didn't let go of purge thread in 5 seconds", cancelWorked);
+
+ assertEquals("Cancelling purge must return asap", 12, blobStore.deletedChunkIds.size());
+ }
+
+ @Test
+ public void resumeCancelledPurge() throws Exception {
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ for (int i = 0; i < 10; i++) {
+ String id = "Blob" + i;
+ bdc.deleted(id, Collections.singleton(id));
+ }
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
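+ // block the purge part-way through as in cancellablePurge, then cancel and resume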
+ Semaphore purgeBlocker = new Semaphore(0);
+ blobStore.callback = () -> purgeBlocker.acquireUninterruptibly();
+ Thread purgeThread = new Thread(() -> {
+ try {
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ });
+ purgeThread.setDaemon(true);
+ purgeBlocker.release(10); // 10 permits = 10 chunk deletions = 5 of the 10 blobs (2 chunks each)
+ purgeThread.start();
+
+ waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 10);
+
+ adbc.cancelBlobCollection();
+ purgeBlocker.release(22); // more permits than needed, so the resumed purge can't block
+
+ waitFor(5000, () -> blobStore.deletedChunkIds.size() >= 12);
+
+ waitFor(5000, () -> !purgeThread.isAlive());
+
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ // Resume may re-attempt blobs that were already deleted; deletedChunkIds is a set, so the count stays exact.
+ assertEquals("All blobs must get deleted", 20, blobStore.deletedChunkIds.size());
+ }
+
+ @Test
+ public void dontWarnWhileErrorsWhileDeletingBlobs() throws Exception {
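+ // chunks that are already gone (DataStoreException) must not produce WARN logs;
+ // only unexpected exceptions warrant a warning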
+ LogCustomizer warnLogCustomizer =
+ LogCustomizer.forLogger(ActiveDeletedBlobCollectorFactory.class.getName()).enable(Level.WARN)
+ .contains("Exception occurred while ")
+ .create();
+
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId1", Collections.singleton("/a"));
+ bdc.deleted("blobId2", Collections.singleton("/b"));
+ bdc.deleted("blobId3", Collections.singleton("/c"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ List<String> externallyDeletedChunks = Lists.newArrayList(blobStore.resolveChunks("blobId2"));
+ blobStore.countDeleteChunks(externallyDeletedChunks, 0);
+
+ warnLogCustomizer.starting();
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+ blobStore.deletedChunkIds.removeAll(externallyDeletedChunks);
+
+ verifyBlobsDeleted("blobId1", "blobId3");
+
+ assertEquals("No warn logs must show up: " + warnLogCustomizer.getLogs(), 0, warnLogCustomizer.getLogs().size());
+ warnLogCustomizer.finished();
+
+ bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId4", Collections.singleton("/d"));
+ bdc.deleted("blobId5", Collections.singleton("/e"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ blobStore.resetLists();
+ blobStore.failWithDSEForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId4")));
+
+ warnLogCustomizer.starting();
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted("blobId3", "blobId5");
+
+ assertEquals("No warn logs must show up", 0, warnLogCustomizer.getLogs().size());
+ warnLogCustomizer.finished();
+
+ bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId6", Collections.singleton("/f"));
+ bdc.deleted("blobId7", Collections.singleton("/g"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ blobStore.resetLists();
+ blobStore.failWithExceptionForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId6")));
+
+ warnLogCustomizer.starting();
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted("blobId5", "blobId7");
+
+ assertEquals("General exception must log a warn", 1, warnLogCustomizer.getLogs().size());
+ warnLogCustomizer.finished();
+ }
+
+ @Test
+ public void doDebugLogWhileErrorsWhileDeletingBlobs() throws Exception {
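+ // the failures suppressed at WARN level above must still be visible at DEBUG level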
+ LogCustomizer debugLogCustomizer =
+ LogCustomizer.forLogger(ActiveDeletedBlobCollectorFactory.class.getName()).enable(Level.DEBUG)
+ .contains("Exception occurred while ")
+ .create();
+
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId1", Collections.singleton("/a"));
+ bdc.deleted("blobId2", Collections.singleton("/b"));
+ bdc.deleted("blobId3", Collections.singleton("/c"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ List<String> externallyDeletedChunks = Lists.newArrayList(blobStore.resolveChunks("blobId2"));
+ blobStore.countDeleteChunks(externallyDeletedChunks, 0);
+
+ debugLogCustomizer.starting();
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+ blobStore.deletedChunkIds.removeAll(externallyDeletedChunks);
+
+ verifyBlobsDeleted("blobId1", "blobId3");
+
+ assertEquals("Should log on debug", 1, warnLogCustomizer.getLogs().size());
+ warnLogCustomizer.finished();
+
+ bdc = adbc.getBlobDeletionCallback();
+ bdc.deleted("blobId4", Collections.singleton("/d"));
+ bdc.deleted("blobId5", Collections.singleton("/e"));
+ bdc.commitProgress(COMMIT_SUCCEDED);
+
+ blobStore.resetLists();
+ blobStore.failWithDSEForChunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks("blobId4")));
+
+ debugLogCustomizer.starting();
+ adbc.purgeBlobsDeleted(clock.getTimeIncreasing(), blobStore);
+
+ verifyBlobsDeleted("blobId3", "blobId5");
+
+ assertEquals("Should log on debug", 1, warnLogCustomizer.getLogs().size());
+ warnLogCustomizer.finished();
+ }
+
+ // OAK-6950
+ @Test
+ public void pauseMarkingDeletedBlobs() {
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
+
+ adbc.flagActiveDeletionUnsafe(true);
+ bdc = adbc.getBlobDeletionCallback();
+ assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
+
+ adbc.flagActiveDeletionUnsafe(false);
+ bdc = adbc.getBlobDeletionCallback();
+ assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
+ }
+
+ // OAK-6950
+ @Test
+ public void pauseMarkingDeletedBlobsNOOP() {
+ adbc = ActiveDeletedBlobCollectorFactory.NOOP;
+ BlobDeletionCallback bdc = adbc.getBlobDeletionCallback();
+ assertFalse("Active deletion should be safe by default", bdc.isMarkingForActiveDeletionUnsafe());
+
+ adbc.flagActiveDeletionUnsafe(true);
+ bdc = adbc.getBlobDeletionCallback();
+ assertTrue("Active deletion should be unsafe", bdc.isMarkingForActiveDeletionUnsafe());
+
+ adbc.flagActiveDeletionUnsafe(false);
+ bdc = adbc.getBlobDeletionCallback();
+ assertFalse("Active deletion should be safe after unpausing", bdc.isMarkingForActiveDeletionUnsafe());
+ }
+
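+ // asserts that exactly the chunks of the given blob ids were reported deleted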
+ private void verifyBlobsDeleted(String... blobIds) throws IOException {
+ List<String> chunkIds = new ArrayList<>();
+ for (String blobId : blobIds) {
+ chunkIds.addAll(Lists.newArrayList(blobStore.resolveChunks(blobId)));
+ }
+
+ assertThat(blobStore.deletedChunkIds, containsInAnyOrder(chunkIds.toArray()));
+ }
+
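+ /**
+ * Minimal GarbageCollectableBlobStore stub: records deleted chunk ids,
+ * resolves every blob id to two chunks ("id-1", "id-2") and can synthesize
+ * per-chunk failures.
+ */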
+ class ChunkDeletionTrackingBlobStore implements GarbageCollectableBlobStore {
+ Set<String> deletedChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
+ Set<String> failWithDSEForChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
+ Set<String> failWithExceptionForChunkIds = com.google.common.collect.Sets.newLinkedHashSet();
+ Runnable callback = null;
+ volatile boolean markerChunkDeleted = false;
+
+ @Override
+ public String writeBlob(InputStream in) throws IOException {
+ return null;
+ }
+
+ @Override
+ public String writeBlob(InputStream in, BlobOptions options) throws IOException {
+ return null;
+ }
+
+ @Override
+ public int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public long getBlobLength(String blobId) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public InputStream getInputStream(String blobId) throws IOException {
+ return null;
+ }
+
+ @Override
+ public String getBlobId(@Nonnull String reference) {
+ return null;
+ }
+
+ @Override
+ public String getReference(@Nonnull String blobId) {
+ return null;
+ }
+
+ @Override
+ public void setBlockSize(int x) {
+
+ }
+
+ @Override
+ public String writeBlob(String tempFileName) throws IOException {
+ return null;
+ }
+
+ @Override
+ public int sweep() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void startMark() throws IOException {
+
+ }
+
+ @Override
+ public void clearInUse() {
+
+ }
+
+ @Override
+ public void clearCache() {
+
+ }
+
+ @Override
+ public long getBlockSizeMin() {
+ return 0;
+ }
+
+ @Override
+ public Iterator<String> getAllChunkIds(long maxLastModifiedTime) throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean deleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
+ setMarkerChunkDeletedFlag(chunkIds);
+ deletedChunkIds.addAll(chunkIds);
+ return true;
+ }
+
+ @Override
+ public long countDeleteChunks(List<String> chunkIds, long maxLastModifiedTime) throws Exception {
+ setMarkerChunkDeletedFlag(chunkIds);
+
+ long count = 0;
+ for (String chunkId : chunkIds) {
+ if (deletedChunkIds.contains(chunkId)) {
+ throw new DataStoreException("Already deleted chunk: " + chunkId);
+ } else if (failWithDSEForChunkIds.contains(chunkId)) {
+ throw new DataStoreException("Synthetically failing with DSE for chunk: " + chunkId);
+ } else if (failWithExceptionForChunkIds.contains(chunkId)) {
+ throw new Exception("Synthetically failing with Exception for chunk: " + chunkId);
+ }
+ deletedChunkIds.add(chunkId);
+ count++;
+ }
+ return count;
+ }
+
+ private void setMarkerChunkDeletedFlag(List<String> chunkIds) {
+ if (!markerChunkDeleted) {
+ for (String chunkId : chunkIds) {
+ if (chunkId.startsWith("MARKER")) {
+ markerChunkDeleted = true;
+ break;
+ }
+
+ if (callback != null) {
+ callback.run();
+ }
+ }
+ }
+ }
+
+ @Override
+ public Iterator<String> resolveChunks(String blobId) throws IOException {
+ return Iterators.forArray(blobId + "-1", blobId + "-2");
+ }
+
+ private void resetLists() {
+ deletedChunkIds.clear();
+ failWithDSEForChunkIds.clear();
+ failWithExceptionForChunkIds.clear();
+ }
+ }
+
+ private interface Condition {
+ boolean evaluate();
+ }
+
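+ // polls the condition every 100ms until it holds or the timeout elapses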
+ private boolean waitFor(long timeout, Condition c)
+ throws InterruptedException {
+ long end = System.currentTimeMillis() + timeout;
+ long remaining = end - System.currentTimeMillis();
+ while (remaining > 0) {
+ if (c.evaluate()) {
+ return true;
+ }
+
+ Thread.sleep(100); // coarse polling interval is good enough here
+ remaining = end - System.currentTimeMillis();
+ }
+ return c.evaluate();
+ }
+}