You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2017/09/27 20:58:51 UTC
[geode] branch feature/GEODE-3239 updated: GEODE-3273: catch
serializer's exception for current event, not to interrupt other events.
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch feature/GEODE-3239
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3239 by this push:
new 9d0aa3a GEODE-3273: catch serializer's exception for current event, not to interrupt other events.
9d0aa3a is described below
commit 9d0aa3abf50a259d32658ab34600bf02223478be
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Sep 26 14:26:17 2017 -0700
GEODE-3273: catch serializer's exception for current event,
not to interrupt other events.
---
.../cache/lucene/internal/LuceneIndexStats.java | 11 ++++++
.../internal/repository/IndexRepositoryImpl.java | 31 +++++++++++----
.../LuceneIndexMaintenanceIntegrationTest.java | 46 ++++++++++++++++++++++
.../lucene/internal/LuceneIndexStatsJUnitTest.java | 14 +++++++
4 files changed, 95 insertions(+), 7 deletions(-)
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
index 6c35d5c9..368c063 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexStats.java
@@ -47,6 +47,7 @@ public class LuceneIndexStats {
private static final int commitTimeId;
private static final int commitsInProgressId;
private static final int documentsId;
+ private static final int failedEntriesId;
private final Statistics stats;
private final CopyOnWriteHashSet<IntSupplier> documentsSuppliers = new CopyOnWriteHashSet<>();
@@ -75,6 +76,7 @@ public class LuceneIndexStats {
f.createLongCounter("updateTime",
"Amount of time spent adding or removing documents from the index", "nanoseconds"),
f.createIntGauge("updatesInProgress", "Number of index updates in progress", "operations"),
+ f.createIntCounter("failedEntries", "Number of entries failed to index", "entries"),
f.createIntCounter("commits", "Number of lucene index commits on this member",
"operations"),
f.createLongCounter("commitTime", "Amount of time spent in lucene index commits",
@@ -99,6 +101,7 @@ public class LuceneIndexStats {
commitTimeId = statsType.nameToId("commitTime");
commitsInProgressId = statsType.nameToId("commitsInProgress");
documentsId = statsType.nameToId("documents");
+ failedEntriesId = statsType.nameToId("failedEntries");
}
public LuceneIndexStats(StatisticsFactory f, String name) {
@@ -175,6 +178,14 @@ public class LuceneIndexStats {
stats.incInt(commitsId, 1);
}
+ public void incFailedEntries() {
+ stats.incInt(failedEntriesId, 1);
+ }
+
+ public int getFailedEntries() {
+ return stats.getInt(failedEntriesId);
+ }
+
public void addDocumentsSupplier(IntSupplier supplier) {
this.documentsSuppliers.add(supplier);
}
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
index da1cb2e..0032fe9 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/repository/IndexRepositoryImpl.java
@@ -35,6 +35,7 @@ import org.apache.geode.distributed.LockNotHeldException;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.function.IntSupplier;
/**
@@ -78,10 +79,18 @@ public class IndexRepositoryImpl implements IndexRepository {
@Override
public void create(Object key, Object value) throws IOException {
long start = stats.startUpdate();
+ Collection<Document> docs = Collections.emptyList();
try {
- Collection<Document> docs = serializer.toDocuments(index, value);
- docs.forEach(doc -> SerializerUtil.addKey(key, doc));
- writer.addDocuments(docs);
+ try {
+ docs = serializer.toDocuments(index, value);
+ } catch (Exception e) {
+ stats.incFailedEntries();
+ logger.info("Failed to add index for " + value + " due to " + e.getMessage());
+ }
+ if (!docs.isEmpty()) {
+ docs.forEach(doc -> SerializerUtil.addKey(key, doc));
+ writer.addDocuments(docs);
+ }
} finally {
stats.endUpdate(start);
}
@@ -90,11 +99,19 @@ public class IndexRepositoryImpl implements IndexRepository {
@Override
public void update(Object key, Object value) throws IOException {
long start = stats.startUpdate();
+ Collection<Document> docs = Collections.emptyList();
try {
- Collection<Document> docs = serializer.toDocuments(index, value);
- docs.forEach(doc -> SerializerUtil.addKey(key, doc));
- Term keyTerm = SerializerUtil.toKeyTerm(key);
- writer.updateDocuments(keyTerm, docs);
+ try {
+ docs = serializer.toDocuments(index, value);
+ } catch (Exception e) {
+ stats.incFailedEntries();
+ logger.info("Failed to update index for " + value + " due to " + e.getMessage());
+ }
+ if (!docs.isEmpty()) {
+ docs.forEach(doc -> SerializerUtil.addKey(key, doc));
+ Term keyTerm = SerializerUtil.toKeyTerm(key);
+ writer.updateDocuments(keyTerm, docs);
+ }
} finally {
stats.endUpdate(start);
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
index 1827f2a..9ee8e3c 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexMaintenanceIntegrationTest.java
@@ -18,6 +18,7 @@ import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.*;
import static org.junit.Assert.*;
import java.io.Serializable;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
@@ -44,6 +45,7 @@ import org.apache.geode.cache.lucene.internal.filesystem.FileSystemStats;
import org.apache.geode.cache.lucene.internal.repository.serializer.HeterogeneousLuceneSerializer;
import org.apache.geode.cache.lucene.test.LuceneTestUtilities;
import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.apache.lucene.document.Document;
@Category(IntegrationTest.class)
public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest {
@@ -128,6 +130,31 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest
}
@Test
+ public void serializerExceptionShouldNotImpactOtherEvents() throws Exception {
+ luceneService.createIndexFactory().setFields("title", "description")
+ .setLuceneSerializer(new TestCatchingExceptionInSerializer("title 3"))
+ .create(INDEX_NAME, REGION_NAME);
+
+ Region region = createRegion(REGION_NAME, RegionShortcut.PARTITION);
+ region.put("object-1", new TestObject("title 1", "hello world"));
+ region.put("object-2", new TestObject("title 2", "this will not match"));
+ region.put("object-3", new TestObject("title 3", "hello world"));
+ region.put("object-4", new TestObject("hello world", "hello world"));
+
+ LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME);
+ luceneService.waitUntilFlushed(INDEX_NAME, REGION_NAME, WAIT_FOR_FLUSH_TIME,
+ TimeUnit.MILLISECONDS);
+ LuceneQuery query = luceneService.createLuceneQueryFactory().create(INDEX_NAME, REGION_NAME,
+ "description:\"hello world\"", DEFAULT_FIELD);
+ PageableLuceneQueryResults<Integer, TestObject> results = query.findPages();
+ assertEquals(2, results.size());
+ LuceneIndexForPartitionedRegion indexForPR = (LuceneIndexForPartitionedRegion) index;
+ LuceneIndexStats indexStats = indexForPR.getIndexStats();
+ assertEquals(1, indexStats.getFailedEntries());
+ assertEquals(4, indexStats.getUpdates());
+ }
+
+ @Test
public void statsAreUpdatedAfterACommit() throws Exception {
luceneService.createIndexFactory().setFields("title", "description").create(INDEX_NAME,
REGION_NAME);
@@ -307,4 +334,23 @@ public class LuceneIndexMaintenanceIntegrationTest extends LuceneIntegrationTest
this.description = description;
}
}
+
+ private static class TestCatchingExceptionInSerializer extends HeterogeneousLuceneSerializer {
+
+ String match;
+
+ TestCatchingExceptionInSerializer(String match) {
+ this.match = match;
+ }
+
+ @Override
+ public Collection<Document> toDocuments(LuceneIndex index, Object value) {
+ TestObject testObject = (TestObject) value;
+ if (testObject.title.equals(match)) {
+ throw new RuntimeException("Expected exception in Serializer:" + value);
+ } else {
+ return super.toDocuments(index, value);
+ }
+ }
+ }
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
index 215edb4..6541cbd 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexStatsJUnitTest.java
@@ -89,6 +89,20 @@ public class LuceneIndexStatsJUnitTest {
}
@Test
+ public void shouldIncrementFailedEntriesStats() {
+
+ stats.startUpdate();
+ verifyIncInt("updatesInProgress", 1);
+ stats.incFailedEntries();
+ stats.endUpdate(5);
+ verifyIncInt("updatesInProgress", -1);
+ verifyIncInt("updates", 1);
+ verifyIncInt("failedEntries", 1);
+ // Because the initial stat time is 0 and the final time is 5, the delta is -5
+ verifyIncLong("updateTime", -5);
+ }
+
+ @Test
public void shouldIncrementCommitStats() {
stats.startCommit();
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.