You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/01/20 00:30:44 UTC

[geode] branch feature/GEODE-3928 updated (a952781 -> 9239278)

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a change to branch feature/GEODE-3928
in repository https://gitbox.apache.org/repos/asf/geode.git.


 discard a952781  GEODE-3928: createIndex on existing region creates lucene indexes for existing data
     new 9239278  GEODE-3928: createIndex on existing region creates lucene indexes for existing data

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates
a repository containing something like this:

 * -- * -- B -- O -- O -- O   (a952781)
            \
             N -- N -- N   refs/heads/feature/GEODE-3928 (9239278)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java   | 1 -
 1 file changed, 1 deletion(-)

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.

[geode] 01/01: GEODE-3928: createIndex on existing region creates lucene indexes for existing data

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch feature/GEODE-3928
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 9239278ee28debdbcb24df87c1bfe64868be6029
Author: Lynn Hughes-Godfrey <lh...@pivotal.io>
AuthorDate: Thu Jan 18 16:03:47 2018 -0800

    GEODE-3928: createIndex on existing region creates lucene indexes for existing data
---
 .../lucene/internal/IndexRepositoryFactory.java    |  59 ++++++++++-
 .../cache/lucene/internal/LuceneServiceImpl.java   |  75 ++++++++++++--
 .../apache/geode/cache/lucene/LuceneDUnitTest.java |   8 ++
 .../geode/cache/lucene/LuceneQueriesDUnitTest.java | 109 +++++++++++++++++++++
 4 files changed, 239 insertions(+), 12 deletions(-)

diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
index b825940..416b165 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/IndexRepositoryFactory.java
@@ -15,12 +15,15 @@
 package org.apache.geode.cache.lucene.internal;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.lucene.LuceneSerializer;
 import org.apache.geode.cache.lucene.internal.directory.RegionDirectory;
 import org.apache.geode.cache.lucene.internal.partition.BucketTargetingMap;
