You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/04/18 17:42:27 UTC
[geode] 01/02: GEODE-5096: Added support to validate server cache
profiles in the case of concurrent updates to region cache profiles.
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-5096
in repository https://gitbox.apache.org/repos/asf/geode.git
commit cc798b9a4158c09661a7ff2a88d8491e9add3880
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Mon Apr 16 15:46:40 2018 -0700
GEODE-5096: Added support to validate server cache profiles in the case
of concurrent updates to region cache profiles.
---
.../IncompatibleCacheServiceProfileException.java | 23 +++
.../geode/internal/cache/InternalRegion.java | 3 +-
.../apache/geode/internal/cache/LocalRegion.java | 187 ++++++++++++---------
.../geode/internal/cache/PartitionedRegion.java | 10 ++
.../internal/cache/UpdateAttributesProcessor.java | 46 ++---
.../sanctioned-geode-core-serializables.txt | 1 +
.../apache/geode/codeAnalysis/excludedClasses.txt | 2 +-
.../cache/lucene/internal/LuceneServiceImpl.java | 9 +-
.../LuceneIndexCreationProfileDUnitTest.java | 109 ++++++++++++
.../LuceneIndexCreationProfileJUnitTest.java | 4 +-
.../cache/lucene/test/LuceneTestUtilities.java | 6 +-
11 files changed, 287 insertions(+), 113 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java
new file mode 100644
index 0000000..c9a7435
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.GemFireCheckedException;
+
+public class IncompatibleCacheServiceProfileException extends GemFireCheckedException {
+ public IncompatibleCacheServiceProfileException(String message) {
+ super(message);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 7c5d722..61360d6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -142,7 +142,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
InternalRegionArguments internalRegionArgs)
throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException;
- void addCacheServiceProfile(CacheServiceProfile profile);
+ void addCacheServiceProfile(CacheServiceProfile profile)
+ throws IncompatibleCacheServiceProfileException;
void setEvictionMaximum(int maximum);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 3aab945..bb7a03a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -146,7 +146,6 @@ import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionAdvisor;
-import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionStats;
@@ -709,7 +708,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return new Object();
} else {
return this.fullPath; // avoids creating another sync object - could be anything unique to
- // this region
+ // this region
}
}
@@ -883,8 +882,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
LocalRegion newRegion = null;
try {
- if (getDestroyLock)
+ if (getDestroyLock) {
acquireDestroyLock();
+ }
LocalRegion existing = null;
try {
if (isDestroyed()) {
@@ -1063,7 +1063,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
false, // ifOld
null, // expectedOldValue
true // requireOldValue TODO txMerge why is oldValue required for
- // create? I think so that the EntryExistsException will have it.
+ // create? I think so that the EntryExistsException will have it.
)) {
throw new EntryExistsException(event.getKey().toString(), event.getOldValue());
} else {
@@ -1635,8 +1635,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
eventReturned = true;
return event;
} finally {
- if (!eventReturned)
+ if (!eventReturned) {
event.release();
+ }
}
}
@@ -2518,8 +2519,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
private static void releaseLatch(StoppableCountDownLatch latch) {
- if (latch == null)
+ if (latch == null) {
return;
+ }
latch.countDown();
}
@@ -3123,10 +3125,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
// check validity of key against keyConstraint
if (this.keyConstraint != null) {
- if (!this.keyConstraint.isInstance(key))
+ if (!this.keyConstraint.isInstance(key)) {
throw new ClassCastException(
LocalizedStrings.LocalRegion_KEY_0_DOES_NOT_SATISFY_KEYCONSTRAINT_1
.toLocalizedString(key.getClass().getName(), this.keyConstraint.getName()));
+ }
}
// We don't need to check that the key is Serializable. Instead,
@@ -3145,7 +3148,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*/
private final AtomicInteger tombstoneCount = new AtomicInteger();
- /** a boolean for issuing a client/server configuration mismatch message */
+ /**
+ * a boolean for issuing a client/server configuration mismatch message
+ */
private boolean concurrencyMessageIssued;
/**
@@ -3199,7 +3204,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.cachePerfStats;
}
- /** regions track the number of tombstones their map holds for size calculations */
+ /**
+ * regions track the number of tombstones their map holds for size calculations
+ */
public void incTombstoneCount(int delta) {
this.tombstoneCount.addAndGet(delta);
this.cachePerfStats.incTombstoneCount(delta);
@@ -3331,7 +3338,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** local regions do not perform versioning */
+ /**
+ * local regions do not perform versioning
+ */
protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
if (this.getDataPolicy().withPersistence()) {
return true;
@@ -3381,9 +3390,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* returned. This method is intended for testing.testing purposes only.
*
* @throws EntryNotFoundException No entry with {@code key} exists
- *
* @see RegionMap#getEntry
- *
* @since GemFire 3.2
*/
@Override
@@ -3452,9 +3459,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @throws EntryNotFoundException No entry with {@code key} exists
* @throws IllegalStateException If this region does not write to disk
- *
* @see RegionEntry#getValueOnDisk
- *
* @since GemFire 3.2
*/
@Override
@@ -3486,9 +3491,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @throws EntryNotFoundException No entry with {@code key} exists
* @throws IllegalStateException If this region does not write to disk
- *
* @see RegionEntry#getValueOnDisk
- *
* @since GemFire 5.1
*/
public Object getValueOnDiskOrBuffer(Object key) throws EntryNotFoundException {
@@ -4396,12 +4399,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
clearViaFilterClass((String) key);
break;
case InterestType.KEY:
- if (key instanceof String && key.equals("ALL_KEYS"))
+ if (key instanceof String && key.equals("ALL_KEYS")) {
clearViaRegEx(".*");
- else if (key instanceof List)
+ } else if (key instanceof List) {
clearViaList((List) key);
- else
+ } else {
localDestroyNoCallbacks(key);
+ }
break;
case InterestType.OQL_QUERY:
clearViaQuery((String) key);
@@ -4435,7 +4439,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
void reinitializeFromImageTarget(InternalDistributedMember imageTarget)
throws TimeoutException, IOException, ClassNotFoundException {
Assert.assertTrue(imageTarget != null);
@@ -4450,7 +4456,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.reinitialized_new;
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
void reinitialize_destroy(RegionEventImpl event) throws CacheWriterException, TimeoutException {
final boolean cacheWrite = !event.originRemote;
// register this region as reinitializing
@@ -4458,7 +4466,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
basicDestroyRegion(event, cacheWrite, false/* lock */, true);
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
private void recreate(InputStream inputStream, InternalDistributedMember imageTarget)
throws TimeoutException, IOException, ClassNotFoundException {
String thePath = getFullPath();
@@ -4682,7 +4692,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return latch.getCount() == 0;
}
- /** wait on the initialization Latch based on thread requirements */
+ /**
+ * wait on the initialization Latch based on thread requirements
+ */
@Override
public void waitOnInitialization() {
if (this.initialized) {
@@ -4736,7 +4748,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
waitOnInitialization(this.initializationLatchAfterGetInitialImage);
}
- /** return null if not found */
+ /**
+ * return null if not found
+ */
@Override
public RegionEntry basicGetEntry(Object key) {
// ok to ignore tx state; all callers are non-transactional
@@ -4873,7 +4887,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* than any other surviving members. So they shouldn't have any entries in their cache that match
* entries that we failed to receive through the GII but are reflected in our current RVV. So it
* should be safe to start with the current RVV.
- *
*/
void repairRVV() {
RegionVersionVector rvv = this.getVersionVector();
@@ -5031,7 +5044,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param event the event object for this operation, with the exception that the oldValue
* parameter is not yet filled in. The oldValue will be filled in by this operation.
- *
* @param ifNew true if this operation must not overwrite an existing key
* @param ifOld true if this operation must not create a new key
* @param expectedOldValue only succeed if old value is equal to this value. If null, then doesn't
@@ -5570,7 +5582,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param event the event object for this operation, with the exception that the oldValue
* parameter is not yet filled in. The oldValue will be filled in by this operation.
- *
* @param ifNew true if this operation must not overwrite an existing key
* @param ifOld true if this operation must not create a new entry
* @param lastModified the lastModified time to set with the value; if 0L, then the lastModified
@@ -6019,7 +6030,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param task - a Runnable to wrap the processing of the bulk op
* @param eventId - the base event ID of the bulk op
- *
* @since GemFire 5.7
*/
public void syncBulkOp(Runnable task, EventID eventId) {
@@ -6298,9 +6308,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
/**
* Called after this region has been completely created
*
- * @since GemFire 5.0
- *
* @see DistributedRegion#postDestroyRegion(boolean, RegionEventImpl)
+ * @since GemFire 5.0
*/
@Override
public void postCreateRegion() {
@@ -6980,7 +6989,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** @return true if initialization is complete */
+ /**
+ * @return true if initialization is complete
+ */
@Override
public boolean isInitialized() {
if (this.initialized) {
@@ -7097,7 +7108,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* to map entry, and key must be in map
*
* @param lastModified time, may be 0 in which case uses now instead
- *
* @return the actual lastModifiedTime used.
*/
@Override
@@ -7202,7 +7212,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** The listener is not closed until after the afterRegionDestroy event */
+ /**
+ * The listener is not closed until after the afterRegionDestroy event
+ */
protected void closeCallbacksExceptListener() {
closeCacheCallback(getCacheLoader());
closeCacheCallback(getCacheWriter());
@@ -7212,7 +7224,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** This is only done when the cache is closed. */
+ /**
+ * This is only done when the cache is closed.
+ */
private void closeAllCallbacks() {
closeCallbacksExceptListener();
CacheListener[] listeners = fetchCacheListenersField();
@@ -7578,7 +7592,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* the region have been set.
*
* @return {@code null} is a disk region is not desired
- *
* @since GemFire 3.2
*/
protected DiskRegion createDiskRegion(InternalRegionArguments internalRegionArgs)
@@ -7863,8 +7876,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (other.region != null) {
return false;
}
- } else if (!this.region.equals(other.region))
+ } else if (!this.region.equals(other.region)) {
return false;
+ }
return true;
}
@@ -8156,10 +8170,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
private void cancelAllEntryExpiryTasks() {
// This method gets called during LocalRegion construction
// in which case the final entryExpiryTasks field can still be null
- if (this.entryExpiryTasks == null)
+ if (this.entryExpiryTasks == null) {
return;
- if (this.entryExpiryTasks.isEmpty())
+ }
+ if (this.entryExpiryTasks.isEmpty()) {
return;
+ }
boolean doPurge = false;
for (EntryExpiryTask task : this.entryExpiryTasks.values()) {
// no need to call incCancels since we will call forcePurge
@@ -8227,7 +8243,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
getImageState().readUnlockRI();
}
- /** doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor */
+ /**
+ * doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor
+ */
LocalRegion basicGetParentRegion() {
return this.parentRegion;
}
@@ -8403,7 +8421,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
this.entries.decTxRefCount(regionEntry);
}
- /** Does not throw RegionDestroyedException even if destroyed */
+ /**
+ * Does not throw RegionDestroyedException even if destroyed
+ */
List debugGetSubregionNames() {
List names = new ArrayList();
names.addAll(this.subregions.keySet());
@@ -8508,7 +8528,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** Set view of subregions */
+ /**
+ * Set view of subregions
+ */
private class SubregionsSet extends AbstractSet {
final boolean recursive;
@@ -8663,7 +8685,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
this.key = regionEntry.getKey();
}
- /** Internal method for getting the underlying RegionEntry */
+ /**
+ * Internal method for getting the underlying RegionEntry
+ */
public RegionEntry getRegionEntry() {
RegionEntry regionEntry = LocalRegion.this.getRegionMap().getEntry(this.key);
if (regionEntry == null) {
@@ -10310,7 +10334,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
FilterProfile filterProfile;
/**
- *
* @return int array containing the IDs of the oplogs which will potentially get rolled else null
* if no oplogs were available at the time of signal or region is not having disk
* persistence. Pls note that the actual number of oplogs rolled may be more than what is
@@ -10422,8 +10445,17 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.cacheServiceProfiles.getSnapshot();
}
- public void addCacheServiceProfile(CacheServiceProfile profile) {
- this.cacheServiceProfiles.put(profile.getId(), profile);
+ @Override
+ public void addCacheServiceProfile(CacheServiceProfile profile)
+ throws IncompatibleCacheServiceProfileException {
+ synchronized (this.cacheServiceProfiles) {
+ this.cacheServiceProfiles.putIfAbsent(profile.getId(), profile);
+ this.validateProfiles();
+ }
+ }
+
+ protected boolean validateProfiles() throws IncompatibleCacheServiceProfileException {
+ return true;
}
@Override
@@ -10433,42 +10465,38 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
searcher);
}
- /** visitor over the CacheProfiles to check if the region has a CacheLoader */
+ /**
+ * visitor over the CacheProfiles to check if the region has a CacheLoader
+ */
private static final DistributionAdvisor.ProfileVisitor<Void> netLoaderVisitor =
- new DistributionAdvisor.ProfileVisitor<Void>() {
- @Override
- public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex,
- int numProfiles, Void aggregate) {
- assert profile instanceof CacheProfile;
- final CacheProfile prof = (CacheProfile) profile;
-
- // if region in cache is not yet initialized, exclude
- if (prof.regionInitialized) { // fix for bug 41102
- // cut the visit short if we find a CacheLoader
- return !prof.hasCacheLoader;
- }
- // continue the visit
- return true;
+ (advisor, profile, profileIndex, numProfiles, aggregate) -> {
+ assert profile instanceof CacheProfile;
+ final CacheProfile prof = (CacheProfile) profile;
+
+ // if region in cache is not yet initialized, exclude
+ if (prof.regionInitialized) { // fix for bug 41102
+ // cut the visit short if we find a CacheLoader
+ return !prof.hasCacheLoader;
}
+ // continue the visit
+ return true;
};
- /** visitor over the CacheProfiles to check if the region has a CacheWriter */
+ /**
+ * visitor over the CacheProfiles to check if the region has a CacheWriter
+ */
private static final DistributionAdvisor.ProfileVisitor<Void> netWriterVisitor =
- new DistributionAdvisor.ProfileVisitor<Void>() {
- @Override
- public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex,
- int numProfiles, Void aggregate) {
- assert profile instanceof CacheProfile;
- final CacheProfile prof = (CacheProfile) profile;
-
- // if region in cache is in recovery
- if (!prof.inRecovery) {
- // cut the visit short if we find a CacheWriter
- return !prof.hasCacheWriter;
- }
- // continue the visit
- return true;
+ (advisor, profile, profileIndex, numProfiles, aggregate) -> {
+ assert profile instanceof CacheProfile;
+ final CacheProfile prof = (CacheProfile) profile;
+
+ // if region in cache is in recovery
+ if (!prof.inRecovery) {
+ // cut the visit short if we find a CacheWriter
+ return !prof.hasCacheWriter;
}
+ // continue the visit
+ return true;
};
/**
@@ -11458,7 +11486,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** test hook - dump the backing map for this region */
+ /**
+ * test hook - dump the backing map for this region
+ */
public void dumpBackingMap() {
synchronized (this.entries) {
if (this.entries instanceof AbstractRegionMap) {
@@ -11514,7 +11544,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @return previous value associated with specified key, or <tt>null</tt> if there was no mapping
* for key. A <tt>null</tt> return can also indicate that the entry in the region was
* previously in an invalidated state.
- *
* @throws ClassCastException if key does not satisfy the keyConstraint
* @throws IllegalArgumentException if the key or value is not serializable and this is a
* distributed region
@@ -11990,14 +12019,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @param indexName the name of the index
* @param indexedExpression the index expression
* @param fromClause the from clause.
- *
* @return The index map.
- *
* @throws IllegalStateException if this region is not using soplog persistence
- *
* @throws IllegalStateException if this index was previously persisted with a different
* expression or from clause.
- *
*/
public IndexMap getIndexMap(String indexName, String indexedExpression, String fromClause) {
return new IndexMapImpl();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index a7d0800..58d2284 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -476,6 +476,16 @@ public class PartitionedRegion extends LocalRegion
colocationListeners.remove(colocationListener);
}
+ @Override
+ protected boolean validateProfiles() throws IncompatibleCacheServiceProfileException {
+ try {
+ new CreateRegionProcessor((PartitionedRegion) this).initializeRegion();
+ return true;
+ } catch (Exception e) {
+ throw new IncompatibleCacheServiceProfileException(e.getMessage());
+ }
+ // new UpdateAttributesProcessor((PartitionedRegion) this).validateProfiles()
+ }
static PRIdMap getPrIdToPR() {
return prIdToPR;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
index b0effb2..60e2527 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
@@ -20,6 +20,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -48,7 +49,6 @@ import org.apache.geode.internal.logging.LogService;
* This class is a bit misnamed. It really has more with pushing a DistributionAdvisee's profile out
* to others and, optionally if <code>profileExchange</code>, fetching the profile of anyone who
* excepts the pushed profile.
- *
*/
public class UpdateAttributesProcessor {
private static final Logger logger = LogService.getLogger();
@@ -64,7 +64,9 @@ public class UpdateAttributesProcessor {
private boolean removeProfile = false;
private ReplyProcessor21 processor;
- /** Creates a new instance of UpdateAttributesProcessor */
+ /**
+ * Creates a new instance of UpdateAttributesProcessor
+ */
public UpdateAttributesProcessor(DistributionAdvisee da) {
this(da, false);
}
@@ -115,6 +117,7 @@ public class UpdateAttributesProcessor {
}
}
+
public void sendProfileUpdate(boolean exchangeProfiles) {
DistributionManager mgr = this.advisee.getDistributionManager();
DistributionAdvisor advisor = this.advisee.getDistributionAdvisor();
@@ -151,32 +154,31 @@ public class UpdateAttributesProcessor {
return;
}
- ReplyProcessor21 processor = null;
- // Scope scope = this.region.scope;
-
// always require an ack to prevent misordering of messages
InternalDistributedSystem system = this.advisee.getSystem();
- processor = new UpdateAttributesReplyProcessor(system, recipients);
- UpdateAttributesMessage message = getUpdateAttributesMessage(processor, recipients);
+ UpdateAttributesReplyProcessor replyProcessor =
+ new UpdateAttributesReplyProcessor(system, recipients);
+ UpdateAttributesMessage message = getUpdateAttributesMessage(replyProcessor);
mgr.putOutgoing(message);
- this.processor = processor;
+ this.processor = replyProcessor;
}
-
- UpdateAttributesMessage getUpdateAttributesMessage(ReplyProcessor21 processor, Set recipients) {
+ UpdateAttributesMessage getUpdateAttributesMessage(UpdateAttributesReplyProcessor processor) {
UpdateAttributesMessage msg = new UpdateAttributesMessage();
msg.adviseePath = this.advisee.getFullPath();
- msg.setRecipients(recipients);
- if (processor != null) {
- msg.processorId = processor.getProcessorId();
- }
+ msg.setRecipients(processor.getRecipients());
+ msg.processorId = processor.getProcessorId();
msg.profile = this.advisee.getProfile();
msg.exchangeProfiles = this.profileExchange;
msg.removeProfile = this.removeProfile;
return msg;
}
+ public void validateProfiles() {
+
+ }
+
class UpdateAttributesReplyProcessor extends ReplyProcessor21 {
UpdateAttributesReplyProcessor(InternalDistributedSystem system, Set members) {
@@ -236,12 +238,9 @@ public class UpdateAttributesProcessor {
if (msg instanceof ProfilesReplyMessage) {
ProfilesReplyMessage reply = (ProfilesReplyMessage) msg;
if (reply.profiles != null) {
- for (int i = 0; i < reply.profiles.length; i++) {
- // @todo Add putProfiles to DistributionAdvisor to do this
- // with one call atomically?
- UpdateAttributesProcessor.this.advisee.getDistributionAdvisor()
- .putProfile(reply.profiles[i]);
- }
+ Arrays.stream(reply.profiles)
+ .forEach((profile) -> UpdateAttributesProcessor.this.advisee
+ .getDistributionAdvisor().putProfile(profile));
}
} else if (msg instanceof ProfileReplyMessage) {
ProfileReplyMessage reply = (ProfileReplyMessage) msg;
@@ -254,6 +253,10 @@ public class UpdateAttributesProcessor {
super.process(msg);
}
}
+
+ public Collection getRecipients() {
+ return Arrays.asList(members);
+ }
}
@@ -439,6 +442,7 @@ public class UpdateAttributesProcessor {
}
}
+
/**
* Used to return multiple profiles
*
@@ -465,14 +469,12 @@ public class UpdateAttributesProcessor {
}
-
@Override
public int getDSFID() {
return PROFILES_REPLY_MESSAGE;
}
-
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 7f9cc5d..9f94b1f 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -804,3 +804,4 @@ org/apache/geode/security/AuthenticationFailedException,true,-820286647227908887
org/apache/geode/security/AuthenticationRequiredException,true,4675976651103154919
org/apache/geode/security/GemFireSecurityException,true,3814254578203076926,cause:java/lang/Throwable
org/apache/geode/security/NotAuthorizedException,true,419215768216387745,principal:java/security/Principal
+org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException,false
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index fef1243..394012a 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -91,4 +91,4 @@ org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$3
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$4
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList
-org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
+org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
\ No newline at end of file
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 5756d71..e7cabce 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -69,6 +69,7 @@ import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PrimaryBucketException;
@@ -221,8 +222,12 @@ public class LuceneServiceImpl implements InternalLuceneService {
LuceneSerializer serializer) {
validateRegionAttributes(region.getAttributes());
- region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields,
- analyzer, fieldAnalyzers, serializer));
+ try {
+ region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields,
+ analyzer, fieldAnalyzers, serializer));
+ } catch (IncompatibleCacheServiceProfileException e) {
+ e.printStackTrace();
+ }
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
region.updatePRConfigWithNewGatewaySender(aeqId);
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java
new file mode 100644
index 0000000..17a3d6e
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.awaitility.Awaitility;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.LuceneTest;
+
+@Category({DistributedTest.class, LuceneTest.class})
+public class LuceneIndexCreationProfileDUnitTest implements Serializable {
+
+ private static final String INDEX_NAME = "index";
+ private static final String REGION_NAME = "region";
+
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule(2);
+
+ @Rule
+ public CacheRule cacheRule = CacheRule.builder()
+ .addSystemProperty(DistributionConfig.GEMFIRE_PREFIX + "luceneReindex", "true")
+ .createCacheInAll().disconnectAfter().build();
+
+ @Test
+ public void testConcurrentIndexCreationWithDifferentProfiles() {
+ VM vm0 = VM.getVM(0);
+ VM vm1 = VM.getVM(1);
+
+ vm0.invoke(this::setupCacheAndRegion);
+ vm1.invoke(this::setupCacheAndRegion);
+
+ vm0.invoke(() -> {
+ Region<Object, Object> region = cacheRule.getCache().getRegion(REGION_NAME);
+ for (int i = 0; i < 113; i++) {
+ region.put(i, i);
+ }
+ });
+
+ AsyncInvocation<Boolean> asyncInvocation0 = vm0.invokeAsync(() -> {
+ PartitionedRegion region1 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ try {
+ region1.addCacheServiceProfile(getOneFieldLuceneIndexCreationProfile());
+ return false;
+ } catch (IncompatibleCacheServiceProfileException e) {
+ e.printStackTrace();
+ return true;
+ }
+ });
+
+ AsyncInvocation<Boolean> asyncInvocation1 = vm1.invokeAsync(() -> {
+ PartitionedRegion region2 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ try {
+ region2.addCacheServiceProfile(getTwoFieldLuceneIndexCreationProfile());
+ return false;
+ } catch (IncompatibleCacheServiceProfileException e) {
+ e.printStackTrace();
+ return true;
+ }
+ });
+
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+ .until(() -> asyncInvocation0.get() && asyncInvocation1.get());
+ }
+
+ private void setupCacheAndRegion() {
+ InternalCache cache = cacheRule.getCache();
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+ }
+
+ private LuceneIndexCreationProfile getOneFieldLuceneIndexCreationProfile() {
+ return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME, new String[] {"field1"},
+ new StandardAnalyzer(), null, null);
+ }
+
+ private LuceneIndexCreationProfile getTwoFieldLuceneIndexCreationProfile() {
+ return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME,
+ new String[] {"field1", "field2"}, new StandardAnalyzer(), null, null);
+ }
+}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
index 8a88e72..7e76b93 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
@@ -61,11 +61,11 @@ public class LuceneIndexCreationProfileJUnitTest {
}
private Object[] getSerializationProfiles() {
- return $(new Object[] {getOneFieldLuceneIndexCreationProfile()},
+ return new Object[] {new Object[] {getOneFieldLuceneIndexCreationProfile()},
new Object[] {getTwoFieldLuceneIndexCreationProfile()},
new Object[] {getTwoAnalyzersLuceneIndexCreationProfile()},
new Object[] {getDummySerializerCreationProfile()},
- new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()});
+ new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()}};
}
@Test
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
index e3c2001..9e7eab0 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
@@ -83,10 +83,8 @@ public class LuceneTestUtilities {
public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_SERIALIZER =
"Cannot create Lucene index index on region /region with serializer DummyLuceneSerializer because another member defines the same index with different serializer HeterogeneousLuceneSerializer.";
- public static String Quarter1 = "Q1";
- public static String Quarter2 = "Q2";
- public static String Quarter3 = "Q3";
- public static String Quarter4 = "Q4";
+ private static String Quarter1 = "Q1";
+ private static String Quarter2 = "Q2";
public static void verifyResultOrder(Collection<EntryScore<String>> list,
EntryScore<String>... expectedEntries) {
--
To stop receiving notification emails like this one, please contact
udo@apache.org.