You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/01/20 00:30:45 UTC

[geode] 01/01: GEODE-3928: createIndex on existing region creates lucene indexes for existing data

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-3928
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9239278ee28debdbcb24df87c1bfe64868be6029
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Thu Jan 18 16:03:47 2018 -0800

    GEODE-3928: createIndex on existing region creates lucene indexes for existing data
---
 .../lucene/internal/IndexRepositoryFactory.java    |  59 ++++++++++-
 .../cache/lucene/internal/LuceneServiceImpl.java   |  75 ++++++++++++--
 .../apache/geode/cache/lucene/LuceneDUnitTest.java |   8 ++
 .../geode/cache/lucene/LuceneQueriesDUnitTest.java | 109 +++++++++++++++++++++
 4 files changed, 239 insertions(+), 12 deletions(-)

diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index b825940..416b165 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -15,12 +15,15 @@
 package org.apache.geode.cache.lucene.internal;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.lucene.LuceneSerializer;
 import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
@@ -28,6 +31,9 @@ import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ColocationHelper;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.PartitionRegionConfig;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.logging.LogService;
@@ -36,6 +42,7 @@ public class IndexRepositoryFactory {
 
   private static final Logger logger = LogService.getLogger();
   public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
+  public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
 
   public IndexRepositoryFactory() {}
 
@@ -45,6 +52,14 @@ public class IndexRepositoryFactory {
     LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
     final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
 
+    // We need to ensure that all members have created the fileAndChunk region before continuing
+    Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
+    PartitionRegionConfig prConfig =
+        (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+    while (!prConfig.isColocationComplete()) {
+      prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+    }
+
     BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
     BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
     boolean success = false;
@@ -77,14 +92,37 @@ public class IndexRepositoryFactory {
     }
 
     final IndexRepository repo;
+    final String indexCompleteKey = getIndexingCompleteKey(fileAndChunkBucket);
     try {
-      RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileAndChunkBucket, bucketId),
-          indexForPR.getFileSystemStats());
+      // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+      Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+      RegionDirectory dir =
+          new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
       IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
       IndexWriter writer = new IndexWriter(dir, config);
       repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer,
           indexForPR.getIndexStats(), dataBucket, lockService, lockName, indexForPR);
-      success = true;
+      success = false;
+      // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+      if (null != fileRegion.get(indexCompleteKey, bucketId)) {
+        success = true;
+        return repo;
+      } else {
+        Iterator keysIterator = dataBucket.keySet().iterator();
+        while (keysIterator.hasNext()) {
+          Object key = keysIterator.next();
+          Object value = getValue(userRegion.getEntry(key));
+          if (value != null) {
+            repo.update(key, value);
+          } else {
+            repo.delete(key);
+          }
+        }
+        repo.commit();
+        // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+        fileRegion.put(indexCompleteKey, APACHE_GEODE_INDEX_COMPLETE, bucketId);
+        success = true;
+      }
       return repo;
     } catch (IOException e) {
       logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId
@@ -98,6 +136,17 @@ public class IndexRepositoryFactory {
 
   }
 
+  /** Returns the entry's raw value, or null when the entry is absent or already destroyed. */
+  private Object getValue(Region.Entry entry) {
+    // The caller indexes non-null values and removes the key from the index on null
+    final EntrySnapshot snapshot = (EntrySnapshot) entry;
+    try {
+      return snapshot == null ? null : snapshot.getRawValue(true);
+    } catch (EntryDestroyedException destroyed) {
+      return null;
+    }
+  }
+
   private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
     return new BucketTargetingMap(region, bucketId);
   }
@@ -106,6 +155,10 @@ public class IndexRepositoryFactory {
     return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath();
   }
 
+  private String getIndexingCompleteKey(final BucketRegion fileAndChunkBucket) {
+    return String.format("%s%s", APACHE_GEODE_INDEX_COMPLETE, fileAndChunkBucket.getFullPath());
+  } // marker key: stored in the file region once this bucket's existing data has been indexed
+
   private DistributedLockService getLockService() {
     return DistributedLockService
         .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index e7813af..1cafa12 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -15,10 +15,15 @@
 
 package org.apache.geode.cache.lucene.internal;
 
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,12 +36,18 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.store.AlreadyClosedException;
 
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.AttributesMutator;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryDestroyedException;
 import org.apache.geode.cache.EvictionAlgorithm;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.FunctionService;
@@ -58,15 +69,23 @@ import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
 import org.apache.geode.cache.lucene.internal.filesystem.File;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
 import org.apache.geode.cache.lucene.internal.results.PageResults;
 import org.apache.geode.cache.lucene.internal.xml.LuceneServiceXmlGenerator;
+import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.EntrySnapshot;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
 import org.apache.geode.internal.cache.RegionListener;
 import org.apache.geode.internal.cache.extension.Extensible;
 import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
@@ -228,27 +247,24 @@ public class LuceneServiceImpl implements InternalLuceneService {
       regionPath = "/" + regionPath;
     }
 
-    registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
-        regionPath, fields, analyzer, fieldAnalyzers, serializer));
-
+    // If the region does not yet exist, install LuceneRegionListener and return
     PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
-
-    LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
-        regionPath, fields, analyzer, fieldAnalyzers, serializer);
     if (region == null) {
+      registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
+          regionPath, fields, analyzer, fieldAnalyzers, serializer));
+      LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
+          regionPath, fields, analyzer, fieldAnalyzers, serializer);
       cache.addRegionListener(regionListener);
       return;
     }
 
     if (!allowOnExistingRegion) {
-      definedIndexMap.remove(LuceneServiceImpl.getUniqueIndexName(indexName, regionPath));
       throw new IllegalStateException("The lucene index must be created before region");
     }
 
