You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/08/27 03:07:48 UTC
[2/3] incubator-geode git commit: Adding an AsyncEventListener to
write events to Lucene
Adding an AsyncEventListener to write events to Lucene
Adding an implementation of the listener and a unit test. I also added
several other interfaces for collaborators with this AsyncEventListener.
SingleIndexRepository - Interface for a single lucene index (in other
words, a single index on a single bucket for a PR). This currently has
methods for writing to the index.
RepositoryManager - interface for obtaining SingleIndexRepository
objects based on a user region and a key.
ObjectToDocumentMapper - interface for translating a gemfire object
into an indexable document.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7efa4c15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7efa4c15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7efa4c15
Branch: refs/heads/feature/GEODE-11
Commit: 7efa4c15ebef1836cd763522359385ecd6db14c6
Parents: d802082
Author: Dan Smith <up...@apache.org>
Authored: Wed Aug 26 16:50:03 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Wed Aug 26 17:28:21 2015 -0700
----------------------------------------------------------------------
.../lucene/internal/LuceneEventListener.java | 78 ++++++++++++++++++
.../repository/ObjectToDocumentMapper.java | 24 ++++++
.../internal/repository/RepositoryManager.java | 9 +++
.../repository/SingleIndexRepository.java | 33 ++++++++
.../repository/SingleIndexRepositoryImpl.java | 40 +++++++++
.../internal/LuceneEventListenerJUnitTest.java | 85 ++++++++++++++++++++
6 files changed, 269 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
new file mode 100644
index 0000000..1eca80a
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -0,0 +1,78 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.SingleIndexRepository;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * An Async event queue listener that writes all of the
+ * events in batches to Lucene
+ */
+public class LuceneEventListener implements AsyncEventListener {
+ Logger logger = LogService.getLogger();
+
+ private final RepositoryManager repositoryManager;
+
+ public LuceneEventListener(RepositoryManager repositoryManager) {
+ this.repositoryManager = repositoryManager;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ // Try to get a PDX instance if possible, rather than a deserialized object
+ DefaultQuery.setPdxReadSerialized(true);
+
+ Set<SingleIndexRepository> affectedRepos = new HashSet<SingleIndexRepository>();
+
+ try {
+ for (AsyncEvent event : events) {
+ Region region = event.getRegion();
+ Object key = event.getKey();
+
+ SingleIndexRepository repository = repositoryManager.getRepository(region, key);
+
+ Operation op = event.getOperation();
+
+ if (op.isCreate()) {
+ repository.create(key, event.getDeserializedValue());
+ } else if (op.isUpdate()) {
+ repository.update(key, event.getDeserializedValue());
+ } else if (op.isDestroy()) {
+ repository.delete(key);
+ } else if (op.isInvalidate()) {
+ repository.delete(key);
+ } else {
+ throw new InternalGemFireError("Unhandled operation " + op + " on " + event.getRegion());
+ }
+ affectedRepos.add(repository);
+ }
+
+ for(SingleIndexRepository repo : affectedRepos) {
+ repo.commit();
+ }
+ return true;
+ } catch(IOException e) {
+ logger.error("Unable to save to lucene index", e);
+ return false;
+ } finally {
+ DefaultQuery.setPdxReadSerialized(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
new file mode 100644
index 0000000..24612ca
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
@@ -0,0 +1,24 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
+
+/**
+ * Interface for transforming a gemfire key and value into a lucene document
+ */
+public interface ObjectToDocumentMapper {
+
+ /**
+ * Transform a gemfire key and value into a document suitable for adding
+ * to a Lucene IndexWriter. The document is expected to have a unique
+ * key which can later be used to delete or update the document
+ */
+ Iterable<? extends IndexableField> transform(Object key, Object value);
+
+ /**
+ * Convert a gemfire key into a key search term that can be used to
+ * update or delete the document associated with this key.
+ */
+ Term keyTerm(Object key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
new file mode 100644
index 0000000..a6879a9
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
@@ -0,0 +1,9 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import com.gemstone.gemfire.cache.Region;
+
+public interface RepositoryManager {
+
+ SingleIndexRepository getRepository(Region region, Object key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
new file mode 100644
index 0000000..f4cc4ef
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
@@ -0,0 +1,33 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+/**
+ * An Repository interface for the writing data to lucene.
+ */
+public interface SingleIndexRepository {
+
+ /**
+ * Create a new entry in the lucene index
+ * @throws IOException
+ */
+ void create(Object key, Object value) throws IOException;
+
+ /**
+ * Update the entries in the lucene index
+ * @throws IOException
+ */
+ void update(Object key, Object value) throws IOException;
+
+ /**
+ * Delete the entries in the lucene index
+ * @throws IOException
+ */
+ void delete(Object key) throws IOException;
+
+ /**
+ * Commit the changes to all lucene index
+ * @throws IOException
+ */
+ void commit() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
new file mode 100644
index 0000000..944cf41
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
@@ -0,0 +1,40 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * A repository that writes to a single lucene index writer
+ */
+public class SingleIndexRepositoryImpl implements SingleIndexRepository {
+
+ private final IndexWriter writer;
+ private final ObjectToDocumentMapper mapper;
+
+ public SingleIndexRepositoryImpl(IndexWriter writer, ObjectToDocumentMapper mapper) {
+ this.writer = writer;
+ this.mapper = mapper;
+ }
+
+ @Override
+ public void create(Object key, Object value) throws IOException {
+ writer.addDocument(mapper.transform(key, value));
+ }
+
+ @Override
+ public void update(Object key, Object value) throws IOException {
+ writer.updateDocument(mapper.keyTerm(key), mapper.transform(key, value));
+ }
+
+ @Override
+ public void delete(Object key) throws IOException {
+ writer.deleteDocuments(mapper.keyTerm(key));
+ }
+
+ @Override
+ public void commit() throws IOException {
+ writer.commit();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
new file mode 100644
index 0000000..ca80db1
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -0,0 +1,85 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.SingleIndexRepository;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit test that async event listener dispatched the events
+ * to the appropriate repository.
+ */
+@Category(UnitTest.class)
+public class LuceneEventListenerJUnitTest {
+
+ @Test
+ public void testProcessBatch() throws IOException {
+ RepositoryManager manager = Mockito.mock(RepositoryManager.class);
+ SingleIndexRepository repo1 = Mockito.mock(SingleIndexRepository.class);
+ SingleIndexRepository repo2 = Mockito.mock(SingleIndexRepository.class);
+ Region region1 = Mockito.mock(Region.class);
+ Region region2 = Mockito.mock(Region.class);
+
+ Mockito.when(manager.getRepository(eq(region1), any())).thenReturn(repo1);
+ Mockito.when(manager.getRepository(eq(region2), any())).thenReturn(repo2);
+
+ LuceneEventListener listener = new LuceneEventListener(manager);
+
+ List<AsyncEvent> events = new ArrayList<AsyncEvent>();
+
+ int numEntries = 100;
+ for (int i = 0; i < numEntries; i++) {
+ AsyncEvent event = Mockito.mock(AsyncEvent.class);
+
+ Region region = i % 2 == 0 ? region1 : region2;
+ Mockito.when(event.getRegion()).thenReturn(region);
+ Mockito.when(event.getKey()).thenReturn(i);
+
+ switch (i % 3) {
+ case 0:
+ Mockito.when(event.getOperation()).thenReturn(Operation.CREATE);
+ Mockito.when(event.getDeserializedValue()).thenReturn(i);
+ break;
+ case 1:
+ Mockito.when(event.getOperation()).thenReturn(Operation.UPDATE);
+ Mockito.when(event.getDeserializedValue()).thenReturn(i);
+ break;
+ case 2:
+ Mockito.when(event.getOperation()).thenReturn(Operation.DESTROY);
+ Mockito.when(event.getDeserializedValue()).thenThrow(new AssertionFailedError());
+ break;
+ }
+
+ events.add(event);
+ }
+
+ listener.processEvents(events);
+
+ verify(repo1, atLeast(numEntries / 6)).create(any(), any());
+ verify(repo1, atLeast(numEntries / 6)).delete(any());
+ verify(repo1, atLeast(numEntries / 6)).update(any(), any());
+ verify(repo2, atLeast(numEntries / 6)).create(any(), any());
+ verify(repo2, atLeast(numEntries / 6)).delete(any());
+ verify(repo2, atLeast(numEntries / 6)).update(any(), any());
+ verify(repo1, times(1)).commit();
+ verify(repo2, times(1)).commit();
+ }
+}