You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2015/08/27 03:07:48 UTC

[2/3] incubator-geode git commit: Adding an AsyncEventListener to write events to Lucene

Adding an AsyncEventListener to write events to Lucene

Adding an implementation of the listener and a unit test. I also added
several other interfaces for collaborators with this AsyncEventListener.

SingleIndexRepository - Interface for a single lucene index (in other
words, a single index on a single bucket of a partitioned region). This
currently has methods for writing to the index.

RepositoryManager - interface for obtaining SingleIndexRepository
objects based on a user region and a key.

ObjectToDocumentMapper - interface for translating a gemfire object
into an indexable document.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/7efa4c15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/7efa4c15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/7efa4c15

Branch: refs/heads/feature/GEODE-11
Commit: 7efa4c15ebef1836cd763522359385ecd6db14c6
Parents: d802082
Author: Dan Smith <up...@apache.org>
Authored: Wed Aug 26 16:50:03 2015 -0700
Committer: Dan Smith <up...@apache.org>
Committed: Wed Aug 26 17:28:21 2015 -0700

----------------------------------------------------------------------
 .../lucene/internal/LuceneEventListener.java    | 78 ++++++++++++++++++
 .../repository/ObjectToDocumentMapper.java      | 24 ++++++
 .../internal/repository/RepositoryManager.java  |  9 +++
 .../repository/SingleIndexRepository.java       | 33 ++++++++
 .../repository/SingleIndexRepositoryImpl.java   | 40 +++++++++
 .../internal/LuceneEventListenerJUnitTest.java  | 85 ++++++++++++++++++++
 6 files changed, 269 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
