You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/09/27 20:58:51 UTC

[geode] branch feature/GEODE-3239 updated: GEODE-3273: catch serializer's exception for current event, not to interrupt other events.

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-3239
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3239 by this push:
     new 9d0aa3a  GEODE-3273: catch serializer's exception for current event,              not to interrupt other events.
9d0aa3a is described below

commit 9d0aa3abf50a259d32658ab34600bf02223478be
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Sep 26 14:26:17 2017 -0700

    GEODE-3273: catch serializer's exception for current event,
                 not to interrupt other events.
---
 .../cache/lucene/internal/LuceneIndexStats.java    | 11 ++++++
 .../internal/repository/IndexRepositoryImpl.java   | 31 +++++++++++----
 .../LuceneIndexMaintenanceIntegrationTest.java     | 46 ++++++++++++++++++++++
 .../lucene/internal/LuceneIndexStatsJUnitTest.java | 14 +++++++
 4 files changed, 95 insertions(+), 7 deletions(-)

diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
index 6c35d5c9..368c063 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
@@ -47,6 +47,7 @@ public class LuceneIndexStats {
   private static final int commitTimeId;
   private static final int commitsInProgressId;
   private static final int documentsId;
+  private static final int failedEntriesId;
 
   private final Statistics stats;
   private final CopyOnWriteHashSet<IntSupplier> documentsSuppliers = new CopyOnWriteHashSet<>();
@@ -75,6 +76,7 @@ public class LuceneIndexStats {
         f.createLongCounter("updateTime",
             "Amount of time spent adding or removing documents from the index", "nanoseconds"),
         f.createIntGauge("updatesInProgress", "Number of index updates in progress", "operations"),
+        f.createIntCounter("failedEntries", "Number of entries failed to index", "entries"),
         f.createIntCounter("commits", "Number of lucene index commits on this member",
             "operations"),
         f.createLongCounter("commitTime", "Amount of time spent in lucene index commits",
@@ -99,6 +101,7 @@ public class LuceneIndexStats {
     commitTimeId = statsType.nameToId("commitTime");
     commitsInProgressId = statsType.nameToId("commitsInProgress");
     documentsId = statsType.nameToId("documents");
+    failedEntriesId = statsType.nameToId("failedEntries");
   }
 
   public LuceneIndexStats(StatisticsFactory f, String name) {
@@ -175,6 +178,14 @@ public class LuceneIndexStats {
     stats.incInt(commitsId, 1);
   }
 
+  public void incFailedEntries() {
+    stats.incInt(failedEntriesId, 1);
+  }
+
+  public int getFailedEntries() {
+    return stats.getInt(failedEntriesId);
+  }
+
   public void addDocumentsSupplier(IntSupplier supplier) {
     this.documentsSuppliers.add(supplier);
   }
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
index da1cb2e..0032fe9 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.geode.distributed.LockNotHeldException;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.function.IntSupplier;
 
 /**
@@ -78,10 +79,18 @@ public class IndexRepositoryImpl implements IndexRepository {
   @Override
   public void create(Object key, Object value) throws IOException {
     long start = stats.startUpdate();
+    Collection<Document> docs = Collections.emptyList();
     try {
-      Collection<Document> docs = serializer.toDocuments(index, value);
-      docs.forEach(doc -> SerializerUtil.addKey(key, doc));
-      writer.addDocuments(docs);
+      try {
+        docs = serializer.toDocuments(index, value);
+      } catch (Exception e) {
+        stats.incFailedEntries();
+        logger.info("Failed to add index for " + value + " due to " + e.getMessage());
+      }
+      if (!docs.isEmpty()) {
+        docs.forEach(doc -> SerializerUtil.addKey(key, doc));
+        writer.addDocuments(docs);
+      }
     } finally {
       stats.endUpdate(start);
     }
@@ -90,11 +99,19 @@ public class IndexRepositoryImpl implements IndexRepository {
   @Override
   public void update(Object key, Object value) throws IOException {
     long start = stats.startUpdate();
+    Collection<Document> docs = Collections.emptyList();
     try {
-      Collection<Document> docs = serializer.toDocuments(index, value);
-      docs.forEach(doc -> SerializerUtil.addKey(key, doc));
-      Term keyTerm = SerializerUtil.toKeyTerm(key);
-      writer.updateDocuments(keyTerm, docs);
+      try {
+        docs = serializer.toDocuments(index, value);
+      } catch (Exception e) {
+        stats.incFailedEntries();
+        logger.info("Failed to update index for " + value + " due to " + e.getMessage());
+      }
+      if (!docs.isEmpty()) {
+        docs.forEach(doc -> SerializerUtil.addKey(key, doc));
+        Term keyTerm = SerializerUtil.toKeyTerm(key);
+        writer.updateDocuments(keyTerm, docs);
+      }
     } finally {
       stats.endUpdate(start);
     }
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
index 1827f2a..9ee8e3c 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.*;
 import static org.junit.Assert.*;
 
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Set;
@@ -44,6 +45,7 @@ import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
 import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
 import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
 import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.lucene.document.Document;
 
 @Category(IntegrationTest.class)
 public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest {
@@ -128,6 +130,31 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest
   }
 
   @Test
+  public void serializerExceptionShouldNotImpactOtherEvents() throws Exception {
+    luceneService.createIndexFactory().setFields("title", "description")
+        .setLuceneSerializer(new TestCatchingExceptionInSerializer("title 3"))
+        .create(INDEX_NAME, REGION_NAME);
+
+    Region region = createRegion(REGION_NAME, RegionShortcut.PARTITION);
+    region.put("object-1", new TestObject("title 1", "hello world"));
+    region.put("object-2", new TestObject("title 2", "this will not match"));
+    region.put("object-3", new TestObject("title 3", "hello world"));
+    region.put("object-4", new TestObject("hello world", "hello world"));
+
+    LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
+    luceneService.waitUntilFlushed(INDEX_NAME, REGION_NAME, WAIT_FOR_FLUSH_TIME,
+        TimeUnit.MILLISECONDS);
+    LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
+        "description:\"hello world\"", DEFAULT_FIELD);
+    PageableLuceneQueryResults<Integer, TestObject> results = query.findPages();
+    assertEquals(2, results.size());
+    LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
+    LuceneIndexStats indexStats = indexForPR.getIndexStats();
+    assertEquals(1, indexStats.getFailedEntries());
+    assertEquals(4, indexStats.getUpdates());
+  }
+
+  @Test
   public void statsAreUpdatedAfterACommit() throws Exception {
     luceneService.createIndexFactory().setFields("title", "description").create(INDEX_NAME,
         REGION_NAME);
@@ -307,4 +334,23 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest
       this.description = description;
     }
   }
+
+  private static class TestCatchingExceptionInSerializer extends HeterogeneousLuceneSerializer {
+
+    String match;
+
+    TestCatchingExceptionInSerializer(String match) {
+      this.match = match;
+    }
+
+    @Override
+    public Collection<Document> toDocuments(LuceneIndex index, Object value) {
+      TestObject testObject = (TestObject) value;
+      if (testObject.title.equals(match)) {
+        throw new RuntimeException("Expected exception in Serializer:" + value);
+      } else {
+        return super.toDocuments(index, value);
+      }
+    }
+  }
 }
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
index 215edb4..6541cbd 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
@@ -89,6 +89,20 @@ public class LuceneIndexStatsJUnitTest {
   }
 
   @Test
+  public void shouldIncrementFailedEntriesStats() {
+
+    stats.startUpdate();
+    verifyIncInt("updatesInProgress", 1);
+    stats.incFailedEntries();
+    stats.endUpdate(5);
+    verifyIncInt("updatesInProgress", -1);
+    verifyIncInt("updates", 1);
+    verifyIncInt("failedEntries", 1);
+    // Because the initial stat time is 0 and the final time is 5, the delta is -5
+    verifyIncLong("updateTime", -5);
+  }
+
+  @Test
   public void shouldIncrementCommitStats() {
 
     stats.startCommit();

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].