You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/04/30 21:41:11 UTC
[geode] 01/01: GEODE-5096: ValidateCacheServerProfileProcessor to
validate CacheServerProfiles at upgrade time.
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/GEODE-5096
in repository https://gitbox.apache.org/repos/asf/geode.git
commit e847ec093238a25b5b5d766d92a678072ec49e01
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Mon Apr 16 15:46:40 2018 -0700
GEODE-5096: ValidateCacheServerProfileProcessor to validate
CacheServerProfiles at upgrade time.
---
.../distributed/internal/DistributionAdvisor.java | 12 +-
.../org/apache/geode/internal/DSFIDFactory.java | 20 +-
.../geode/internal/DataSerializableFixedID.java | 3 +
.../IncompatibleCacheServiceProfileException.java | 23 ++
.../geode/internal/cache/InternalRegion.java | 7 +-
.../apache/geode/internal/cache/LocalRegion.java | 258 ++++++++++-------
.../geode/internal/cache/PartitionedRegion.java | 166 ++++++-----
.../internal/cache/UpdateAttributesProcessor.java | 113 ++++----
.../cache/ValidateCacheServerProfileProcessor.java | 312 +++++++++++++++++++++
.../sanctioned-geode-core-serializables.txt | 1 +
.../geode/test/dunit/standalone/DUnitLauncher.java | 17 +-
.../apache/geode/codeAnalysis/excludedClasses.txt | 2 +-
.../internal/LuceneIndexCreationProfile.java | 1 +
.../cache/lucene/internal/LuceneServiceImpl.java | 20 +-
.../cache/lucene/LuceneIndexCreationDUnitTest.java | 5 +-
.../LuceneQueriesReindexClientDUnitTest.java | 12 +-
.../LuceneIndexCreationProfileDUnitTest.java | 109 +++++++
.../LuceneIndexCreationProfileJUnitTest.java | 4 +-
.../cache/lucene/test/LuceneTestUtilities.java | 6 +-
19 files changed, 824 insertions(+), 267 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
index a3be6cf..c8555c5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionAdvisor.java
@@ -1176,8 +1176,9 @@ public class DistributionAdvisor {
DistributionAdvisee advisee = getAdvisee();
do {
advisee = advisee.getParentAdvisee();
- if (advisee == null)
+ if (advisee == null) {
return getDefaultDistributionMembers();
+ }
advisor = advisee.getDistributionAdvisor();
} while (!advisor.isInitialized());
// do not call adviseGeneric because we don't want to trigger another
@@ -1276,7 +1277,7 @@ public class DistributionAdvisor {
protected void profileRemoved(Profile profile) {}
/** All advise methods go through this method */
- protected Set<InternalDistributedMember> adviseFilter(Filter f) {
+ protected Set<InternalDistributedMember> adviseFilter(Filter filter) {
initializationGate();
if (disabled) {
if (logger.isDebugEnabled()) {
@@ -1286,11 +1287,10 @@ public class DistributionAdvisor {
}
Set<InternalDistributedMember> recipients = null;
Profile[] locProfiles = this.profiles; // grab current profiles
- for (int i = 0; i < locProfiles.length; i++) {
- Profile profile = locProfiles[i];
- if (f == null || f.include(profile)) {
+ for (Profile profile : locProfiles) {
+ if (filter == null || filter.include(profile)) {
if (recipients == null) {
- recipients = new HashSet<InternalDistributedMember>();
+ recipients = new HashSet<>();
}
recipients.add(profile.getDistributedMember());
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 204459f..de534bd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -265,6 +265,7 @@ import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.UpdateEntryVersionOperation.UpdateEntryVersionMessage;
import org.apache.geode.internal.cache.UpdateOperation;
import org.apache.geode.internal.cache.VMCachedDeserializable;
+import org.apache.geode.internal.cache.ValidateCacheServerProfileProcessor;
import org.apache.geode.internal.cache.backup.AbortBackupRequest;
import org.apache.geode.internal.cache.backup.BackupResponse;
import org.apache.geode.internal.cache.backup.FinishBackupRequest;
@@ -451,7 +452,9 @@ public class DSFIDFactory implements DataSerializableFixedID {
registerDSFIDTypes();
}
- /** Register the constructor for a fixed ID class. */
+ /**
+ * Register the constructor for a fixed ID class.
+ */
public static void registerDSFID(int dsfid, Class dsfidClass) {
try {
Constructor<?> cons = dsfidClass.getConstructor((Class[]) null);
@@ -874,12 +877,12 @@ public class DSFIDFactory implements DataSerializableFixedID {
registerDSFID(END_BUCKET_CREATION_MESSAGE, EndBucketCreationMessage.class);
registerDSFID(PREPARE_BACKUP_REQUEST, PrepareBackupRequest.class);
registerDSFID(BACKUP_RESPONSE, BackupResponse.class); // in older versions this was
- // FinishBackupResponse which is
- // compatible
+ // FinishBackupResponse which is
+ // compatible
registerDSFID(FINISH_BACKUP_REQUEST, FinishBackupRequest.class);
registerDSFID(FINISH_BACKUP_RESPONSE, BackupResponse.class); // for backwards compatibility map
- // FINISH_BACKUP_RESPONSE to
- // BackupResponse
+ // FINISH_BACKUP_RESPONSE to
+ // BackupResponse
registerDSFID(COMPACT_REQUEST, CompactRequest.class);
registerDSFID(COMPACT_RESPONSE, CompactResponse.class);
registerDSFID(FLOW_CONTROL_PERMIT_MESSAGE, FlowControlPermitMessage.class);
@@ -937,6 +940,11 @@ public class DSFIDFactory implements DataSerializableFixedID {
registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+
+ registerDSFID(VALIDATE_CACHE_PROFILE_MESSAGE,
+ ValidateCacheServerProfileProcessor.ValidateCacheProfileMessage.class);
+ registerDSFID(VALIDATE_CACHE_SERVER_REPLY_MESSAGE,
+ ValidateCacheServerProfileProcessor.ValidateCacheServerProfileReplyMessage.class);
}
/**
@@ -1009,8 +1017,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
}
}
-
////////////////// Reading Internal Objects /////////////////
+
/**
* Reads an instance of <code>IpAddress</code> from a <code>DataInput</code>.
*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index b5b6b5e..4cfffd3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -823,6 +823,9 @@ public interface DataSerializableFixedID extends SerializationVersions {
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
short ABORT_BACKUP_REQUEST = 2183;
+ short VALIDATE_CACHE_PROFILE_MESSAGE = 2184;
+ short VALIDATE_CACHE_SERVER_REPLY_MESSAGE = 2185;
+
// NOTE, codes > 65535 will take 4 bytes to serialize
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java
new file mode 100644
index 0000000..c9a7435
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.GemFireCheckedException;
+
+public class IncompatibleCacheServiceProfileException extends GemFireCheckedException {
+ public IncompatibleCacheServiceProfileException(String message) {
+ super(message);
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index fb9fe1c..593e467 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -150,8 +150,6 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
InternalRegionArguments internalRegionArgs)
throws RegionExistsException, TimeoutException, IOException, ClassNotFoundException;
- void addCacheServiceProfile(CacheServiceProfile profile);
-
void setEvictionMaximum(int maximum);
/**
@@ -375,4 +373,9 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
Object getIMSync();
IndexManager setIndexManager(IndexManager idxMgr);
+
+ void addCacheServiceProfile(CacheServiceProfile profile)
+ throws IncompatibleCacheServiceProfileException;
+
+ void removeCacheServiceProfile(CacheServiceProfile profile);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index bd909e1..749539c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -146,7 +146,6 @@ import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionAdvisor;
-import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionStats;
@@ -328,13 +327,19 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*/
private volatile boolean reinitialized_new = false;
- /** Lock used to prevent multiple concurrent destroy region operations */
+ /**
+ * Lock used to prevent multiple concurrent destroy region operations
+ */
private Semaphore destroyLock;
- /** GuardedBy regionExpiryLock. */
+ /**
+ * GuardedBy regionExpiryLock.
+ */
private RegionTTLExpiryTask regionTTLExpiryTask = null;
- /** GuardedBy regionExpiryLock. */
+ /**
+ * GuardedBy regionExpiryLock.
+ */
private RegionIdleExpiryTask regionIdleExpiryTask = null;
private final Object regionExpiryLock = new Object();
@@ -411,7 +416,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*/
private volatile boolean initialized = false; // added for bug 30223
- /** Used for accessing region data on disk */
+ /**
+ * Used for accessing region data on disk
+ */
private final DiskRegion diskRegion;
/**
@@ -717,19 +724,23 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return eventTracker;
}
- /** returns the regions version-vector */
+ /**
+ * returns the regions version-vector
+ */
@Override
public RegionVersionVector getVersionVector() {
return this.versionVector;
}
- /** returns object used to guard the size() operation during tombstone removal */
+ /**
+ * returns object used to guard the size() operation during tombstone removal
+ */
Object getSizeGuard() {
if (!this.getConcurrencyChecksEnabled()) {
return new Object();
} else {
return this.fullPath; // avoids creating another sync object - could be anything unique to
- // this region
+ // this region
}
}
@@ -740,7 +751,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return null;
}
- /** initializes a new version vector for this region */
+ /**
+ * initializes a new version vector for this region
+ */
private RegionVersionVector createVersionVector() {
RegionVersionVector regionVersionVector = RegionVersionVector.create(getVersionMember(), this);
@@ -826,7 +839,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.serverRegionProxy != null;
}
- /** Returns true if the ExpiryTask is currently allowed to expire. */
+ /**
+ * Returns true if the ExpiryTask is currently allowed to expire.
+ */
protected boolean isExpirationAllowed(ExpiryTask expiry) {
return true;
}
@@ -903,8 +918,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
LocalRegion newRegion = null;
try {
- if (getDestroyLock)
+ if (getDestroyLock) {
acquireDestroyLock();
+ }
LocalRegion existing = null;
try {
if (isDestroyed()) {
@@ -1083,7 +1099,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
false, // ifOld
null, // expectedOldValue
true // requireOldValue TODO txMerge why is oldValue required for
- // create? I think so that the EntryExistsException will have it.
+ // create? I think so that the EntryExistsException will have it.
)) {
throw new EntryExistsException(event.getKey().toString(), event.getOldValue());
} else {
@@ -1657,8 +1673,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
eventReturned = true;
return event;
} finally {
- if (!eventReturned)
+ if (!eventReturned) {
event.release();
+ }
}
}
@@ -1762,7 +1779,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return getDataView().getEntry(getKeyInfo(key), this, false);
}
- /** internally we often need to get an entry whether it is a tombstone or not */
+ /**
+ * internally we often need to get an entry whether it is a tombstone or not
+ */
public Region.Entry getEntry(Object key, boolean allowTombstones) {
return getDataView().getEntry(getKeyInfo(key), this, allowTombstones);
}
@@ -1784,7 +1803,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** a fast estimate of total number of entries locally in the region */
+ /**
+ * a fast estimate of total number of entries locally in the region
+ */
public long getEstimatedLocalSize() {
if (!this.isDestroyed) {
long size;
@@ -1842,7 +1863,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.isDestroyed;
}
- /** returns true if this region has been destroyed */
+ /**
+ * returns true if this region has been destroyed
+ */
@Override
public boolean isDestroyed() {
if (isClosed()) {
@@ -1864,7 +1887,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return false;
}
- /** a variant of subregions() that does not perform a readiness check */
+ /**
+ * a variant of subregions() that does not perform a readiness check
+ */
@Override
public Set basicSubregions(boolean recursive) {
return new SubregionsSet(recursive);
@@ -1886,7 +1911,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return basicEntries(recursive);
}
- /** Returns set of entries without performing validation checks. */
+ /**
+ * Returns set of entries without performing validation checks.
+ */
public Set basicEntries(boolean recursive) {
return new EntriesSet(this, recursive, IteratorType.ENTRIES, false);
}
@@ -2083,7 +2110,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* Returns the {@code DiskRegion} that this region uses to access data on disk.
*
* @return {@code null} if disk regions are not being used
- *
* @since GemFire 3.2
*/
@Override
@@ -2541,8 +2567,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
private static void releaseLatch(StoppableCountDownLatch latch) {
- if (latch == null)
+ if (latch == null) {
return;
+ }
latch.countDown();
}
@@ -2751,7 +2778,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
/**
- *
* Search for the value in a server (if one exists), then try a loader.
*
* If we find a value, we put it in the cache.
@@ -2948,7 +2974,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return result;
}
- /** @return true if this was a client region; false if not */
+ /**
+ * @return true if this was a client region; false if not
+ */
@Override
public boolean bridgeWriteBeforeDestroy(EntryEventImpl event, Object expectedOldValue)
throws CacheWriterException, EntryNotFoundException, TimeoutException {
@@ -3147,10 +3175,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
// check validity of key against keyConstraint
if (this.keyConstraint != null) {
- if (!this.keyConstraint.isInstance(key))
+ if (!this.keyConstraint.isInstance(key)) {
throw new ClassCastException(
LocalizedStrings.LocalRegion_KEY_0_DOES_NOT_SATISFY_KEYCONSTRAINT_1
.toLocalizedString(key.getClass().getName(), this.keyConstraint.getName()));
+ }
}
// We don't need to check that the key is Serializable. Instead,
@@ -3169,7 +3198,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*/
private final AtomicInteger tombstoneCount = new AtomicInteger();
- /** a boolean for issuing a client/server configuration mismatch message */
+ /**
+ * a boolean for issuing a client/server configuration mismatch message
+ */
private boolean concurrencyMessageIssued;
/**
@@ -3223,7 +3254,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.cachePerfStats;
}
- /** regions track the number of tombstones their map holds for size calculations */
+ /**
+ * regions track the number of tombstones their map holds for size calculations
+ */
public void incTombstoneCount(int delta) {
this.tombstoneCount.addAndGet(delta);
this.cachePerfStats.incTombstoneCount(delta);
@@ -3355,7 +3388,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** local regions do not perform versioning */
+ /**
+ * local regions do not perform versioning
+ */
protected boolean shouldGenerateVersionTag(RegionEntry entry, EntryEventImpl event) {
if (this.getDataPolicy().withPersistence()) {
return true;
@@ -3405,9 +3440,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* returned. This method is intended for testing.testing purposes only.
*
* @throws EntryNotFoundException No entry with {@code key} exists
- *
* @see RegionMap#getEntry
- *
* @since GemFire 3.2
*/
@Override
@@ -3476,9 +3509,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @throws EntryNotFoundException No entry with {@code key} exists
* @throws IllegalStateException If this region does not write to disk
- *
* @see RegionEntry#getValueOnDisk
- *
* @since GemFire 3.2
*/
@Override
@@ -3511,9 +3542,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @throws EntryNotFoundException No entry with {@code key} exists
* @throws IllegalStateException If this region does not write to disk
- *
* @see RegionEntry#getValueOnDisk
- *
* @since GemFire 5.1
*/
public Object getValueOnDiskOrBuffer(Object key) throws EntryNotFoundException {
@@ -4421,12 +4450,13 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
clearViaFilterClass((String) key);
break;
case InterestType.KEY:
- if (key instanceof String && key.equals("ALL_KEYS"))
+ if (key instanceof String && key.equals("ALL_KEYS")) {
clearViaRegEx(".*");
- else if (key instanceof List)
+ } else if (key instanceof List) {
clearViaList((List) key);
- else
+ } else {
localDestroyNoCallbacks(key);
+ }
break;
case InterestType.OQL_QUERY:
clearViaQuery((String) key);
@@ -4460,7 +4490,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
void reinitializeFromImageTarget(InternalDistributedMember imageTarget)
throws TimeoutException, IOException, ClassNotFoundException {
Assert.assertTrue(imageTarget != null);
@@ -4475,7 +4507,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.reinitialized_new;
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
void reinitialize_destroy(RegionEventImpl event) throws CacheWriterException, TimeoutException {
final boolean cacheWrite = !event.originRemote;
// register this region as reinitializing
@@ -4483,7 +4517,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
basicDestroyRegion(event, cacheWrite, false/* lock */, true);
}
- /** must be holding destroy lock */
+ /**
+ * must be holding destroy lock
+ */
private void recreate(InputStream inputStream, InternalDistributedMember imageTarget)
throws TimeoutException, IOException, ClassNotFoundException {
String thePath = getFullPath();
@@ -4658,7 +4694,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
/**
* Called by a thread that is doing region initialization. Causes the initialization Latch to be
* bypassed by this thread.
- *
*/
public static int setThreadInitLevelRequirement(int level) {
int oldLevel = threadInitLevelRequirement();
@@ -4707,7 +4742,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return latch.getCount() == 0;
}
- /** wait on the initialization Latch based on thread requirements */
+ /**
+ * wait on the initialization Latch based on thread requirements
+ */
@Override
public void waitOnInitialization() {
if (this.initialized) {
@@ -4762,7 +4799,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
waitOnInitialization(this.getInitializationLatchAfterGetInitialImage());
}
- /** return null if not found */
+ /**
+ * return null if not found
+ */
@Override
public RegionEntry basicGetEntry(Object key) {
// ok to ignore tx state; all callers are non-transactional
@@ -4899,7 +4938,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* than any other surviving members. So they shouldn't have any entries in their cache that match
* entries that we failed to receive through the GII but are reflected in our current RVV. So it
* should be safe to start with the current RVV.
- *
*/
void repairRVV() {
RegionVersionVector rvv = this.getVersionVector();
@@ -5058,7 +5096,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param event the event object for this operation, with the exception that the oldValue
* parameter is not yet filled in. The oldValue will be filled in by this operation.
- *
* @param ifNew true if this operation must not overwrite an existing key
* @param ifOld true if this operation must not create a new key
* @param expectedOldValue only succeed if old value is equal to this value. If null, then doesn't
@@ -5599,7 +5636,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param event the event object for this operation, with the exception that the oldValue
* parameter is not yet filled in. The oldValue will be filled in by this operation.
- *
* @param ifNew true if this operation must not overwrite an existing key
* @param ifOld true if this operation must not create a new entry
* @param lastModified the lastModified time to set with the value; if 0L, then the lastModified
@@ -6055,7 +6091,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
*
* @param task - a Runnable to wrap the processing of the bulk op
* @param eventId - the base event ID of the bulk op
- *
* @since GemFire 5.7
*/
@Override
@@ -6335,9 +6370,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
/**
* Called after this region has been completely created
*
- * @since GemFire 5.0
- *
* @see DistributedRegion#postDestroyRegion(boolean, RegionEventImpl)
+ * @since GemFire 5.0
*/
@Override
public void postCreateRegion() {
@@ -7022,7 +7056,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** @return true if initialization is complete */
+ /**
+ * @return true if initialization is complete
+ */
@Override
public boolean isInitialized() {
if (this.initialized) {
@@ -7139,7 +7175,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* to map entry, and key must be in map
*
* @param lastModified time, may be 0 in which case uses now instead
- *
* @return the actual lastModifiedTime used.
*/
@Override
@@ -7244,7 +7279,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** The listener is not closed until after the afterRegionDestroy event */
+ /**
+ * The listener is not closed until after the afterRegionDestroy event
+ */
protected void closeCallbacksExceptListener() {
closeCacheCallback(getCacheLoader());
closeCacheCallback(getCacheWriter());
@@ -7254,7 +7291,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** This is only done when the cache is closed. */
+ /**
+ * This is only done when the cache is closed.
+ */
private void closeAllCallbacks() {
closeCallbacksExceptListener();
CacheListener[] listeners = fetchCacheListenersField();
@@ -7620,7 +7659,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* the region have been set.
*
* @return {@code null} is a disk region is not desired
- *
* @since GemFire 3.2
*/
protected DiskRegion createDiskRegion(InternalRegionArguments internalRegionArgs)
@@ -7906,8 +7944,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (other.region != null) {
return false;
}
- } else if (!this.region.equals(other.region))
+ } else if (!this.region.equals(other.region)) {
return false;
+ }
return true;
}
@@ -8199,10 +8238,12 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
private void cancelAllEntryExpiryTasks() {
// This method gets called during LocalRegion construction
// in which case the final entryExpiryTasks field can still be null
- if (this.entryExpiryTasks == null)
+ if (this.entryExpiryTasks == null) {
return;
- if (this.entryExpiryTasks.isEmpty())
+ }
+ if (this.entryExpiryTasks.isEmpty()) {
return;
+ }
boolean doPurge = false;
for (EntryExpiryTask task : this.entryExpiryTasks.values()) {
// no need to call incCancels since we will call forcePurge
@@ -8272,7 +8313,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
getImageState().readUnlockRI();
}
- /** doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor */
+ /**
+ * doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor
+ */
LocalRegion basicGetParentRegion() {
return this.parentRegion;
}
@@ -8454,7 +8497,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
this.entries.decTxRefCount(regionEntry);
}
- /** Does not throw RegionDestroyedException even if destroyed */
+ /**
+ * Does not throw RegionDestroyedException even if destroyed
+ */
List debugGetSubregionNames() {
List names = new ArrayList();
names.addAll(this.subregions.keySet());
@@ -8559,7 +8604,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** Set view of subregions */
+ /**
+ * Set view of subregions
+ */
private class SubregionsSet extends AbstractSet {
final boolean recursive;
@@ -8714,7 +8761,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
this.key = regionEntry.getKey();
}
- /** Internal method for getting the underlying RegionEntry */
+ /**
+ * Internal method for getting the underlying RegionEntry
+ */
public RegionEntry getRegionEntry() {
RegionEntry regionEntry = LocalRegion.this.getRegionMap().getEntry(this.key);
if (regionEntry == null) {
@@ -10365,7 +10414,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
FilterProfile filterProfile;
/**
- *
* @return int array containing the IDs of the oplogs which will potentially get rolled else null
* if no oplogs were available at the time of signal or region is not having disk
* persistence. Pls note that the actual number of oplogs rolled may be more than what is
@@ -10479,10 +10527,31 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
return this.cacheServiceProfiles.getSnapshot();
}
- public void addCacheServiceProfile(CacheServiceProfile profile) {
- this.cacheServiceProfiles.put(profile.getId(), profile);
+ @Override
+ public void addCacheServiceProfile(CacheServiceProfile profile)
+ throws IncompatibleCacheServiceProfileException {
+ synchronized (this.cacheServiceProfiles) {
+ CacheServiceProfile cacheServiceProfile =
+ this.cacheServiceProfiles.putIfAbsent(profile.getId(), profile);
+ if (cacheServiceProfile == null) {
+ this.validateProfiles();
+ } else {
+ throw new IncompatibleCacheServiceProfileException(
+ "A profile for: " + profile.getId() + " has already been defined");
+ }
+
+ }
+ }
+
+ @Override
+ public void removeCacheServiceProfile(CacheServiceProfile profile) {
+ synchronized (this.cacheServiceProfiles) {
+ this.cacheServiceProfiles.remove(profile.getId(), profile);
+ }
}
+ protected void validateProfiles() throws IncompatibleCacheServiceProfileException {}
+
@Override
public LoaderHelper createLoaderHelper(Object key, Object callbackArgument,
boolean netSearchAllowed, boolean netLoadAllowed, SearchLoadAndWriteProcessor searcher) {
@@ -10490,42 +10559,38 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
searcher);
}
- /** visitor over the CacheProfiles to check if the region has a CacheLoader */
+ /**
+ * visitor over the CacheProfiles to check if the region has a CacheLoader
+ */
private static final DistributionAdvisor.ProfileVisitor<Void> netLoaderVisitor =
- new DistributionAdvisor.ProfileVisitor<Void>() {
- @Override
- public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex,
- int numProfiles, Void aggregate) {
- assert profile instanceof CacheProfile;
- final CacheProfile prof = (CacheProfile) profile;
-
- // if region in cache is not yet initialized, exclude
- if (prof.regionInitialized) { // fix for bug 41102
- // cut the visit short if we find a CacheLoader
- return !prof.hasCacheLoader;
- }
- // continue the visit
- return true;
+ (advisor, profile, profileIndex, numProfiles, aggregate) -> {
+ assert profile instanceof CacheProfile;
+ final CacheProfile prof = (CacheProfile) profile;
+
+ // if region in cache is not yet initialized, exclude
+ if (prof.regionInitialized) { // fix for bug 41102
+ // cut the visit short if we find a CacheLoader
+ return !prof.hasCacheLoader;
}
+ // continue the visit
+ return true;
};
- /** visitor over the CacheProfiles to check if the region has a CacheWriter */
+ /**
+ * visitor over the CacheProfiles to check if the region has a CacheWriter
+ */
private static final DistributionAdvisor.ProfileVisitor<Void> netWriterVisitor =
- new DistributionAdvisor.ProfileVisitor<Void>() {
- @Override
- public boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex,
- int numProfiles, Void aggregate) {
- assert profile instanceof CacheProfile;
- final CacheProfile prof = (CacheProfile) profile;
-
- // if region in cache is in recovery
- if (!prof.inRecovery) {
- // cut the visit short if we find a CacheWriter
- return !prof.hasCacheWriter;
- }
- // continue the visit
- return true;
+ (advisor, profile, profileIndex, numProfiles, aggregate) -> {
+ assert profile instanceof CacheProfile;
+ final CacheProfile prof = (CacheProfile) profile;
+
+ // if region in cache is in recovery
+ if (!prof.inRecovery) {
+ // cut the visit short if we find a CacheWriter
+ return !prof.hasCacheWriter;
}
+ // continue the visit
+ return true;
};
/**
@@ -11522,7 +11587,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
}
- /** test hook - dump the backing map for this region */
+ /**
+ * test hook - dump the backing map for this region
+ */
public void dumpBackingMap() {
synchronized (this.entries) {
if (this.entries instanceof AbstractRegionMap) {
@@ -11578,7 +11645,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @return previous value associated with specified key, or <tt>null</tt> if there was no mapping
* for key. A <tt>null</tt> return can also indicate that the entry in the region was
* previously in an invalidated state.
- *
* @throws ClassCastException if key does not satisfy the keyConstraint
* @throws IllegalArgumentException if the key or value is not serializable and this is a
* distributed region
@@ -12054,14 +12120,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
* @param indexName the name of the index
* @param indexedExpression the index expression
* @param fromClause the from clause.
- *
* @return The index map.
- *
* @throws IllegalStateException if this region is not using soplog persistence
- *
* @throws IllegalStateException if this index was previously persisted with a different
* expression or from clause.
- *
*/
public IndexMap getIndexMap(String indexName, String indexedExpression, String fromClause) {
return new IndexMapImpl();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index f3d1f25..3c36ab4 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -299,16 +299,13 @@ public class PartitionedRegion extends LocalRegion
/**
* Thread specific random number
*/
- private static ThreadLocal threadRandom = new ThreadLocal() {
- @Override
- protected Object initialValue() {
- int i = RANDOM.nextInt();
- if (i < 0) {
- i = -1 * i;
- }
- return i;
+ private static ThreadLocal threadRandom = ThreadLocal.withInitial(() -> {
+ int i = RANDOM.nextInt();
+ if (i < 0) {
+ i = -1 * i;
}
- };
+ return i;
+ });
/**
* Global Region for storing PR config ( PRName->PRConfig). This region would be used to resolve
@@ -317,7 +314,6 @@ public class PartitionedRegion extends LocalRegion
private volatile Region<String, PartitionRegionConfig> prRoot;
/**
- *
* PartitionedRegionDataStore class takes care of data storage for the PR. This will contain the
* bucket Regions to store data entries for PR*
*/
@@ -328,13 +324,19 @@ public class PartitionedRegion extends LocalRegion
*/
private final RegionAdvisor distAdvisor;
- /** Logging mechanism for debugging */
+ /**
+ * Logging mechanism for debugging
+ */
private static final Logger logger = LogService.getLogger();
- /** cleanup flags * */
+ /**
+ * cleanup flags
+ */
private boolean cleanPRRegistration = false;
- /** Time to wait for for acquiring distributed lock ownership */
+ /**
+ * Time to wait for acquiring distributed lock ownership
+ */
private static final long VM_OWNERSHIP_WAIT_TIME = PRSystemPropertyGetter.parseLong(
System.getProperty(PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_PROPERTY),
PartitionedRegionHelper.VM_OWNERSHIP_WAIT_TIME_DEFAULT);
@@ -359,10 +361,14 @@ public class PartitionedRegion extends LocalRegion
*/
static final float rebalanceThreshold = 0.75f;
- /** The maximum memory allocated for this node in Mb */
+ /**
+ * The maximum memory allocated for this node in Mb
+ */
final int localMaxMemory;
- /** The maximum milliseconds for retrying operations */
+ /**
+ * The maximum milliseconds for retrying operations
+ */
private final int retryTimeout;
/**
@@ -372,7 +378,9 @@ public class PartitionedRegion extends LocalRegion
// private Random random = new Random(System.currentTimeMillis());
- /** Number of initial buckets */
+ /**
+ * Number of initial buckets
+ */
private final int totalNumberOfBuckets;
// private static final boolean throwIfNoNodesLeft = true;
@@ -396,7 +404,6 @@ public class PartitionedRegion extends LocalRegion
/**
* Flag to indicate whether region is closed
- *
*/
public volatile boolean isClosed = false;
@@ -476,6 +483,10 @@ public class PartitionedRegion extends LocalRegion
colocationListeners.remove(colocationListener);
}
+ @Override
+ protected void validateProfiles() throws IncompatibleCacheServiceProfileException {
+ new ValidateCacheServerProfileProcessor(this).validateCacheServerProfiles();
+ }
static PRIdMap getPrIdToPR() {
return prIdToPR;
@@ -562,7 +573,9 @@ public class PartitionedRegion extends LocalRegion
return getPRId() + BUCKET_ID_SEPARATOR + bucketId;
}
- /** Separator between PRId and bucketId for creating bucketString */
+ /**
+ * Separator between PRId and bucketId for creating bucketString
+ */
public static final String BUCKET_ID_SEPARATOR = ":";
/**
@@ -660,8 +673,9 @@ public class PartitionedRegion extends LocalRegion
.toLocalizedString());
}
Assert.assertTrue(key instanceof Integer);
- if (sendIdentityRequestMessage)
+ if (sendIdentityRequestMessage) {
IdentityRequestMessage.setLatestId((Integer) key);
+ }
if ((super.get(key) == DESTROYED) && (value instanceof PartitionedRegion)) {
throw new PartitionedRegionException(
LocalizedStrings.PartitionedRegion_CAN_NOT_REUSE_OLD_PARTITIONED_REGION_ID_0
@@ -693,10 +707,14 @@ public class PartitionedRegion extends LocalRegion
private int partitionedRegionId = -3;
- /** Node description */
+ /**
+ * Node description
+ */
private final Node node;
- /** Helper Object for redundancy Management of PartitionedRegion */
+ /**
+ * Helper Object for redundancy Management of PartitionedRegion
+ */
private final PRHARedundancyProvider redundancyProvider;
/**
@@ -1333,7 +1351,6 @@ public class PartitionedRegion extends LocalRegion
*
* @param storesData which indicates whether the instance in this cache stores data, effecting the
* Nodes PRType
- *
* @see Node#setPRType(int)
*/
private void registerPartitionedRegion(boolean storesData) {
@@ -1754,9 +1771,9 @@ public class PartitionedRegion extends LocalRegion
* @param key the key, whose value needs to be checks
* @param access true if caller wants last access time updated
* @param allowTombstones whether tombstones should be returned
+ * @return true if the passed key is contained remotely.
* @throws EntryNotFoundException if the entry doesn't exist
* @throws ForceReattemptException if the peer is no longer available
- * @return true if the passed key is contained remotely.
*/
public EntrySnapshot getEntryRemotely(InternalDistributedMember targetNode, Integer bucketId,
Object key, boolean access, boolean allowTombstones)
@@ -1777,8 +1794,8 @@ public class PartitionedRegion extends LocalRegion
// /////////////////////////////////////////////////////////////////
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void becomeLockGrantor() {
@@ -1786,8 +1803,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public Region createSubregion(String subregionName, RegionAttributes regionAttributes)
@@ -1796,8 +1813,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public Lock getDistributedLock(Object key) throws IllegalStateException {
@@ -1805,8 +1822,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public CacheStatistics getStatistics() {
@@ -1814,16 +1831,16 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
public Region getSubregion() {
throw new UnsupportedOperationException();
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public Lock getRegionDistributedLock() throws IllegalStateException {
@@ -1832,8 +1849,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void loadSnapshot(InputStream inputStream)
@@ -1851,8 +1868,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void localInvalidate(Object key, Object aCallbackArgument) throws EntryNotFoundException {
@@ -1861,8 +1878,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void localInvalidateRegion(Object aCallbackArgument) {
@@ -1875,7 +1892,6 @@ public class PartitionedRegion extends LocalRegion
* query is a SELECT expression, and the only region it refers to is this region.
*
* @see DefaultQuery#execute()
- *
* @since GemFire 5.1
*/
public Object executeQuery(DefaultQuery query, Object[] parameters, Set buckets)
@@ -2010,8 +2026,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void saveSnapshot(OutputStream outputStream) throws IOException {
@@ -2019,8 +2035,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void writeToDisk() {
@@ -2028,8 +2044,8 @@ public class PartitionedRegion extends LocalRegion
}
/**
- * @since GemFire 5.0
* @throws UnsupportedOperationException OVERRIDES
+ * @since GemFire 5.0
*/
@Override
public void clear() {
@@ -2785,10 +2801,9 @@ public class PartitionedRegion extends LocalRegion
* @param ifOld true=can update existing entry false=can't update existing entry
* @param expectedOldValue only succeed if old value is equal to this value. If null, then doesn't
* matter what old value is. If INVALID token, must be INVALID.
- * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, boolean, long, boolean)
* @return false if ifNew is true and there is an existing key, or ifOld is true and there is no
* existing entry; otherwise return true.
- *
+ * @see LocalRegion#virtualPut(EntryEventImpl, boolean, boolean, Object, boolean, long, boolean)
*/
private boolean putInBucket(final InternalDistributedMember targetNode, final Integer bucketId,
final EntryEventImpl event, final boolean ifNew, boolean ifOld, Object expectedOldValue,
@@ -3203,7 +3218,6 @@ public class PartitionedRegion extends LocalRegion
* speed up operation and avoid remote calls.
*
* @param bucketId identifier for bucket
- *
* @return the primary member's id or null if there is no storage
*/
public InternalDistributedMember getNodeForBucketRead(int bucketId) {
@@ -4703,7 +4717,7 @@ public class PartitionedRegion extends LocalRegion
nodeToBuckets.entrySet().iterator(); itr.hasNext();) {
Map.Entry<InternalDistributedMember, HashSet<Integer>> entry = itr.next();
HashSet<Integer> buckets = new HashSet<Integer>(entry.getValue()); // Is it needed to copy the
- // set here?
+ // set here?
FetchBulkEntriesResponse fber = null;
result = new HashSet();
@@ -4828,7 +4842,6 @@ public class PartitionedRegion extends LocalRegion
/**
* This method returns Partitioned Region data store associated with this Partitioned Region
- *
*/
public PartitionedRegionDataStore getDataStore() {
return this.dataStore;
@@ -4938,7 +4951,9 @@ public class PartitionedRegion extends LocalRegion
return this.distAdvisor;
}
- /** Returns the distribution profile; lazily creates one if needed */
+ /**
+ * Returns the distribution profile; lazily creates one if needed
+ */
public Profile getProfile() {
return this.distAdvisor.createProfile();
}
@@ -4980,7 +4995,9 @@ public class PartitionedRegion extends LocalRegion
profile.isOffHeap = getOffHeap();
}
- /** set fields that are only in PartitionProfile... */
+ /**
+ * set fields that are only in PartitionProfile...
+ */
public void fillInProfile(PartitionProfile profile) {
// both isDataStore and numBuckets are not required for sending purposes,
// but nice to have for toString debugging
@@ -5109,7 +5126,6 @@ public class PartitionedRegion extends LocalRegion
/**
* This method returns prId
- *
*/
public int getPRId() {
return this.partitionedRegionId;
@@ -5128,7 +5144,6 @@ public class PartitionedRegion extends LocalRegion
/**
* This method returns total number of buckets for this PR
- *
*/
public int getTotalNumberOfBuckets() {
@@ -5379,8 +5394,9 @@ public class PartitionedRegion extends LocalRegion
return true;
} else {
for (String s : groups) {
- if (localServerGroups.contains(s))
+ if (localServerGroups.contains(s)) {
return true;
+ }
}
}
}
@@ -5915,7 +5931,9 @@ public class PartitionedRegion extends LocalRegion
private class EntriesSetIterator extends KeysSetIterator {
- /** reusable KeyInfo */
+ /**
+ * reusable KeyInfo
+ */
private final KeyInfo key = new KeyInfo(null, null, null);
public EntriesSetIterator(Set bucketSet, boolean allowTombstones) {
@@ -5989,7 +6007,9 @@ public class PartitionedRegion extends LocalRegion
return Collections.unmodifiableSet(new KeysSet(availableBuckets));
}
- /** Set view of entries */
+ /**
+ * Set view of entries
+ */
protected class KeysSet extends EntriesSet {
class KeysSetIterator implements PREntriesIterator<Object> {
final Iterator<Integer> bucketSetI;
@@ -6166,7 +6186,9 @@ public class PartitionedRegion extends LocalRegion
Object nextValue = null;
- /** reusable KeyInfo */
+ /**
+ * reusable KeyInfo
+ */
private final KeyInfo key = new KeyInfo(null, null, null);
public ValuesSetIterator(Set bucketSet) {
@@ -6606,7 +6628,6 @@ public class PartitionedRegion extends LocalRegion
/**
* Returns the lockname used by Distributed Lock service to clean the
* {@code allPartitionedRegions}.
- *
*/
private String getLockNameForBucket2NodeModification(int bucketID) {
return (getRegionIdentifier() + ":" + bucketID);
@@ -6870,14 +6891,16 @@ public class PartitionedRegion extends LocalRegion
}
BucketLock other = (BucketLock) obj;
- if (!this.lockName.equals(other.lockName))
+ if (!this.lockName.equals(other.lockName)) {
return false;
+ }
DLockService ls1 = lockService;
DLockService ls2 = other.lockService;
if (ls1 == null || ls2 == null) {
- if (ls1 != ls2)
+ if (ls1 != ls2) {
return false;
+ }
}
return ls1.equals(ls2);
}
@@ -7139,7 +7162,7 @@ public class PartitionedRegion extends LocalRegion
}
if (cacheWrite) { // in case of destroy operation senders should be
- // resumed
+ // resumed
if (sender.isPaused()) {
pausedSenders.add(senderId);
continue;
@@ -7147,11 +7170,12 @@ public class PartitionedRegion extends LocalRegion
}
if (pausedSenders.isEmpty()) { // if there are puase sender then only
- // check for other pause senders instead
- // of creating list of shadowPR
+ // check for other pause senders instead
+ // of creating list of shadowPR
AbstractGatewaySenderEventProcessor ep = sender.getEventProcessor();
- if (ep == null)
+ if (ep == null) {
continue;
+ }
ConcurrentParallelGatewaySenderQueue parallelQueue =
(ConcurrentParallelGatewaySenderQueue) ep.getQueue();
PartitionedRegion parallelQueueRegion = parallelQueue.getRegion(this.getFullPath());
@@ -7196,12 +7220,12 @@ public class PartitionedRegion extends LocalRegion
if (cacheWrite && parallelQueueRegion.size() != 0 && keepWaiting) {
continue;
} else {// In any case, destroy shadow PR locally. distributed destroy of
- // userPR will take care of detsroying shadowPR locally on other
- // nodes.
+ // userPR will take care of destroying shadowPR locally on other
+ // nodes.
RegionEventImpl event = null;
if (op.isClose()) { // In case of cache close operation, we want SPR's basic destroy to go
- // through CACHE_CLOSE condition of postDestroyRegion not
- // closePartitionedRegion code
+ // through CACHE_CLOSE condition of postDestroyRegion not
+ // closePartitionedRegion code
event = new RegionEventImpl(parallelQueueRegion, op, null, false, getMyId(),
generateEventID());
} else {
@@ -7285,7 +7309,6 @@ public class PartitionedRegion extends LocalRegion
* Destroys bucket2node region
*
* @param event the RegionEvent that triggered this operation
- *
* @see #destroyPartitionedRegionLocally(boolean)
* @see #destroyPartitionedRegionGlobally(RegionEventImpl)
* @see #destroyCleanUp(RegionEventImpl, int[])
@@ -7404,7 +7427,6 @@ public class PartitionedRegion extends LocalRegion
* Destroys bucket2node region <br>
*
* @param event the RegionEvent that triggered the region clean up
- *
* @see DestroyPartitionedRegionMessage
*/
private void destroyCleanUp(RegionEventImpl event, int serials[]) {
@@ -7499,7 +7521,6 @@ public class PartitionedRegion extends LocalRegion
* this call returns, else if process with the removal from prIdMap and dataStore cleanup.
*
* @return wether local destroy happened
- *
* @see #destroyPartitionedRegion(RegionEventImpl)
*/
boolean destroyPartitionedRegionLocally(boolean removeFromDisk) {
@@ -7959,7 +7980,9 @@ public class PartitionedRegion extends LocalRegion
return localPrimaryList;
}
- /** doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor */
+ /**
+ * doesn't throw RegionDestroyedException, used by CacheDistributionAdvisor
+ */
@Override
public DistributionAdvisee getParentAdvisee() {
return (DistributionAdvisee) basicGetParentRegion();
@@ -8181,7 +8204,6 @@ public class PartitionedRegion extends LocalRegion
/**
* Returns the a PartitionedIndex on this partitioned region.
- *
*/
public PartitionedIndex getIndex(String indexName) {
Iterator iter = this.indexes.values().iterator();
@@ -8228,7 +8250,6 @@ public class PartitionedRegion extends LocalRegion
* @param indexedExpression expression for index creation.
* @param fromClause the from clause for index creation
* @param imports class to be imported for fromClause.
- *
* @return Index an index created on this region.
* @throws ForceReattemptException indicating the operation failed to create a remote index
* @throws IndexCreationException if the index is not created properly
@@ -8706,7 +8727,6 @@ public class PartitionedRegion extends LocalRegion
* Removes a particular index on this partitioned regions instance.
*
* @param ind Index to be removed.
- *
*/
public int removeIndex(Index ind, boolean remotelyOriginated)
throws CacheException, ForceReattemptException {
@@ -9212,8 +9232,8 @@ public class PartitionedRegion extends LocalRegion
* LOCAL_DESTROY and the region is {@link DataPolicy#withReplication replicated} or if the
* the ExpirationAction is LOCAL_INVALIDATE and the region is
* {@link DataPolicy#withReplication replicated}
- * @see AttributesFactory#setStatisticsEnabled
* @throws IllegalStateException if statistics are disabled for this region.
+ * @see AttributesFactory#setStatisticsEnabled
*/
@Override
public ExpirationAttributes setEntryIdleTimeout(ExpirationAttributes idleTimeout) {
@@ -9300,7 +9320,9 @@ public class PartitionedRegion extends LocalRegion
}
List<BucketRegion> bucketList = new ArrayList<>();
if (!bucketSortedOnce.get()) {
- while (bucketSortedOnce.get() == false);
+ while (bucketSortedOnce.get() == false) {
+ ;
+ }
}
bucketList.addAll(this.sortedBuckets);
return bucketList;
@@ -9730,8 +9752,9 @@ public class PartitionedRegion extends LocalRegion
* @since GemFire 6.1.2.9
*/
public BucketRegion getBucketRegion(Object key) {
- if (this.dataStore == null)
+ if (this.dataStore == null) {
return null;
+ }
Integer bucketId = PartitionedRegionHelper.getHashKey(this, null, key, null, null);
return this.dataStore.getLocalBucketById(bucketId);
}
@@ -9952,8 +9975,9 @@ public class PartitionedRegion extends LocalRegion
assert this.isShadowPR();
PartitionedRegion userPR = ColocationHelper.getLeaderRegion(this);
boolean isAccessor = (userPR.getLocalMaxMemory() == 0);
- if (isAccessor)
+ if (isAccessor) {
return; // return from here if accessor node
+ }
// Before going ahead, make sure all the buckets of shadowPR are
// loaded
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
index b0effb2..2cfc039 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/UpdateAttributesProcessor.java
@@ -20,6 +20,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -48,13 +49,12 @@ import org.apache.geode.internal.logging.LogService;
* This class is a bit misnamed. It really has more with pushing a DistributionAdvisee's profile out
* to others and, optionally if <code>profileExchange</code>, fetching the profile of anyone who
* excepts the pushed profile.
- *
*/
public class UpdateAttributesProcessor {
private static final Logger logger = LogService.getLogger();
protected final DistributionAdvisee advisee;
- private boolean profileExchange = false;
+
/**
* If true then sender is telling receiver to remove the sender's profile. No profile exchange is
* needed in this case.
@@ -62,9 +62,10 @@ public class UpdateAttributesProcessor {
* @since GemFire 5.7
*/
private boolean removeProfile = false;
- private ReplyProcessor21 processor;
- /** Creates a new instance of UpdateAttributesProcessor */
+ /**
+ * Creates a new instance of UpdateAttributesProcessor
+ */
public UpdateAttributesProcessor(DistributionAdvisee da) {
this(da, false);
}
@@ -93,11 +94,11 @@ public class UpdateAttributesProcessor {
* @param exchangeProfiles true if we want to receive profile replies
*/
public void distribute(boolean exchangeProfiles) {
- sendProfileUpdate(exchangeProfiles);
- waitForProfileResponse();
+ ReplyProcessor21 processor = sendProfileUpdate(exchangeProfiles);
+ waitForProfileResponse(processor);
}
- public void waitForProfileResponse() {
+ public void waitForProfileResponse(ReplyProcessor21 processor) {
if (processor == null) {
return;
}
@@ -115,10 +116,9 @@ public class UpdateAttributesProcessor {
}
}
- public void sendProfileUpdate(boolean exchangeProfiles) {
+ private ReplyProcessor21 sendProfileUpdate(boolean exchangeProfiles) {
DistributionManager mgr = this.advisee.getDistributionManager();
DistributionAdvisor advisor = this.advisee.getDistributionAdvisor();
- this.profileExchange = exchangeProfiles;
// if this is not intended for the purpose of exchanging profiles but
// the advisor is uninitialized, then just exchange profiles anyway
@@ -130,51 +130,43 @@ public class UpdateAttributesProcessor {
if (!advisor.isInitialized()) {
// no need to tell the other guy we are going away since
// never got initialized.
- return;
+ return null;
}
} else if (advisor.initializationGate()) {
// it just did the profile exchange so we are done
- return;
+ return null;
}
}
- final Set recipients;
- if (this.removeProfile) {
- recipients = advisor.adviseProfileRemove();
- } else if (exchangeProfiles) {
- recipients = advisor.adviseProfileExchange();
- } else {
- recipients = advisor.adviseProfileUpdate();
- }
-
+ final Set recipients = determineRecipients(exchangeProfiles, advisor);
if (recipients.isEmpty()) {
- return;
+ return null;
}
- ReplyProcessor21 processor = null;
- // Scope scope = this.region.scope;
-
// always require an ack to prevent misordering of messages
InternalDistributedSystem system = this.advisee.getSystem();
- processor = new UpdateAttributesReplyProcessor(system, recipients);
- UpdateAttributesMessage message = getUpdateAttributesMessage(processor, recipients);
+ UpdateAttributesReplyProcessor replyProcessor =
+ new UpdateAttributesReplyProcessor(system, recipients);
+ UpdateAttributesMessage message = getUpdateAttributesMessage(replyProcessor, exchangeProfiles);
mgr.putOutgoing(message);
- this.processor = processor;
+ return replyProcessor;
}
-
- UpdateAttributesMessage getUpdateAttributesMessage(ReplyProcessor21 processor, Set recipients) {
-
- UpdateAttributesMessage msg = new UpdateAttributesMessage();
- msg.adviseePath = this.advisee.getFullPath();
- msg.setRecipients(recipients);
- if (processor != null) {
- msg.processorId = processor.getProcessorId();
+ private Set determineRecipients(boolean exchangeProfiles, DistributionAdvisor advisor) {
+ if (this.removeProfile) {
+ return advisor.adviseProfileRemove();
+ } else if (exchangeProfiles) {
+ return advisor.adviseProfileExchange();
+ } else {
+ return advisor.adviseProfileUpdate();
}
- msg.profile = this.advisee.getProfile();
- msg.exchangeProfiles = this.profileExchange;
- msg.removeProfile = this.removeProfile;
- return msg;
+ }
+
+ private UpdateAttributesMessage getUpdateAttributesMessage(
+ UpdateAttributesReplyProcessor processor, boolean exchangeProfiles) {
+ return new UpdateAttributesMessage(this.advisee.getFullPath(), processor.getRecipients(),
+ processor.getProcessorId(), this.advisee.getProfile(), exchangeProfiles,
+ this.removeProfile);
}
class UpdateAttributesReplyProcessor extends ReplyProcessor21 {
@@ -236,12 +228,9 @@ public class UpdateAttributesProcessor {
if (msg instanceof ProfilesReplyMessage) {
ProfilesReplyMessage reply = (ProfilesReplyMessage) msg;
if (reply.profiles != null) {
- for (int i = 0; i < reply.profiles.length; i++) {
- // @todo Add putProfiles to DistributionAdvisor to do this
- // with one call atomically?
- UpdateAttributesProcessor.this.advisee.getDistributionAdvisor()
- .putProfile(reply.profiles[i]);
- }
+ Arrays.stream(reply.profiles)
+ .forEach((profile) -> UpdateAttributesProcessor.this.advisee
+ .getDistributionAdvisor().putProfile(profile));
}
} else if (msg instanceof ProfileReplyMessage) {
ProfileReplyMessage reply = (ProfileReplyMessage) msg;
@@ -254,17 +243,32 @@ public class UpdateAttributesProcessor {
super.process(msg);
}
}
- }
+ public Collection getRecipients() {
+ return Arrays.asList(members);
+ }
+ }
public static class UpdateAttributesMessage extends HighPriorityDistributionMessage
implements MessageWithReply {
- protected String adviseePath;
- protected int processorId = 0;
+ String adviseePath;
+ protected int processorId;
protected Profile profile;
- protected boolean exchangeProfiles = false;
- protected boolean removeProfile = false;
+ boolean exchangeProfiles;
+ boolean removeProfile;
+
+ public UpdateAttributesMessage() {}
+
+ public UpdateAttributesMessage(String adviseePath, Collection recipients, int processorId,
+ Profile profile, boolean exchangeProfiles, boolean removeProfile) {
+ this.adviseePath = adviseePath;
+ this.setRecipients(recipients);
+ this.processorId = processorId;
+ this.profile = profile;
+ this.exchangeProfiles = exchangeProfiles;
+ this.removeProfile = removeProfile;
+ }
@Override
public int getProcessorId() {
@@ -284,7 +288,7 @@ public class UpdateAttributesProcessor {
try {
if (this.profile != null) {
if (this.exchangeProfiles) {
- replyProfiles = new ArrayList<Profile>();
+ replyProfiles = new ArrayList<>();
}
this.profile.processIncoming(dm, this.adviseePath, this.removeProfile,
this.exchangeProfiles, replyProfiles);
@@ -330,7 +334,7 @@ public class UpdateAttributesProcessor {
@Override
public String toString() {
StringBuilder buff = new StringBuilder();
- buff.append("UpdateAttributesMessage (adviseePath=");
+ buff.append(this.getClass().getName() + " (adviseePath=");
buff.append(this.adviseePath);
buff.append("; processorId=");
buff.append(this.processorId);
@@ -374,7 +378,6 @@ public class UpdateAttributesProcessor {
}
}
-
public static class ProfileReplyMessage extends ReplyMessage {
Profile profile;
@@ -439,6 +442,7 @@ public class UpdateAttributesProcessor {
}
}
+
/**
* Used to return multiple profiles
*
@@ -465,14 +469,12 @@ public class UpdateAttributesProcessor {
}
-
@Override
public int getDSFID() {
return PROFILES_REPLY_MESSAGE;
}
-
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
@@ -507,6 +509,7 @@ public class UpdateAttributesProcessor {
final StringBuilder buff = new StringBuilder();
buff.append("ProfilesReplyMessage");
buff.append(" (processorId=");
+
buff.append(super.processorId);
if (this.profiles != null) {
buff.append("; profiles=");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ValidateCacheServerProfileProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidateCacheServerProfileProcessor.java
new file mode 100644
index 0000000..8a67ff5
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ValidateCacheServerProfileProcessor.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.DistributionAdvisee;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyException;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.logging.LogService;
+
+public class ValidateCacheServerProfileProcessor {
+ private static final Logger logger = LogService.getLogger();
+
+ protected final DistributionAdvisee advisee;
+
+ /**
+ * Creates a processor that validates the profile of the given advisee against other members.
+ *
+ * @param distributionAdvisee the advisee whose full path, profile, distribution advisor and
+ *        distribution manager are used when the validation request is built and sent
+ */
+ public ValidateCacheServerProfileProcessor(DistributionAdvisee distributionAdvisee) {
+ this.advisee = distributionAdvisee;
+ }
+
+ /**
+ * Builds the validation request for this advisee, addressed to the processor's recipients.
+ *
+ * @param processor the reply processor that supplies the recipients and the processor id
+ * @return a message carrying the advisee's full path and its current profile
+ */
+ private ValidateCacheProfileMessage getValidateCacheProfileMessage(
+ ValidateProfileUpdateReplyProcessor processor) {
+ final String path = this.advisee.getFullPath();
+ final Collection targets = processor.getRecipients();
+ final int replyProcessorId = processor.getProcessorId();
+ return new ValidateCacheProfileMessage(path, targets, replyProcessorId,
+ this.advisee.getProfile());
+ }
+
+ /**
+ * Asks every member returned by the advisor's {@code adviseProfileUpdate()} to validate this
+ * advisee's profile and waits for their replies. Does nothing when there are no recipients.
+ *
+ * @throws IncompatibleCacheServiceProfileException if a reply carries that exception as its
+ *         cause
+ */
+ void validateCacheServerProfiles() throws IncompatibleCacheServiceProfileException {
+ final Set targets = this.advisee.getDistributionAdvisor().adviseProfileUpdate();
+ if (!targets.isEmpty()) {
+ ValidateProfileUpdateReplyProcessor processor =
+ new ValidateProfileUpdateReplyProcessor(this.advisee, targets);
+ ValidateCacheProfileMessage request = getValidateCacheProfileMessage(processor);
+ this.advisee.getDistributionManager().putOutgoing(request);
+ waitForProfileResponse(processor);
+ }
+ }
+
+ /**
+ * Waits for all replies to the given processor and always cleans the processor up afterwards.
+ *
+ * @param processor the reply processor to wait on; nothing happens when it is {@code null}
+ * @throws IncompatibleCacheServiceProfileException if a reply failed with that exception as its
+ *         cause
+ */
+ public void waitForProfileResponse(ReplyProcessor21 processor)
+ throws IncompatibleCacheServiceProfileException {
+ if (processor == null) {
+ return;
+ }
+ DistributionManager mgr = this.advisee.getDistributionManager();
+ try {
+ // bug 36983 - you can't loop on a reply processor
+ mgr.getCancelCriterion().checkCancelInProgress(null);
+ try {
+ processor.waitForRepliesUninterruptibly();
+ } catch (ReplyException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IncompatibleCacheServiceProfileException) {
+ throw (IncompatibleCacheServiceProfileException) cause;
+ }
+ // Any other reply failure used to be dropped here, which made a failed validation look
+ // like a successful one. Hand it to ReplyException.handleCause() instead of swallowing it.
+ e.handleCause();
+ }
+ } finally {
+ processor.cleanup();
+ }
+ }
+
+ /**
+ * Reply processor for the validation request. Its membership-related overrides switch to the
+ * admin-inclusive variants whenever the advisee's advisor reports
+ * {@code useAdminMembersForDefault()}.
+ */
+ class ValidateProfileUpdateReplyProcessor extends ReplyProcessor21 {
+
+ private final DistributionAdvisee advisee;
+
+ ValidateProfileUpdateReplyProcessor(DistributionAdvisee advisee, Set members) {
+ super(advisee.getSystem(), members);
+ this.advisee = advisee;
+ }
+
+ /**
+ * Asks the advisee's current advisor whether admin members are used by default.
+ */
+ private boolean usesAdminMembers() {
+ return this.advisee.getDistributionAdvisor().useAdminMembersForDefault();
+ }
+
+ /**
+ * Registers this processor as a membership listener and returns a set of the current members.
+ *
+ * @return a Set of the current members
+ * @since Geode 1.7
+ */
+ @Override
+ protected Set addListenerAndGetMembers() {
+ return usesAdminMembers()
+ ? getDistributionManager().addAllMembershipListenerAndGetAllIds(this)
+ : super.addListenerAndGetMembers();
+ }
+
+ /**
+ * Unregisters this processor as a membership listener, mirroring the registration choice made
+ * in {@link #addListenerAndGetMembers()}.
+ *
+ * @since Geode 1.7
+ */
+ @Override
+ protected void removeListener() {
+ if (usesAdminMembers()) {
+ getDistributionManager().removeAllMembershipListener(this);
+ return;
+ }
+ super.removeListener();
+ }
+
+ /**
+ * Returns the member ids including admin members when the advisor uses admin members by
+ * default; otherwise defers to the superclass.
+ *
+ * @return a Set of the current members
+ * @since Geode 1.7
+ */
+ @Override
+ protected Set getDistributionManagerIds() {
+ return usesAdminMembers()
+ ? getDistributionManager().getDistributionManagerIdsIncludingAdmin()
+ : super.getDistributionManagerIds();
+ }
+
+ /**
+ * Returns the inherited {@code members} wrapped with {@code Arrays.asList}.
+ */
+ public Collection getRecipients() {
+ return Arrays.asList(members);
+ }
+
+ }
+
+ /**
+ * Request asking the receiving member to compare the sender's cache service profiles with its
+ * own for the region at {@code adviseePath}. The receiver answers with a
+ * {@link ValidateCacheServerProfileReplyMessage}.
+ */
+ public static class ValidateCacheProfileMessage extends HighPriorityDistributionMessage
+ implements MessageWithReply {
+
+ private String adviseePath;
+ private int processorId;
+ private Profile profile;
+
+ /** No-arg constructor; the fields are then populated by {@link #fromData(DataInput)}. */
+ public ValidateCacheProfileMessage() {}
+
+ ValidateCacheProfileMessage(String adviseePath, Collection recipients, int processorId,
+ Profile profile) {
+ this.adviseePath = adviseePath;
+ this.setRecipients(recipients);
+ this.processorId = processorId;
+ this.profile = profile;
+ }
+
+ /**
+ * Checks the received profile against the local region and replies to the sender. The reply
+ * carries an {@link IncompatibleCacheServiceProfileException} when an incompatibility was found
+ * and no exception otherwise, including when there is no path or no local region to compare.
+ */
+ @Override
+ protected void process(ClusterDistributionManager dm) {
+ String compatibility = null;
+ if (adviseePath != null) {
+ LocalRegion region = (LocalRegion) dm.getCache().getRegion(adviseePath);
+ // getRegion may return null when this member has no such region; there is then nothing to
+ // compare, and dereferencing it would fail before any reply could be sent.
+ if (region != null) {
+ compatibility = checkCacheServerProfileCompatibility(region,
+ (CacheDistributionAdvisor.CacheProfile) this.profile);
+ }
+ }
+
+ ReplyException replyException = (compatibility != null
+ ? new ReplyException(new IncompatibleCacheServiceProfileException(compatibility))
+ : null);
+ // Reply in every case: the sender waits for an answer from each recipient.
+ ValidateCacheServerProfileReplyMessage.send(getSender(), this.processorId, replyException,
+ dm);
+ }
+
+ /**
+ * Compares each remote cache service profile with the local profile that has the same id.
+ *
+ * @return the first incompatibility message reported, or {@code null} when none was found
+ */
+ private String checkCacheServerProfileCompatibility(LocalRegion localRegion,
+ CacheDistributionAdvisor.CacheProfile cacheProfile) {
+ Map<String, CacheServiceProfile> myProfiles = localRegion.getCacheServiceProfiles();
+ for (CacheServiceProfile remoteProfile : cacheProfile.cacheServiceProfiles) {
+ CacheServiceProfile localProfile = myProfiles.get(remoteProfile.getId());
+ if (localProfile == null) {
+ // No local profile with that id, so there is nothing to compare against.
+ continue;
+ }
+ String cspResult = remoteProfile.checkCompatibility(localRegion.getFullPath(), localProfile);
+ if (cspResult != null) {
+ return cspResult;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public int getDSFID() {
+ return VALIDATE_CACHE_PROFILE_MESSAGE;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buff = new StringBuilder();
+ buff.append(this.getClass().getName());
+ buff.append(" (adviseePath=");
+ buff.append(this.adviseePath);
+ buff.append("; processorId=");
+ buff.append(this.processorId);
+ buff.append("; profile=");
+ buff.append(this.profile);
+ buff.append(")");
+ return buff.toString();
+ }
+
+ /** Reads the fields in the order in which {@link #toData(DataOutput)} writes them. */
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.adviseePath = DataSerializer.readString(in);
+ this.processorId = in.readInt();
+ // set the processor ID to be able to send reply to sender in case of any
+ // unexpected exception during deserialization etc.
+ ReplyProcessor21.setMessageRPId(this.processorId);
+ this.profile = DataSerializer.readObject(in);
+ }
+
+ /** Writes the advisee path, the processor id and the profile, in that order. */
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeString(this.adviseePath, out);
+ out.writeInt(this.processorId);
+ DataSerializer.writeObject(this.profile, out);
+ }
+ }
+
+ /**
+ * Reply to a {@link ValidateCacheProfileMessage}. Besides what {@link ReplyMessage} already
+ * carries it adds no fields; a failed validation is reported through the reply's exception.
+ */
+ public static class ValidateCacheServerProfileReplyMessage extends ReplyMessage {
+
+ /**
+ * Builds a reply and hands it to the distribution manager for sending.
+ *
+ * @param recipient the member the reply is addressed to
+ * @param processorId id of the reply processor the reply is meant for
+ * @param exception the failure to report, or {@code null} when there is none
+ * @param dm the distribution manager used to send the reply
+ */
+ public static void send(InternalDistributedMember recipient, int processorId,
+ ReplyException exception, ClusterDistributionManager dm) {
+ ValidateCacheServerProfileReplyMessage m = new ValidateCacheServerProfileReplyMessage();
+
+ m.processorId = processorId;
+ if (exception != null) {
+ m.setException(exception);
+ if (logger.isDebugEnabled()) {
+ // The message fills the placeholder; the exception stays the trailing argument so that it
+ // is logged as the throwable instead of being concatenated into the format string.
+ logger.debug("Replying with exception: {}", m, exception);
+ }
+ }
+ m.setRecipient(recipient);
+ dm.putOutgoing(m);
+ }
+
+ @Override
+ public int getDSFID() {
+ return VALIDATE_CACHE_SERVER_REPLY_MESSAGE;
+ }
+
+ /** Nothing beyond the superclass state is read. */
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ }
+
+ /** Nothing beyond the superclass state is written. */
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder buff = new StringBuilder();
+ buff.append(this.getClass().getName());
+ buff.append(" (processorId=");
+ buff.append(super.processorId);
+ buff.append(")");
+ return buff.toString();
+ }
+
+ /** Always reports {@code true}. */
+ @Override
+ public boolean getInlineProcess() {
+ return true;
+ }
+ }
+
+ /**
+ * {@link ReplyException} subtype declared for cache server profile validation. It adds no
+ * behavior of its own: every constructor and {@link #handleCause()} delegate to the superclass.
+ * NOTE(review): nothing in this class file constructs it - the validation reply is built with a
+ * plain ReplyException - so confirm whether this type is still needed.
+ */
+ public static class ValidateCacheServerProfileReplyException extends ReplyException {
+ public ValidateCacheServerProfileReplyException() {
+ super();
+ }
+
+ public ValidateCacheServerProfileReplyException(String msg) {
+ super(msg);
+ }
+
+ public ValidateCacheServerProfileReplyException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public ValidateCacheServerProfileReplyException(Throwable cause) {
+ super(cause);
+ }
+
+ // Pure pass-through to the superclass implementation.
+ @Override
+ public void handleCause() {
+ super.handleCause();
+ }
+ }
+}
diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
index 75d2d1d..dd4d529 100644
--- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
+++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt
@@ -806,3 +806,4 @@ org/apache/geode/security/AuthenticationFailedException,true,-820286647227908887
org/apache/geode/security/AuthenticationRequiredException,true,4675976651103154919
org/apache/geode/security/GemFireSecurityException,true,3814254578203076926,cause:java/lang/Throwable
org/apache/geode/security/NotAuthorizedException,true,419215768216387745,principal:java/security/Principal
+org/apache/geode/internal/cache/IncompatibleCacheServiceProfileException,false
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
index a61b609..3cdb008 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/standalone/DUnitLauncher.java
@@ -149,6 +149,15 @@ public class DUnitLauncher {
/**
* Launch DUnit. If the unit test was launched through the hydra framework, leave the test alone.
*/
+ public static void launchIfNeeded(int vmCount) {
+ NUM_VMS = vmCount;
+ launchIfNeeded();
+ }
+
+
+ /**
+ * Launch DUnit. If the unit test was launched through the hydra framework, leave the test alone.
+ */
public static void launchIfNeeded() {
if (System.getProperties().contains(VM_NUM_PARAM)) {
// we're a dunit child vm, do nothing.
@@ -167,14 +176,6 @@ public class DUnitLauncher {
}
/**
- * Launch DUnit. If the unit test was launched through the hydra framework, leave the test alone.
- */
- public static void launchIfNeeded(int vmCount) {
- NUM_VMS = vmCount;
- launchIfNeeded();
- }
-
- /**
* Test it see if the eclise dunit environment is launched.
*/
public static boolean isLaunched() {
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index fef1243..394012a 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -91,4 +91,4 @@ org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$3
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$4
org/apache/geode/cache/query/internal/types/TypeUtils$ComparisonStrategy$5
org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl$ClosedPoolConnectionList
-org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
+org/apache/geode/cache/query/internal/parse/ASTArithmeticOp
\ No newline at end of file
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfile.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfile.java
index 3d9b00d..c4a0b67 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfile.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfile.java
@@ -102,6 +102,7 @@ public class LuceneIndexCreationProfile implements CacheServiceProfile, Versione
@Override
public String checkCompatibility(String regionPath, CacheServiceProfile profile) {
String result = null;
+
LuceneIndexCreationProfile remoteProfile = (LuceneIndexCreationProfile) profile;
// Verify fields are the same
diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
index 211b508..b378328 100644
--- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
+++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneServiceImpl.java
@@ -19,7 +19,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -71,6 +70,7 @@ import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.BucketNotFoundException;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PrimaryBucketException;
@@ -140,7 +140,8 @@ public class LuceneServiceImpl implements InternalLuceneService {
public void beforeRegionDestroyed(Region region) {
List<LuceneIndex> indexes = getIndexes(region.getFullPath());
if (!indexes.isEmpty()) {
- String indexNames = indexes.stream().map(i -> i.getName()).collect(Collectors.joining(","));
+ String indexNames =
+ indexes.stream().map(LuceneIndex::getName).collect(Collectors.joining(","));
throw new IllegalStateException(
LocalizedStrings.LuceneServiceImpl_REGION_0_CANNOT_BE_DESTROYED
.toLocalizedString(region.getFullPath(), indexNames));
@@ -244,8 +245,15 @@ public class LuceneServiceImpl implements InternalLuceneService {
LuceneSerializer serializer) {
validateRegionAttributes(region.getAttributes());
- region.addCacheServiceProfile(new LuceneIndexCreationProfile(indexName, regionPath, fields,
- analyzer, fieldAnalyzers, serializer));
+ LuceneIndexCreationProfile profile = new LuceneIndexCreationProfile(indexName, regionPath,
+ fields, analyzer, fieldAnalyzers, serializer);
+ try {
+ region.addCacheServiceProfile(profile);
+ } catch (IncompatibleCacheServiceProfileException e) {
+ logger.error(e);
+ region.removeCacheServiceProfile(profile);
+ throw new IllegalStateException(e);
+ }
String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath);
region.updatePRConfigWithNewGatewaySender(aeqId);
@@ -266,9 +274,7 @@ public class LuceneServiceImpl implements InternalLuceneService {
PartitionedRepositoryManager repositoryManager =
(PartitionedRepositoryManager) luceneIndex.getRepositoryManager();
Set<Integer> primaryBucketIds = userRegion.getDataStore().getAllLocalPrimaryBucketIds();
- Iterator primaryBucketIterator = primaryBucketIds.iterator();
- while (primaryBucketIterator.hasNext()) {
- int primaryBucketId = (Integer) primaryBucketIterator.next();
+ for (Integer primaryBucketId : primaryBucketIds) {
try {
BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(primaryBucketId);
if (userBucket == null) {
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
index 0a09934..7f178ff 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationDUnitTest.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.cache.lucene;
-import static junitparams.JUnitParamsRunner.$;
import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_ANALYZERS;
import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_ANALYZERS_2;
import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_FIELDS;
@@ -380,7 +379,7 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest {
}
protected Object[] getXmlAndExceptionMessages() {
- return $(
+ return new Object[] {
new Object[] {"verifyDifferentFieldsFails", CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_FIELDS},
new Object[] {"verifyDifferentFieldAnalyzerSizesFails1",
CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_FIELDS_2},
@@ -398,7 +397,7 @@ public class LuceneIndexCreationDUnitTest extends LuceneDUnitTest {
new Object[] {"verifyDifferentIndexesFails1",
CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_1},
new Object[] {"verifyDifferentIndexesFails2",
- CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_2});
+ CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_INDEXES_2}};
}
@Test
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexClientDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexClientDUnitTest.java
index 0986390..7dabb68 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexClientDUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneQueriesReindexClientDUnitTest.java
@@ -17,16 +17,17 @@ package org.apache.geode.cache.lucene;
import static org.apache.geode.cache.lucene.test.LuceneTestUtilities.REGION_NAME;
import junitparams.JUnitParamsRunner;
+import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.test.dunit.SerializableCallableIF;
import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
import org.apache.geode.test.junit.categories.DistributedTest;
import org.apache.geode.test.junit.categories.LuceneTest;
@@ -39,7 +40,8 @@ import org.apache.geode.test.junit.categories.LuceneTest;
@RunWith(JUnitParamsRunner.class)
public class LuceneQueriesReindexClientDUnitTest extends LuceneQueriesReindexDUnitTest {
- private static final long serialVersionUID = 1L;
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
@Override
public void postSetUp() throws Exception {
@@ -58,8 +60,10 @@ public class LuceneQueriesReindexClientDUnitTest extends LuceneQueriesReindexDUn
ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
clientCacheFactory.addPoolServer("localhost", port1);
clientCacheFactory.addPoolServer("localhost", port2);
- ClientCache clientCache = getClientCache(clientCacheFactory);
- clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(REGION_NAME);
+
+ clientCacheRule.createClientCache(clientCacheFactory);
+ clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+ .create(REGION_NAME);
});
}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java
new file mode 100644
index 0000000..17a3d6e
--- /dev/null
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileDUnitTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene.internal;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.awaitility.Awaitility;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.IncompatibleCacheServiceProfileException;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedTestRule;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.LuceneTest;
+
+/**
+ * Distributed test that adds two different {@link LuceneIndexCreationProfile}s for the same index
+ * name to the same partitioned region from two VMs at the same time, and waits until both
+ * attempts have been rejected with an {@link IncompatibleCacheServiceProfileException}.
+ */
+@Category({DistributedTest.class, LuceneTest.class})
+public class LuceneIndexCreationProfileDUnitTest implements Serializable {
+
+ private static final String INDEX_NAME = "index";
+ private static final String REGION_NAME = "region";
+
+ // Two VMs are requested; the test uses VM 0 and VM 1.
+ @ClassRule
+ public static DistributedTestRule distributedTestRule = new DistributedTestRule(2);
+
+ // Sets the "luceneReindex" system property (with the GEMFIRE_PREFIX) to "true" for the caches.
+ @Rule
+ public CacheRule cacheRule = CacheRule.builder()
+ .addSystemProperty(DistributionConfig.GEMFIRE_PREFIX + "luceneReindex", "true")
+ .createCacheInAll().disconnectAfter().build();
+
+ /**
+ * Creates the region in both VMs, puts 113 entries through VM 0, then concurrently adds a
+ * one-field profile in VM 0 and a two-field profile in VM 1. Each asynchronous task yields
+ * {@code true} only when its own call threw IncompatibleCacheServiceProfileException; the test
+ * waits up to 30 seconds for both results to be {@code true}.
+ */
+ @Test
+ public void testConcurrentIndexCreationWithDifferentProfiles() {
+ VM vm0 = VM.getVM(0);
+ VM vm1 = VM.getVM(1);
+
+ vm0.invoke(this::setupCacheAndRegion);
+ vm1.invoke(this::setupCacheAndRegion);
+
+ // Populate the region before any profile is added.
+ vm0.invoke(() -> {
+ Region<Object, Object> region = cacheRule.getCache().getRegion(REGION_NAME);
+ for (int i = 0; i < 113; i++) {
+ region.put(i, i);
+ }
+ });
+
+ // true == the profile was rejected as incompatible, false == it was accepted
+ AsyncInvocation<Boolean> asyncInvocation0 = vm0.invokeAsync(() -> {
+ PartitionedRegion region1 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ try {
+ region1.addCacheServiceProfile(getOneFieldLuceneIndexCreationProfile());
+ return false;
+ } catch (IncompatibleCacheServiceProfileException e) {
+ e.printStackTrace();
+ return true;
+ }
+ });
+
+ // Same contract as above, but with the two-field profile in the other VM.
+ AsyncInvocation<Boolean> asyncInvocation1 = vm1.invokeAsync(() -> {
+ PartitionedRegion region2 = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+ try {
+ region2.addCacheServiceProfile(getTwoFieldLuceneIndexCreationProfile());
+ return false;
+ } catch (IncompatibleCacheServiceProfileException e) {
+ e.printStackTrace();
+ return true;
+ }
+ });
+
+ // Passes only when both members reported a rejection.
+ Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+ .until(() -> asyncInvocation0.get() && asyncInvocation1.get());
+ }
+
+ /** Creates the PARTITION region named REGION_NAME in the cache of the invoking VM. */
+ private void setupCacheAndRegion() {
+ InternalCache cache = cacheRule.getCache();
+ cache.createRegionFactory(RegionShortcut.PARTITION).create(REGION_NAME);
+ }
+
+ /** Profile for INDEX_NAME on REGION_NAME with the single field "field1". */
+ private LuceneIndexCreationProfile getOneFieldLuceneIndexCreationProfile() {
+ return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME, new String[] {"field1"},
+ new StandardAnalyzer(), null, null);
+ }
+
+ /** Profile for the same index and region, but with the fields "field1" and "field2". */
+ private LuceneIndexCreationProfile getTwoFieldLuceneIndexCreationProfile() {
+ return new LuceneIndexCreationProfile(INDEX_NAME, REGION_NAME,
+ new String[] {"field1", "field2"}, new StandardAnalyzer(), null, null);
+ }
+}
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
index 8a88e72..7e76b93 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationProfileJUnitTest.java
@@ -61,11 +61,11 @@ public class LuceneIndexCreationProfileJUnitTest {
}
private Object[] getSerializationProfiles() {
- return $(new Object[] {getOneFieldLuceneIndexCreationProfile()},
+ return new Object[] {new Object[] {getOneFieldLuceneIndexCreationProfile()},
new Object[] {getTwoFieldLuceneIndexCreationProfile()},
new Object[] {getTwoAnalyzersLuceneIndexCreationProfile()},
new Object[] {getDummySerializerCreationProfile()},
- new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()});
+ new Object[] {getNullField1AnalyzerLuceneIndexCreationProfile()}};
}
@Test
diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
index e3c2001..9e7eab0 100644
--- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
+++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/test/LuceneTestUtilities.java
@@ -83,10 +83,8 @@ public class LuceneTestUtilities {
public static final String CANNOT_CREATE_LUCENE_INDEX_DIFFERENT_SERIALIZER =
"Cannot create Lucene index index on region /region with serializer DummyLuceneSerializer because another member defines the same index with different serializer HeterogeneousLuceneSerializer.";
- public static String Quarter1 = "Q1";
- public static String Quarter2 = "Q2";
- public static String Quarter3 = "Q3";
- public static String Quarter4 = "Q4";
+ private static String Quarter1 = "Q1";
+ private static String Quarter2 = "Q2";
public static void verifyResultOrder(Collection<EntryScore<String>> list,
EntryScore<String>... expectedEntries) {
--
To stop receiving notification emails like this one, please contact
udo@apache.org.