-
+    // do work normally handled by LuceneRegionListener (if region already exists)
     createIndexOnExistingRegion(region, indexName, regionPath, fields, analyzer, fieldAnalyzers,
         serializer);
-
   }
 
   private void createIndexOnExistingRegion(PartitionedRegion region, String indexName,
@@ -266,6 +282,47 @@ public class LuceneServiceImpl implements InternalLuceneService {
         region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields);
 
     afterDataRegionCreated(luceneIndex);
+
+    createLuceneIndexOnDataRegion(region, luceneIndex);
+  }
+
+  /**
+   * Builds the repository of each non-empty local primary bucket, indexing pre-existing data.
+   * Returns false if the region, cache or index was destroyed/closed meanwhile, else true.
+   */
+  protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion userRegion,
+      final LuceneIndexImpl luceneIndex) {
+    DefaultQuery.setPdxReadSerialized(true); // prefer PDX instances over deserialized objects
+    try {
+      AbstractPartitionedRepositoryManager repositoryManager =
+          (AbstractPartitionedRepositoryManager) luceneIndex.getRepositoryManager();
+      if (userRegion.getDataStore() == null) {
+        return true; // no data store: this member hosts no buckets
+      }
+      for (Integer bucketId : userRegion.getDataStore().getAllLocalPrimaryBucketIds()) {
+        try {
+          BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId);
+          if (userBucket != null && !userBucket.isEmpty()) { // may be null: bucket moved
+            repositoryManager.getRepository(bucketId);
+          }
+        } catch (BucketNotFoundException | PrimaryBucketException e) {
+          logger.debug("Bucket ID : {} not found while saving to lucene index: {}", bucketId,
+              e.getMessage(), e);
+        }
+      }
+      return true;
+    } catch (RegionDestroyedException e) {
+      logger.debug("Bucket not found while saving to lucene index: {}", e.getMessage(), e);
+      return false;
+    } catch (CacheClosedException e) {
+      logger.debug("Unable to save to lucene index, cache has been closed", e);
+      return false;
+    } catch (AlreadyClosedException e) {
+      logger.debug("Unable to commit, the lucene index is already closed", e);
+      return false;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
   }
 
   static void validateRegionAttributes(RegionAttributes attrs) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
index c3f5dc2..44962f8 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
@@ -61,6 +61,14 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase {
     regionTestType.createAccessor(getCache(), REGION_NAME);
   }
 
+  protected void initDataStore(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createDataStore(this.getCache(), REGION_NAME); // region only, no index
+  }
+
+  protected void initAccessor(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createAccessor(this.getCache(), REGION_NAME); // region only, no index
+  }
+
   protected RegionTestableType[] getListOfRegionTestTypes() {
     return new RegionTestableType[] {RegionTestableType.PARTITION,
         RegionTestableType.PARTITION_REDUNDANT, RegionTestableType.PARTITION_OVERFLOW_TO_DISK,
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
index 08e0e05..fc87fcc 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
@@ -28,10 +28,16 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.lucene.internal.LuceneIndexFactoryImpl;
 import org.apache.geode.cache.lucene.internal.LuceneQueryImpl;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -101,6 +107,109 @@ public class LuceneQueriesDUnitTest extends LuceneQueriesAccessorBase {
     executeTextSearch(accessor);
   }
 
+  /** Destroys the test lucene index on the test region through this member's LuceneService. */
+  private void destroyIndex() {
+    LuceneServiceProvider.get(getCache()).destroyIndex(INDEX_NAME, REGION_NAME);
+  }
+
+  /** Creates the "text" index on the already-existing test region from this member. */
+  private void recreateIndex() {
+    LuceneIndexFactoryImpl indexFactory = (LuceneIndexFactoryImpl) LuceneServiceProvider
+        .get(getCache()).createIndexFactory().addField("text");
+    indexFactory.create(INDEX_NAME, REGION_NAME, true); // true: region already exists
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void dropAndRecreateIndex(RegionTestableType regionTestType) throws Exception {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndexFactory().addField("text").create(INDEX_NAME, REGION_NAME);
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex, regionTestType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionTestType));
+    accessor.invoke(() -> initAccessor(createIndex, regionTestType));
+
+    putDataInRegion(accessor);
+    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+    executeTextSearch(accessor);
+
+    dataStore1.invoke(() -> destroyIndex());
+
+    // re-index stored data
+    AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    // re-index stored data
+    AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+
+    executeTextSearch(accessor);
+  }
+
+  /**
+   * Populates the region before any lucene index exists, then creates the index concurrently
+   * on every member and queries it. GEODE-3928: creating an index on an existing region must
+   * also index the data that region already holds.
+   */
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void reindexThenQuery(RegionTestableType regionTestType) throws Exception {
+    // Create dataRegion prior to index
+    dataStore1.invoke(() -> initDataStore(regionTestType));
+    dataStore2.invoke(() -> initDataStore(regionTestType));
+    accessor.invoke(() -> initAccessor(regionTestType));
+
+    // populate region
+    accessor.invoke(() -> {
+      Region<Object, Object> region = getCache().getRegion(REGION_NAME);
+      region.put(1, new TestObject("hello world"));
+      region.put(113, new TestObject("hi world"));
+      region.put(2, new TestObject("goodbye world"));
+    });
+
+    // Create the index on every member at the same time; the data stores must index the
+    // entries they already hold
+    AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    // wait for all members before reporting the first failure
+    ai1.join();
+    ai2.join();
+    ai3.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+
+    // query through the index that was built on top of the pre-existing entries
+    executeTextSearch(accessor);
+  }
+
   @Test
   @Parameters(method = "getListOfRegionTestTypes")
   public void defaultFieldShouldPropogateCorrectlyThroughFunction(

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.