You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@geode.apache.org by la...@apache.org on 2018/01/20 00:30:45 UTC
[geode] 01/01: GEODE-3928: createIndex on existing region creates lucene indexes for existing data
This is an automated email from the ASF dual-hosted git repository.
ladyvader pushed a commit to branch feature/GEODE-3928
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 9239278ee28debdbcb24df87c1bfe64868be6029
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Thu Jan 18 16:03:47 2018 -0800
GEODE-3928: createIndex on existing region creates lucene indexes for existing data
---
.../lucene/internal/IndexRepositoryFactory.java | 59 ++++++++++-
.../cache/lucene/internal/LuceneServiceImpl.java | 75 ++++++++++++--
.../apache/geode/cache/lucene/LuceneDUnitTest.java | 8 ++
.../geode/cache/lucene/LuceneQueriesDUnitTest.java | 109 +++++++++++++++++++++
4 files changed, 239 insertions(+), 12 deletions(-)
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index b825940..416b165 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -15,12 +15,15 @@
package org.apache.geode.cache.lucene.internal;
import java.io.IOException;
+import java.util.Iterator;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Region;
import org.apache.geode.cache.lucene.LuceneSerializer;
import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
@@ -28,6 +31,9 @@ import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ColocationHelper;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.PartitionRegionConfig;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.logging.LogService;
@@ -36,6 +42,7 @@ public class IndexRepositoryFactory {
private static final Logger logger = LogService.getLogger();
public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
+ public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
public IndexRepositoryFactory() {}
@@ -45,6 +52,14 @@ public class IndexRepositoryFactory {
LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
+ // We need to ensure that all members have created the fileAndChunk region before continuing
+ Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
+ PartitionRegionConfig prConfig =
+ (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+ while (!prConfig.isColocationComplete()) {
+ prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+ }
+
BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
boolean success = false;
@@ -77,14 +92,37 @@ public class IndexRepositoryFactory {
}
final IndexRepository repo;
+ final String indexCompleteKey = getIndexingCompleteKey(fileAndChunkBucket);
try {
- RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileAndChunkBucket, bucketId),
- indexForPR.getFileSystemStats());
+ // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+ Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+ RegionDirectory dir =
+ new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
IndexWriter writer = new IndexWriter(dir, config);
repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer,
indexForPR.getIndexStats(), dataBucket, lockService, lockName, indexForPR);
- success = true;
+ success = false;
+ // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+ if (null != fileRegion.get(indexCompleteKey, bucketId)) {
+ success = true;
+ return repo;
+ } else {
+ Iterator keysIterator = dataBucket.keySet().iterator();
+ while (keysIterator.hasNext()) {
+ Object key = keysIterator.next();
+ Object value = getValue(userRegion.getEntry(key));
+ if (value != null) {
+ repo.update(key, value);
+ } else {
+ repo.delete(key);
+ }
+ }
+ repo.commit();
+ // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+ fileRegion.put(indexCompleteKey, APACHE_GEODE_INDEX_COMPLETE, bucketId);
+ success = true;
+ }
return repo;
} catch (IOException e) {
logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId
@@ -98,6 +136,17 @@ public class IndexRepositoryFactory {
}
+  /**
+   * Reads the raw value of a data-region entry so it can be indexed.
+   *
+   * @param entry the entry returned by {@code Region.getEntry}; may be {@code null}. A non-null
+   *        entry must be an {@link EntrySnapshot}, otherwise a {@link ClassCastException} is thrown
+   * @return the entry's raw value, or {@code null} when there is no entry or it was destroyed
+   *         while being read (the caller then deletes the key from the index repository)
+   */
+  private Object getValue(Region.Entry entry) {
+    if (entry == null) {
+      return null;
+    }
+    try {
+      return ((EntrySnapshot) entry).getRawValue(true);
+    } catch (EntryDestroyedException destroyed) {
+      // a concurrently destroyed entry is reported as absent instead of failing the indexing pass
+      return null;
+    }
+  }
+
// Returns a Map view of the given file-and-chunk bucket bound to bucketId; the caller uses it as
// the backing store of the RegionDirectory. BucketTargetingMap is expected to pass bucketId as the
// callback argument so the PartitionResolver routes each op to this bucket (TODO confirm there).
private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
return new BucketTargetingMap(region, bucketId);
}
@@ -106,6 +155,10 @@ public class IndexRepositoryFactory {
return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath();
}
+ private String getIndexingCompleteKey(final BucketRegion fileAndChunkBucket) {
+ return APACHE_GEODE_INDEX_COMPLETE + fileAndChunkBucket.getFullPath();
+ }
+
private DistributedLockService getLockService() {
return DistributedLockService
.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index e7813af..1cafa12 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -15,10 +15,15 @@
package org.apache.geode.cache.lucene.internal;
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -31,12 +36,18 @@ import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.EvictionAlgorithm;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.FunctionService;
@@ -58,15 +69,23 @@ import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
import org.apache.geode.cache.lucene.internal.filesystem.File;
import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
import org.apache.geode.cache.lucene.internal.results.PageResults;
import org.apache.geode.cache.lucene.internal.xml.LuceneServiceXmlGenerator;
+import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.DSFIDFactory;
import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.EntrySnapshot;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalDataSet;
import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
import org.apache.geode.internal.cache.RegionListener;
import org.apache.geode.internal.cache.extension.Extensible;
import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
@@ -228,27 +247,24 @@ public class LuceneServiceImpl implements InternalLuceneService {
regionPath = "/" + regionPath;
}
- registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
- regionPath, fields, analyzer, fieldAnalyzers, serializer));
-
+ // If the region does not yet exist, install LuceneRegionListener and return
PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
-
- LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
- regionPath, fields, analyzer, fieldAnalyzers, serializer);
if (region == null) {
+ registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
+ regionPath, fields, analyzer, fieldAnalyzers, serializer));
+ LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
+ regionPath, fields, analyzer, fieldAnalyzers, serializer);
cache.addRegionListener(regionListener);
return;
}
if (!allowOnExistingRegion) {
- definedIndexMap.remove(LuceneServiceImpl.getUniqueIndexName(indexName, regionPath));
throw new IllegalStateException("The lucene index must be created before region");
}
-
+ // do work normally handled by LuceneRegionListener (if region already exists)
createIndexOnExistingRegion(region, indexName, regionPath, fields, analyzer, fieldAnalyzers,
serializer);
-
}
private void createIndexOnExistingRegion(PartitionedRegion region, String indexName,
@@ -266,6 +282,47 @@ public class LuceneServiceImpl implements InternalLuceneService {
region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields);
afterDataRegionCreated(luceneIndex);
+
+ createLuceneIndexOnDataRegion(region, luceneIndex);
+ }
+
+  /**
+   * Builds index repositories for data that already exists in {@code userRegion} when a Lucene
+   * index is created on an existing region. For each non-empty primary bucket hosted locally, the
+   * index's repository manager is asked for that bucket's repository; creating the repository is
+   * expected to index the bucket's existing entries.
+   *
+   * @param userRegion the data region being indexed
+   * @param luceneIndex the newly created index whose repository manager is used
+   * @return {@code true} if every local primary bucket was processed (or this member stores no
+   *         data for the region); {@code false} if the region was destroyed, the cache closed, or
+   *         the Lucene index was closed while processing
+   */
+  protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion userRegion,
+      final LuceneIndexImpl luceneIndex) {
+    // Try to get a PDX instance if possible, rather than a deserialized object
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      AbstractPartitionedRepositoryManager repositoryManager =
+          (AbstractPartitionedRepositoryManager) luceneIndex.getRepositoryManager();
+      if (userRegion.getDataStore() == null) {
+        // this member stores no data for the region (e.g. an accessor): nothing to index here
+        return true;
+      }
+      Set<Integer> primaryBucketIds = userRegion.getDataStore().getAllLocalPrimaryBucketIds();
+      for (Integer primaryBucketId : primaryBucketIds) {
+        try {
+          BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(primaryBucketId);
+          // getLocalBucketById returns null when the bucket is no longer hosted on this member
+          // (it may have moved after the primary ids were collected); skip it instead of an NPE
+          if (userBucket != null && !userBucket.isEmpty()) {
+            repositoryManager.getRepository(primaryBucketId);
+          }
+        } catch (BucketNotFoundException | PrimaryBucketException e) {
+          logger.debug("Bucket ID : {} not found while saving to lucene index: {}",
+              primaryBucketId, e.getMessage(), e);
+        }
+      }
+      return true;
+    } catch (RegionDestroyedException e) {
+      logger.debug("Bucket not found while saving to lucene index: {}", e.getMessage(), e);
+      return false;
+    } catch (CacheClosedException e) {
+      logger.debug("Unable to save to lucene index, cache has been closed", e);
+      return false;
+    } catch (AlreadyClosedException e) {
+      logger.debug("Unable to commit, the lucene index is already closed", e);
+      return false;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
}
static void validateRegionAttributes(RegionAttributes attrs) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
index c3f5dc2..44962f8 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
@@ -61,6 +61,14 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase {
regionTestType.createAccessor(getCache(), REGION_NAME);
}
+  /**
+   * Creates the test region as a data store of the given type. Unlike the overload that takes an
+   * index-creation runnable, no Lucene index is defined first.
+   */
+  protected void initDataStore(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createDataStore(this.getCache(), REGION_NAME);
+  }
+
+  /**
+   * Creates the test region as an accessor of the given type. Unlike the overload that takes an
+   * index-creation runnable, no Lucene index is defined first.
+   */
+  protected void initAccessor(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createAccessor(this.getCache(), REGION_NAME);
+  }
+
protected RegionTestableType[] getListOfRegionTestTypes() {
return new RegionTestableType[] {RegionTestableType.PARTITION,
RegionTestableType.PARTITION_REDUNDANT, RegionTestableType.PARTITION_OVERFLOW_TO_DISK,
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
index 08e0e05..fc87fcc 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
@@ -28,10 +28,16 @@ import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.lucene.internal.LuceneIndexFactoryImpl;
import org.apache.geode.cache.lucene.internal.LuceneQueryImpl;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.SerializableRunnableIF;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.junit.categories.DistributedTest;
@@ -101,6 +107,109 @@ public class LuceneQueriesDUnitTest extends LuceneQueriesAccessorBase {
executeTextSearch(accessor);
}
+  /** Destroys the test Lucene index on the test region through this member's LuceneService. */
+  private void destroyIndex() {
+    LuceneServiceProvider.get(getCache()).destroyIndex(INDEX_NAME, REGION_NAME);
+  }
+
+  /**
+   * Creates the test Lucene index (on the "text" field) for a region that already exists. The
+   * third argument to {@code create} is {@code true}; presumably it is the allow-on-existing-region
+   * flag checked by LuceneServiceImpl (confirm in LuceneIndexFactoryImpl).
+   */
+  private void recreateIndex() {
+    LuceneService luceneService = LuceneServiceProvider.get(getCache());
+    LuceneIndexFactoryImpl indexFactory =
+        (LuceneIndexFactoryImpl) luceneService.createIndexFactory().addField("text");
+    indexFactory.create(INDEX_NAME, REGION_NAME, true);
+  }
+
+ @Test
+ @Parameters(method = "getListOfRegionTestTypes")
+ public void dropAndRecreateIndex(RegionTestableType regionTestType) throws Exception {
+ SerializableRunnableIF createIndex = () -> {
+ LuceneService luceneService = LuceneServiceProvider.get(getCache());
+ luceneService.createIndexFactory().addField("text").create(INDEX_NAME, REGION_NAME);
+ };
+ dataStore1.invoke(() -> initDataStore(createIndex, regionTestType));
+ dataStore2.invoke(() -> initDataStore(createIndex, regionTestType));
+ accessor.invoke(() -> initAccessor(createIndex, regionTestType));
+
+ putDataInRegion(accessor);
+ assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+ assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+ executeTextSearch(accessor);
+
+ dataStore1.invoke(() -> destroyIndex());
+
+ // re-index stored data
+ AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+ recreateIndex();
+ });
+
+ // re-index stored data
+ AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+ recreateIndex();
+ });
+
+ AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+ recreateIndex();
+ });
+
+ ai1.join();
+ ai2.join();
+ ai3.join();
+
+ ai1.checkException();
+ ai2.checkException();
+ ai3.checkException();
+
+ executeTextSearch(accessor);
+ }
+
+  /**
+   * Creates and populates the region before any index exists, then creates the index on the
+   * existing region concurrently on every member and runs the text search against the
+   * pre-existing data.
+   */
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void reindexThenQuery(RegionTestableType regionTestType) throws Exception {
+    // Create dataRegion prior to index
+    dataStore1.invoke(() -> initDataStore(regionTestType));
+    dataStore2.invoke(() -> initDataStore(regionTestType));
+    accessor.invoke(() -> initAccessor(regionTestType));
+
+    // populate region
+    accessor.invoke(() -> {
+      Region<Object, Object> region = getCache().getRegion(REGION_NAME);
+      region.put(1, new TestObject("hello world"));
+      region.put(113, new TestObject("hi world"));
+      region.put(2, new TestObject("goodbye world"));
+    });
+
+    // re-index stored data (index created on the already-populated region on every member)
+    AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+
+    executeTextSearch(accessor);
+  }
+
@Test
@Parameters(method = "getListOfRegionTestTypes")
public void defaultFieldShouldPropogateCorrectlyThroughFunction(
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.