new file mode 100644
index 0000000..1eca80a
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java
@@ -0,0 +1,78 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventListener;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.SingleIndexRepository;
+import com.gemstone.gemfire.cache.query.internal.DefaultQuery;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * An {@link AsyncEventListener} that writes each batch of queued events
+ * to Lucene.
+ * <p>
+ * Every event is routed to the {@link SingleIndexRepository} that the
+ * {@link RepositoryManager} returns for the event's region and key. Each
+ * repository touched by a batch is committed once, after all of the events
+ * in the batch have been applied.
+ */
+public class LuceneEventListener implements AsyncEventListener {
+  private static final Logger logger = LogService.getLogger();
+
+  private final RepositoryManager repositoryManager;
+
+  /**
+   * @param repositoryManager used to look up the repository for each event
+   */
+  public LuceneEventListener(RepositoryManager repositoryManager) {
+    this.repositoryManager = repositoryManager;
+  }
+
+  /**
+   * No-op.
+   */
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Apply a batch of events to the lucene repositories and commit every
+   * repository that was touched.
+   *
+   * @param events the batch of events to apply, in order
+   * @return true if every event was applied and every affected repository
+   *         was committed; false if an IOException was thrown while writing
+   *         or committing (the exception is logged). When false is returned
+   *         some events of the batch may already have been written.
+   * @throws InternalGemFireError if an event's operation is not a create,
+   *         update, destroy or invalidate
+   */
+  @Override
+  public boolean processEvents(List<AsyncEvent> events) {
+    Set<SingleIndexRepository> affectedRepos = new HashSet<SingleIndexRepository>();
+
+    // Try to get a PDX instance if possible, rather than a deserialized object.
+    // Set immediately before the try so the finally below always resets it.
+    DefaultQuery.setPdxReadSerialized(true);
+    try {
+      for (AsyncEvent event : events) {
+        Region region = event.getRegion();
+        Object key = event.getKey();
+
+        SingleIndexRepository repository = repositoryManager.getRepository(region, key);
+
+        Operation op = event.getOperation();
+
+        if (op.isCreate()) {
+          repository.create(key, event.getDeserializedValue());
+        } else if (op.isUpdate()) {
+          repository.update(key, event.getDeserializedValue());
+        } else if (op.isDestroy() || op.isInvalidate()) {
+          // Invalidate is handled like destroy: the document is removed,
+          // and the event's value is never read.
+          repository.delete(key);
+        } else {
+          throw new InternalGemFireError("Unhandled operation " + op + " on " + region);
+        }
+        affectedRepos.add(repository);
+      }
+
+      // Commit once per repository rather than once per event.
+      for (SingleIndexRepository repo : affectedRepos) {
+        repo.commit();
+      }
+      return true;
+    } catch (IOException e) {
+      logger.error("Unable to save to lucene index", e);
+      return false;
+    } finally {
+      DefaultQuery.setPdxReadSerialized(false);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
new file mode 100644
index 0000000..24612ca
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/ObjectToDocumentMapper.java
@@ -0,0 +1,24 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import org.apache.lucene.index.IndexableField;
+import org.apache.lucene.index.Term;
+
+/**
+ * Interface for transforming a gemfire key and value into a lucene document,
+ * and for deriving the term that identifies that document by its key.
+ */
+public interface ObjectToDocumentMapper {
+
+  /**
+   * Transform a gemfire key and value into a document suitable for adding
+   * to a Lucene IndexWriter. The document is expected to have a unique
+   * key which can later be used to delete or update the document.
+   *
+   * @param key the gemfire key of the entry
+   * @param value the gemfire value of the entry
+   * @return the fields that make up the document to index
+   */
+  Iterable<? extends IndexableField> transform(Object key, Object value);
+
+  /**
+   * Convert a gemfire key into a key search term that can be used to
+   * update or delete the document associated with this key.
+   *
+   * @param key the gemfire key of the entry
+   * @return the term used to find the document for {@code key}
+   */
+  Term keyTerm(Object key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
new file mode 100644
index 0000000..a6879a9
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/RepositoryManager.java
@@ -0,0 +1,9 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import com.gemstone.gemfire.cache.Region;
+
+/**
+ * Interface for obtaining the {@link SingleIndexRepository} to use for a
+ * given user region and key.
+ */
+public interface RepositoryManager {
+
+  /**
+   * Look up the repository for an entry.
+   *
+   * @param region the user region the entry belongs to
+   * @param key the key of the entry
+   * @return the repository that changes to this entry should be written to
+   */
+  SingleIndexRepository getRepository(Region region, Object key);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
new file mode 100644
index 0000000..f4cc4ef
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepository.java
@@ -0,0 +1,33 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+/**
+ * A repository interface for writing data to a single lucene index.
+ * Changes made through {@link #create}, {@link #update} and {@link #delete}
+ * are committed by calling {@link #commit}.
+ */
+public interface SingleIndexRepository {
+
+  /**
+   * Create a new entry in the lucene index.
+   *
+   * @param key the key of the entry
+   * @param value the value of the entry
+   * @throws IOException if writing to the index fails
+   */
+  void create(Object key, Object value) throws IOException;
+
+  /**
+   * Update the entry for a key in the lucene index.
+   *
+   * @param key the key of the entry
+   * @param value the new value of the entry
+   * @throws IOException if writing to the index fails
+   */
+  void update(Object key, Object value) throws IOException;
+  
+  /**
+   * Delete the entry for a key from the lucene index.
+   *
+   * @param key the key of the entry to remove
+   * @throws IOException if writing to the index fails
+   */
+  void delete(Object key) throws IOException;
+
+  /**
+   * Commit the pending changes to the lucene index.
+   *
+   * @throws IOException if the commit fails
+   */
+  void commit() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
new file mode 100644
index 0000000..944cf41
--- /dev/null
+++ b/gemfire-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/repository/SingleIndexRepositoryImpl.java
@@ -0,0 +1,40 @@
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.IOException;
+
+import org.apache.lucene.index.IndexWriter;
+
+/**
+ * A {@link SingleIndexRepository} that writes to a single lucene
+ * {@link IndexWriter}, using an {@link ObjectToDocumentMapper} to turn
+ * keys and values into documents and key terms.
+ */
+public class SingleIndexRepositoryImpl implements SingleIndexRepository {
+
+  private final IndexWriter writer;
+  private final ObjectToDocumentMapper mapper;
+
+  /**
+   * @param writer the index writer that all changes are written to
+   * @param mapper converts keys and values into documents and key terms
+   */
+  public SingleIndexRepositoryImpl(IndexWriter writer, ObjectToDocumentMapper mapper) {
+    this.writer = writer;
+    this.mapper = mapper;
+  }
+
+  /**
+   * Add the document for {@code key} to the index, replacing any document
+   * that already exists for that key.
+   * <p>
+   * This deliberately uses {@code updateDocument} rather than
+   * {@code addDocument}. LuceneEventListener reports failure for a batch
+   * after some of its events may already have been written; if that batch
+   * is delivered again, a plain add would leave two documents for the same
+   * key.
+   *
+   * @throws IOException if writing to the index fails
+   */
+  @Override
+  public void create(Object key, Object value) throws IOException {
+    writer.updateDocument(mapper.keyTerm(key), mapper.transform(key, value));
+  }
+
+  /**
+   * Replace the document for {@code key} with one built from {@code value}.
+   *
+   * @throws IOException if writing to the index fails
+   */
+  @Override
+  public void update(Object key, Object value) throws IOException {
+    writer.updateDocument(mapper.keyTerm(key), mapper.transform(key, value));
+  }
+
+  /**
+   * Delete the documents matching the key term of {@code key}.
+   *
+   * @throws IOException if writing to the index fails
+   */
+  @Override
+  public void delete(Object key) throws IOException {
+    writer.deleteDocuments(mapper.keyTerm(key));
+  }
+
+  /**
+   * Commit the pending changes of the underlying writer.
+   *
+   * @throws IOException if the commit fails
+   */
+  @Override
+  public void commit() throws IOException {
+    writer.commit();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/7efa4c15/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
new file mode 100644
index 0000000..ca80db1
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListenerJUnitTest.java
@@ -0,0 +1,85 @@
+package com.gemstone.gemfire.cache.lucene.internal;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.AssertionFailedError;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEvent;
+import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager;
+import com.gemstone.gemfire.cache.lucene.internal.repository.SingleIndexRepository;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+/**
+ * Unit test that the async event listener dispatches the events
+ * to the appropriate repository.
+ */
+@Category(UnitTest.class)
+public class LuceneEventListenerJUnitTest {
+
+  /**
+   * Feeds a batch of create, update and destroy events spread over two
+   * regions to the listener and checks that each region's repository
+   * receives its share of every operation and is committed exactly once.
+   */
+  @Test
+  public void testProcessBatch() throws IOException {
+    RepositoryManager manager = Mockito.mock(RepositoryManager.class);
+    SingleIndexRepository repo1 = Mockito.mock(SingleIndexRepository.class);
+    SingleIndexRepository repo2 = Mockito.mock(SingleIndexRepository.class);
+    Region region1 = Mockito.mock(Region.class);
+    Region region2 = Mockito.mock(Region.class);
+
+    Mockito.when(manager.getRepository(eq(region1), any())).thenReturn(repo1);
+    Mockito.when(manager.getRepository(eq(region2), any())).thenReturn(repo2);
+
+    LuceneEventListener listener = new LuceneEventListener(manager);
+
+    List<AsyncEvent> events = new ArrayList<AsyncEvent>();
+
+    int numEntries = 100;
+    for (int i = 0; i < numEntries; i++) {
+      AsyncEvent event = Mockito.mock(AsyncEvent.class);
+
+      // Alternate regions so that both repositories see every operation type
+      Region region = i % 2 == 0 ? region1 : region2;
+      Mockito.when(event.getRegion()).thenReturn(region);
+      Mockito.when(event.getKey()).thenReturn(i);
+
+      switch (i % 3) {
+      case 0:
+        Mockito.when(event.getOperation()).thenReturn(Operation.CREATE);
+        Mockito.when(event.getDeserializedValue()).thenReturn(i);
+        break;
+      case 1:
+        Mockito.when(event.getOperation()).thenReturn(Operation.UPDATE);
+        Mockito.when(event.getDeserializedValue()).thenReturn(i);
+        break;
+      case 2:
+        Mockito.when(event.getOperation()).thenReturn(Operation.DESTROY);
+        // The listener must not read the value of a destroy event
+        Mockito.when(event.getDeserializedValue()).thenThrow(new AssertionFailedError());
+        break;
+      }
+
+      events.add(event);
+    }
+
+    // The mocked repositories never fail, so the batch must be reported as processed
+    boolean processed = listener.processEvents(events);
+    org.junit.Assert.assertTrue("processEvents should report success", processed);
+
+    verify(repo1, atLeast(numEntries / 6)).create(any(), any());
+    verify(repo1, atLeast(numEntries / 6)).delete(any());
+    verify(repo1, atLeast(numEntries / 6)).update(any(), any());
+    verify(repo2, atLeast(numEntries / 6)).create(any(), any());
+    verify(repo2, atLeast(numEntries / 6)).delete(any());
+    verify(repo2, atLeast(numEntries / 6)).update(any(), any());
+    verify(repo1, times(1)).commit();
+    verify(repo2, times(1)).commit();
+  }
+}