@@ -28,6 +31,9 @@ import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.repository.IndexRepositoryImpl;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.ColocationHelper;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.PartitionRegionConfig;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.logging.LogService;
@@ -36,6 +42,7 @@ public class IndexRepositoryFactory {
 
   private static final Logger logger = LogService.getLogger();
   public static final String FILE_REGION_LOCK_FOR_BUCKET_ID = "FileRegionLockForBucketId:";
+  public static final String APACHE_GEODE_INDEX_COMPLETE = "APACHE_GEODE_INDEX_COMPLETE";
 
   public IndexRepositoryFactory() {}
 
@@ -45,6 +52,14 @@ public class IndexRepositoryFactory {
     LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
     final PartitionedRegion fileRegion = indexForPR.getFileAndChunkRegion();
 
+    // We need to ensure that all members have created the fileAndChunk region before continuing
+    Region prRoot = PartitionedRegionHelper.getPRRoot(fileRegion.getCache());
+    PartitionRegionConfig prConfig =
+        (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+    while (!prConfig.isColocationComplete()) {
+      prConfig = (PartitionRegionConfig) prRoot.get(fileRegion.getRegionIdentifier());
+    }
+
     BucketRegion fileAndChunkBucket = getMatchingBucket(fileRegion, bucketId);
     BucketRegion dataBucket = getMatchingBucket(userRegion, bucketId);
     boolean success = false;
@@ -77,14 +92,37 @@ public class IndexRepositoryFactory {
     }
 
     final IndexRepository repo;
+    final String indexCompleteKey = getIndexingCompleteKey(fileAndChunkBucket);
     try {
-      RegionDirectory dir = new RegionDirectory(getBucketTargetingMap(fileAndChunkBucket, bucketId),
-          indexForPR.getFileSystemStats());
+      // bucketTargetingMap handles partition resolver (via bucketId as callbackArg)
+      Map bucketTargetingMap = getBucketTargetingMap(fileAndChunkBucket, bucketId);
+      RegionDirectory dir =
+          new RegionDirectory(bucketTargetingMap, indexForPR.getFileSystemStats());
       IndexWriterConfig config = new IndexWriterConfig(indexForPR.getAnalyzer());
       IndexWriter writer = new IndexWriter(dir, config);
       repo = new IndexRepositoryImpl(fileAndChunkBucket, writer, serializer,
           indexForPR.getIndexStats(), dataBucket, lockService, lockName, indexForPR);
-      success = true;
+      success = false;
+      // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+      if (null != fileRegion.get(indexCompleteKey, bucketId)) {
+        success = true;
+        return repo;
+      } else {
+        Iterator keysIterator = dataBucket.keySet().iterator();
+        while (keysIterator.hasNext()) {
+          Object key = keysIterator.next();
+          Object value = getValue(userRegion.getEntry(key));
+          if (value != null) {
+            repo.update(key, value);
+          } else {
+            repo.delete(key);
+          }
+        }
+        repo.commit();
+        // fileRegion ops (get/put) need bucketId as a callbackArg for PartitionResolver
+        fileRegion.put(indexCompleteKey, APACHE_GEODE_INDEX_COMPLETE, bucketId);
+        success = true;
+      }
       return repo;
     } catch (IOException e) {
       logger.info("Exception thrown while constructing Lucene Index for bucket:" + bucketId
@@ -98,6 +136,17 @@ public class IndexRepositoryFactory {
 
   }
 
+  private Object getValue(Region.Entry entry) {
+    final EntrySnapshot es = (EntrySnapshot) entry;
+    Object value;
+    try {
+      value = es == null ? null : es.getRawValue(true);
+    } catch (EntryDestroyedException e) {
+      value = null;
+    }
+    return value;
+  }
+
   private Map getBucketTargetingMap(BucketRegion region, int bucketId) {
     return new BucketTargetingMap(region, bucketId);
   }
@@ -106,6 +155,10 @@ public class IndexRepositoryFactory {
     return FILE_REGION_LOCK_FOR_BUCKET_ID + fileAndChunkBucket.getFullPath();
   }
 
+  private String getIndexingCompleteKey(final BucketRegion fileAndChunkBucket) {
+    return APACHE_GEODE_INDEX_COMPLETE + fileAndChunkBucket.getFullPath();
+  }
+
   private DistributedLockService getLockService() {
     return DistributedLockService
         .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index e7813af..1cafa12 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -15,10 +15,15 @@
 
 package org.apache.geode.cache.lucene.internal;
 
+import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -31,12 +36,18 @@ import org.apache.logging.log4j.Logger;
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
 import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.store.AlreadyClosedException;
 
+import org.apache.geode.InternalGemFireError;
+import org.apache.geode.cache.AttributesMutator;
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.EntryDestroyedException;
 import org.apache.geode.cache.EvictionAlgorithm;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.FunctionService;
@@ -58,15 +69,23 @@ import org.apache.geode.cache.lucene.internal.filesystem.ChunkKey;
 import org.apache.geode.cache.lucene.internal.filesystem.File;
 import org.apache.geode.cache.lucene.internal.management.LuceneServiceMBean;
 import org.apache.geode.cache.lucene.internal.management.ManagementIndexListener;
+import org.apache.geode.cache.lucene.internal.repository.IndexRepository;
 import org.apache.geode.cache.lucene.internal.results.LuceneGetPageFunction;
 import org.apache.geode.cache.lucene.internal.results.PageResults;
 import org.apache.geode.cache.lucene.internal.xml.LuceneServiceXmlGenerator;
+import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.DSFIDFactory;
 import org.apache.geode.internal.DataSerializableFixedID;
+import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.BucketNotFoundException;
+import org.apache.geode.internal.cache.BucketRegion;
 import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.EntrySnapshot;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PrimaryBucketException;
 import org.apache.geode.internal.cache.RegionListener;
 import org.apache.geode.internal.cache.extension.Extensible;
 import org.apache.geode.internal.cache.xmlcache.XmlGenerator;
@@ -228,27 +247,24 @@ public class LuceneServiceImpl implements InternalLuceneService {
       regionPath = "/" + regionPath;
     }
 
-    registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
-        regionPath, fields, analyzer, fieldAnalyzers, serializer));
-
+    // If the region does not yet exist, install LuceneRegionListener and return
     PartitionedRegion region = (PartitionedRegion) cache.getRegion(regionPath);
-
-    LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
-        regionPath, fields, analyzer, fieldAnalyzers, serializer);
     if (region == null) {
+      registerDefinedIndex(indexName, regionPath, new LuceneIndexCreationProfile(indexName,
+          regionPath, fields, analyzer, fieldAnalyzers, serializer));
+      LuceneRegionListener regionListener = new LuceneRegionListener(this, cache, indexName,
+          regionPath, fields, analyzer, fieldAnalyzers, serializer);
       cache.addRegionListener(regionListener);
       return;
     }
 
     if (!allowOnExistingRegion) {
-      definedIndexMap.remove(LuceneServiceImpl.getUniqueIndexName(indexName, regionPath));
       throw new IllegalStateException("The lucene index must be created before region");
     }
 
-
+    // do work normally handled by LuceneRegionListener (if region already exists)
     createIndexOnExistingRegion(region, indexName, regionPath, fields, analyzer, fieldAnalyzers,
         serializer);
-
   }
 
   private void createIndexOnExistingRegion(PartitionedRegion region, String indexName,
@@ -266,6 +282,47 @@ public class LuceneServiceImpl implements InternalLuceneService {
         region.getAttributes(), analyzer, fieldAnalyzers, aeqId, serializer, fields);
 
     afterDataRegionCreated(luceneIndex);
+
+    createLuceneIndexOnDataRegion(region, luceneIndex);
+  }
+
+  protected boolean createLuceneIndexOnDataRegion(final PartitionedRegion userRegion,
+      final LuceneIndexImpl luceneIndex) {
+    // Try to get a PDX instance if possible, rather than a deserialized object
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      AbstractPartitionedRepositoryManager repositoryManager =
+          (AbstractPartitionedRepositoryManager) luceneIndex.getRepositoryManager();
+      if (userRegion.getDataStore() == null) {
+        return true;
+      }
+      Set<Integer> primaryBucketIds = userRegion.getDataStore().getAllLocalPrimaryBucketIds();
+      Iterator primaryBucketIterator = primaryBucketIds.iterator();
+      while (primaryBucketIterator.hasNext()) {
+        int primaryBucketId = (Integer) primaryBucketIterator.next();
+        try {
+          BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(primaryBucketId);
+          if (!userBucket.isEmpty()) {
+            repositoryManager.getRepository(primaryBucketId);
+          }
+        } catch (BucketNotFoundException | PrimaryBucketException e) {
+          logger.debug("Bucket ID : " + primaryBucketId
+              + " not found while saving to lucene index: " + e.getMessage(), e);
+        }
+      }
+      return true;
+    } catch (RegionDestroyedException e) {
+      logger.debug("Bucket not found while saving to lucene index: " + e.getMessage(), e);
+      return false;
+    } catch (CacheClosedException e) {
+      logger.debug("Unable to save to lucene index, cache has been closed", e);
+      return false;
+    } catch (AlreadyClosedException e) {
+      logger.debug("Unable to commit, the lucene index is already closed", e);
+      return false;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
   }
 
   static void validateRegionAttributes(RegionAttributes attrs) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
index c3f5dc2..44962f8 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneDUnitTest.java
@@ -61,6 +61,14 @@ public abstract class LuceneDUnitTest extends JUnit4CacheTestCase {
     regionTestType.createAccessor(getCache(), REGION_NAME);
   }
 
+  protected void initDataStore(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createDataStore(getCache(), REGION_NAME);
+  }
+
+  protected void initAccessor(RegionTestableType regionTestType) throws Exception {
+    regionTestType.createAccessor(getCache(), REGION_NAME);
+  }
+
   protected RegionTestableType[] getListOfRegionTestTypes() {
     return new RegionTestableType[] {RegionTestableType.PARTITION,
         RegionTestableType.PARTITION_REDUNDANT, RegionTestableType.PARTITION_OVERFLOW_TO_DISK,
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
index 08e0e05..fc87fcc 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesDUnitTest.java
@@ -28,10 +28,16 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.lucene.internal.LuceneIndexFactoryImpl;
 import org.apache.geode.cache.lucene.internal.LuceneQueryImpl;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
 import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.SerializableRunnableIF;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.junit.categories.DistributedTest;
@@ -101,6 +107,109 @@ public class LuceneQueriesDUnitTest extends LuceneQueriesAccessorBase {
     executeTextSearch(accessor);
   }
 
+  private void destroyIndex() {
+    LuceneService luceneService = LuceneServiceProvider.get(getCache());
+    luceneService.destroyIndex(INDEX_NAME, REGION_NAME);
+  }
+
+  private void recreateIndex() {
+    LuceneService luceneService = LuceneServiceProvider.get(getCache());
+    LuceneIndexFactoryImpl indexFactory =
+        (LuceneIndexFactoryImpl) luceneService.createIndexFactory().addField("text");
+    indexFactory.create(INDEX_NAME, REGION_NAME, true);
+  };
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void dropAndRecreateIndex(RegionTestableType regionTestType) throws Exception {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      luceneService.createIndexFactory().addField("text").create(INDEX_NAME, REGION_NAME);
+    };
+    dataStore1.invoke(() -> initDataStore(createIndex, regionTestType));
+    dataStore2.invoke(() -> initDataStore(createIndex, regionTestType));
+    accessor.invoke(() -> initAccessor(createIndex, regionTestType));
+
+    putDataInRegion(accessor);
+    assertTrue(waitForFlushBeforeExecuteTextSearch(accessor, 60000));
+    assertTrue(waitForFlushBeforeExecuteTextSearch(dataStore1, 60000));
+    executeTextSearch(accessor);
+
+    dataStore1.invoke(() -> destroyIndex());
+
+    // re-index stored data
+    AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    // re-index stored data
+    AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+
+    executeTextSearch(accessor);
+  }
+
+  @Test
+  @Parameters(method = "getListOfRegionTestTypes")
+  public void reindexThenQuery(RegionTestableType regionTestType) throws Exception {
+    SerializableRunnableIF createIndex = () -> {
+      LuceneService luceneService = LuceneServiceProvider.get(getCache());
+      LuceneIndexFactoryImpl indexFactory =
+          (LuceneIndexFactoryImpl) luceneService.createIndexFactory().addField("text");
+      indexFactory.create(INDEX_NAME, REGION_NAME, true);
+    };
+
+    // Create dataRegion prior to index
+    dataStore1.invoke(() -> initDataStore(regionTestType));
+    dataStore2.invoke(() -> initDataStore(regionTestType));
+    accessor.invoke(() -> initAccessor(regionTestType));
+
+    // populate region
+    accessor.invoke(() -> {
+      Region<Object, Object> region = getCache().getRegion(REGION_NAME);
+      region.put(1, new TestObject("hello world"));
+      region.put(113, new TestObject("hi world"));
+      region.put(2, new TestObject("goodbye world"));
+    });
+
+    // re-index stored data
+    AsyncInvocation ai1 = dataStore1.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    // re-index stored data
+    AsyncInvocation ai2 = dataStore2.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    AsyncInvocation ai3 = accessor.invokeAsync(() -> {
+      recreateIndex();
+    });
+
+    ai1.join();
+    ai2.join();
+    ai3.join();
+
+    ai1.checkException();
+    ai2.checkException();
+    ai3.checkException();
+
+    executeTextSearch(accessor);
+  }
+
   @Test
   @Parameters(method = "getListOfRegionTestTypes")
   public void defaultFieldShouldPropogateCorrectlyThroughFunction(

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.