You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:45 UTC
[geode] 13/18: GEODE-6588: Cleanup InitialImageOperation
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5b544fffeb890476e892cf88bb3ef164267df1eb
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Fri May 21 08:53:47 2021 -0700
GEODE-6588: Cleanup InitialImageOperation
---
.../geode/internal/cache/MapClearGIIDUnitTest.java | 36 +-
.../apache/geode/internal/cache/CacheObserver.java | 34 +-
.../geode/internal/cache/CacheObserverAdapter.java | 93 +-
.../apache/geode/internal/cache/DiskStoreImpl.java | 4 +-
.../internal/cache/InitialImageOperation.java | 1179 ++++++++++----------
.../entries/AbstractOplogDiskRegionEntry.java | 4 +-
.../cache/entries/AbstractRegionEntry.java | 2 +-
.../geode/internal/cache/entries/DiskEntry.java | 168 ++-
.../query/cq/dunit/CqQueryUsingPoolDUnitTest.java | 2 +-
9 files changed, 698 insertions(+), 824 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/MapClearGIIDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/MapClearGIIDUnitTest.java
index 25ac439..055c01e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/MapClearGIIDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/MapClearGIIDUnitTest.java
@@ -53,10 +53,10 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
protected static boolean wasGIIInProgressDuringClear = false;
- static volatile Region region;
+ static volatile Region<String, String> region;
- public static boolean checkImageStateFlag() throws Exception {
- Region rgn = new MapClearGIIDUnitTest().getCache().getRegion(SEPARATOR + "map");
+ public static boolean checkImageStateFlag() {
+ Region<?, ?> rgn = new MapClearGIIDUnitTest().getCache().getRegion(SEPARATOR + "map");
if (rgn == null) {
fail("Map region not yet created");
}
@@ -77,12 +77,12 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
return true;
}
- public static void createRegionInVm0() throws Exception {
- AttributesFactory factory = new AttributesFactory();
+ public static void createRegionInVm0() {
+ AttributesFactory<String, String> factory = new AttributesFactory<>();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(true);
- RegionAttributes attr = factory.create();
+ RegionAttributes<String, String> attr = factory.create();
region = new MapClearGIIDUnitTest().getCache().createRegion("map", attr);
@@ -114,7 +114,7 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
// test methods
@Test
- public void testClearImageStateFlag() throws Throwable {
+ public void testClearImageStateFlag() {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
@@ -124,7 +124,7 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
@Override
public void run2() throws CacheException {
InitialImageOperation.slowImageProcessing = 10;
- InitialImageOperation.slowImageSleeps = 0;
+ slowImageSleeps.set(0);
Properties mprops = new Properties();
// mprops.setProperty(DistributionConfig.SystemConfigurationProperties.MCAST_PORT, "7777");
@@ -144,11 +144,11 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
getSystem(mprops);
// ds = DistributedSystem.connect(null);
getCache();
- AttributesFactory factory = new AttributesFactory();
+ AttributesFactory<String, String> factory = new AttributesFactory<>();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
factory.setConcurrencyChecksEnabled(true);
- RegionAttributes attr = factory.create();
+ RegionAttributes<String, String> attr = factory.create();
region = createRootRegion("map", attr);
// region = region.createSubregion("map",attr);
for (int i = 0; i < 10000; ++i) {
@@ -158,7 +158,7 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
});
LogWriterUtils.getLogWriter().info("Cache created in VM1 successfully");
try {
- AsyncInvocation asyncGII = vm0.invokeAsync(() -> MapClearGIIDUnitTest.createRegionInVm0());
+ AsyncInvocation<Object> asyncGII = vm0.invokeAsync(MapClearGIIDUnitTest::createRegionInVm0);
// wait until vm0's gii has done 20 slow image sleeps (10ms*20 = 200ms)
// before starting the clear
vm0.invoke(new CacheSerializableRunnable("wait for sleeps") {
@@ -167,7 +167,7 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
WaitCriterion ev = new WaitCriterion() {
@Override
public boolean done() {
- return slowImageSleeps >= 20;
+ return slowImageSleeps.get() >= 20;
}
@Override
@@ -179,14 +179,14 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
}
});
// now that the gii has received some entries do the clear
- vm1.invoke(() -> MapClearGIIDUnitTest.clearRegionInVm1());
+ vm1.invoke(MapClearGIIDUnitTest::clearRegionInVm1);
// wait for GII to complete
ThreadUtils.join(asyncGII, 30 * 1000);
if (asyncGII.exceptionOccurred()) {
Throwable t = asyncGII.getException();
Assert.fail("createRegionInVM0 failed", t);
}
- assertTrue(vm0.invoke(() -> MapClearGIIDUnitTest.checkImageStateFlag()));
+ assertTrue(vm0.invoke(MapClearGIIDUnitTest::checkImageStateFlag));
if (asyncGII.exceptionOccurred()) {
Assert.fail("asyncGII failed", asyncGII.getException());
@@ -200,7 +200,7 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
@Override
public void run() {
InitialImageOperation.slowImageProcessing = 0;
- InitialImageOperation.slowImageSleeps = 0;
+ slowImageSleeps.set(0);
}
});
@@ -210,12 +210,12 @@ public class MapClearGIIDUnitTest extends JUnit4CacheTestCase {
public static class CacheObserverImpl extends CacheObserverAdapter {
@Override
- public void afterRegionClear(RegionEvent event) {
+ public void afterRegionClear(RegionEvent<?, ?> event) {
LogWriterUtils.getLogWriter().info("**********Received clear event in VM0 . ");
- Region rgn = event.getRegion();
+ Region<?, ?> rgn = event.getRegion();
wasGIIInProgressDuringClear = ((LocalRegion) rgn).getImageState().wasRegionClearedDuringGII();
InitialImageOperation.slowImageProcessing = 0;
- InitialImageOperation.slowImageSleeps = 0;
+ slowImageSleeps.set(0);
LogWriterUtils.getLogWriter()
.info("wasGIIInProgressDuringClear when clear event was received= "
+ wasGIIInProgressDuringClear);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserver.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserver.java
index 1fa605b..9a08b13 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserver.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserver.java
@@ -20,7 +20,7 @@ import org.apache.geode.cache.RegionEvent;
/**
* This interface is used by testing/debugging code to be notified of query events. See the
- * documentation for class CacheObserverHolder for details. Also the callback is issed only if the
+ * documentation for class CacheObserverHolder for details. Also the callback is issued only if the
* boolean ISSUE_CALLBACKS_TO_CACHE_OBSERVER present in org.apache.geode.internal.cache.LocalRegion
* is made true
*
@@ -31,16 +31,16 @@ public interface CacheObserver {
* Called just after the region's Map is cleared & before Listener callback is issued. The call to
* this method is synchronous
*/
- void afterRegionClear(RegionEvent event);
+ void afterRegionClear(RegionEvent<?, ?> event);
/**
- * Called just beforeclearing the DiskRegion.
+ * Called just before clearing the DiskRegion.
*
*/
void beforeDiskClear();
/**
- * callback to test flushing efficieny. This callback is issued just before the flushing of the
+ * callback to test flushing efficiency. This callback is issued just before the flushing of the
* buffer that is before writing data to the Oplog, but after setting the logical offsets in the
* DiskIds contained in the PendingWrite Buffer
*
@@ -48,8 +48,8 @@ public interface CacheObserver {
void goingToFlush();
/**
- * called immediately after bytes are written to the disk Region. In case of asynch mode, it gets
- * called immedaitely after the asynch writer has written it to disk & just before releasing the
+ * called immediately after bytes are written to the disk Region. In case of async mode, it gets
+ * called immediately after the async writer has written it to disk & just before releasing the
* ByteBuffer to the pool.
*
*/
@@ -69,7 +69,7 @@ public interface CacheObserver {
/**
* Callback just after calculating the conflated byte buffer. This function can get called only in
- * the asynch mode where conflation can happen
+ * the async mode where conflation can happen
*
* @param origBB Original ByteBuffer object for the operation without considering conflation
* @param conflatedBB Resultant ByteBuffer object after conflation
@@ -78,13 +78,13 @@ public interface CacheObserver {
/**
* Callback just after setting oplog offset . The Oplog Offset will be set to non negative number
- * in case it is a synch mode operation as the offset for synch mode is available in the context
- * of thread performing the operation & to -1 for an asynch mode of operation as in case of asynch
- * mode of operation the actual offset is determined only when asynch writer performs the write
+ * in case it is a sync mode operation as the offset for sync mode is available in the context
+ * of thread performing the operation & to -1 for an async mode of operation as in case of async
+ * mode of operation the actual offset is determined only when async writer performs the write
* operation.
*
- * @param offset A non negative number for synch mode of operation indicating the start position
- * in the Oplog for the operation & -1 for asynch mode of operation
+ * @param offset A non negative number for sync mode of operation indicating the start position
+ * in the Oplog for the operation & -1 for async mode of operation
*
*/
void afterSettingOplogOffSet(long offset);
@@ -114,7 +114,7 @@ public interface CacheObserver {
/**
* Callback given immediately before any thread invokes ComplexDiskRegion.OplogCompactor's
- * stopCompactor method. This method normally gets invoked by clear/destory/close methods of the
+ * stopCompactor method. This method normally gets invoked by clear/destroy/close methods of the
* region.
*
*/
@@ -122,7 +122,7 @@ public interface CacheObserver {
/**
* Callback given immediately after any thread invokes ComplexDiskRegion.OplogCompactor's
- * stopCompactor method. This method normally gets invoked by clear/destory/close methods of the
+ * stopCompactor method. This method normally gets invoked by clear/destroy/close methods of the
* region.
*
*/
@@ -146,12 +146,6 @@ public interface CacheObserver {
void afterMarkingGIICompleted();
/**
- * Called after the Oplog.WriterThread (asynch writer thread) swaps the pendingFlushMap and
- * pendingWriteMap for flushing.
- */
- void afterSwitchingWriteAndFlushMaps();
-
- /**
* Invoked just before setting the LBHTree reference in the thread local.
*/
void beforeSettingDiskRef();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserverAdapter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserverAdapter.java
index 67cb3d6..aca73d3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserverAdapter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheObserverAdapter.java
@@ -21,122 +21,59 @@ import org.apache.geode.cache.RegionEvent;
/**
* This class provides 'do-nothing' implementations of all of the methods of interface
* CacheObserver. See the documentation for class CacheObserverHolder for details. Also the callback
- * is issed only if the boolean ISSUE_CALLBACKS_TO_CACHE_OBSERVER present in
+ * is issued only if the boolean ISSUE_CALLBACKS_TO_CACHE_OBSERVER present in
* org.apache.geode.internal.cache.LocalRegion is made true
*
*/
public class CacheObserverAdapter implements CacheObserver {
- /**
- * Called just after the region is cleared & before Listener callback is issued. The call to this
- * method is synchronous
- *
- * @param event RegionEvent object
- */
@Override
- public void afterRegionClear(RegionEvent event) {}
+ public void afterRegionClear(RegionEvent<?, ?> event) {}
@Override
- public void beforeDiskClear() {
- // TODO Auto-generated method stub
- }
+ public void beforeDiskClear() {}
@Override
- public void goingToFlush() {
- // TODO Auto-generated method stub
- }
-
- public void beforeWritingBytes() {}
+ public void goingToFlush() {}
@Override
public void afterWritingBytes() {}
@Override
- public void beforeGoingToCompact() {
- // TODO Auto-generated method stub
- }
+ public void beforeGoingToCompact() {}
@Override
- public void afterHavingCompacted() {
- // TODO Auto-generated method stub
- }
+ public void afterHavingCompacted() {}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.CacheObserver#afterConflation(java.nio.ByteBuffer,
- * java.nio.ByteBuffer)
- */
@Override
- public void afterConflation(ByteBuffer origBB, ByteBuffer conflatedBB) {
- // TODO Auto-generated method stub
- }
+ public void afterConflation(ByteBuffer origBB, ByteBuffer conflatedBB) {}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.CacheObserver#afterSettingOplogOffSet()
- */
@Override
- public void afterSettingOplogOffSet(long offset) {
- // TODO Auto-generated method stub
- }
+ public void afterSettingOplogOffSet(long offset) {}
@Override
- public void beforeSwitchingOplog() {
- // TODO Auto-generated method stub
- }
+ public void beforeSwitchingOplog() {}
@Override
- public void afterSwitchingOplog() {
- // TODO Auto-generated method stub
- }
+ public void afterSwitchingOplog() {}
@Override
- public void afterKrfCreated() {
-
- }
+ public void afterKrfCreated() {}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.CacheObserver#beforeStoppingCompacter()
- */
@Override
- public void beforeStoppingCompactor() {
- // TODO Auto-generated method stub
-
- }
+ public void beforeStoppingCompactor() {}
@Override
- public void afterStoppingCompactor() {
-
- }
+ public void afterStoppingCompactor() {}
@Override
public void afterSignallingCompactor() {}
@Override
- public void afterMarkingGIICompleted() {
- // TODO Auto-generated method stub
-
- }
+ public void afterMarkingGIICompleted() {}
@Override
- public void afterMarkingGIIStarted() {
- // TODO Auto-generated method stub
-
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.CacheObserver#afterSwitchingWriteAndFlushMaps()
- */
- @Override
- public void afterSwitchingWriteAndFlushMaps() {
-
- }
+ public void afterMarkingGIIStarted() {}
@Override
public void afterSettingDiskRef() {}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index a2299bc..7bbade7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -891,7 +891,7 @@ public class DiskStoreImpl implements DiskStore {
} else if (EntryBits.isTombstone(bb.getBits())) {
value = Token.TOMBSTONE;
} else {
- value = readRawValue(bytes, bb.getVersion(), null);
+ value = readRawValue(bytes);
}
return value;
}
@@ -913,7 +913,7 @@ public class DiskStoreImpl implements DiskStore {
} else if (EntryBits.isTombstone(bb.getBits())) {
value = Token.TOMBSTONE;
} else {
- value = readRawValue(bytes, bb.getVersion(), null);
+ value = readRawValue(bytes);
}
return value;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
index f434784..8227aeb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InitialImageOperation.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.AFTER_INITIAL_IMAGE;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import java.io.DataInput;
import java.io.DataOutput;
@@ -122,31 +123,28 @@ public class InitialImageOperation {
*/
@MutableForTesting
public static int CHUNK_SIZE_IN_BYTES =
- Integer.getInteger("GetInitialImage.chunkSize", 500 * 1024).intValue();
+ Integer.getInteger("GetInitialImage.chunkSize", 500 * 1024);
/**
* Allowed number of in flight GII chunks
*/
@MutableForTesting
public static int CHUNK_PERMITS =
- Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.CHUNK_PERMITS", 16)
- .intValue();
+ Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.CHUNK_PERMITS", 16);
/**
* maximum number of unfinished operations to be supported by delta GII
*/
@MutableForTesting
public static int MAXIMUM_UNFINISHED_OPERATIONS = Integer.getInteger(
- GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.MAXIMUM_UNFINISHED_OPERATIONS", 10000)
- .intValue();
+ GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.MAXIMUM_UNFINISHED_OPERATIONS", 10000);
/**
* Allowed number GIIs in parallel
*/
@MutableForTesting
public static final int MAX_PARALLEL_GIIS =
- Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.MAX_PARALLEL_GIIS", 5)
- .intValue();
+ Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.MAX_PARALLEL_GIIS", 5);
/**
* the region we are fetching
@@ -166,7 +164,7 @@ public class InitialImageOperation {
/**
* received region version holder for lost member, used by synchronizeWith only
*/
- protected RegionVersionHolder rcvd_holderToSync;
+ protected RegionVersionHolder<?> rcvd_holderToSync;
/**
* received GC versions from the GCC source
@@ -189,7 +187,7 @@ public class InitialImageOperation {
* for testing purposes
*/
@MutableForTesting
- public static volatile int slowImageSleeps = 0;
+ public static final AtomicInteger slowImageSleeps = new AtomicInteger();
/**
* for testing purposes
@@ -206,17 +204,12 @@ public class InitialImageOperation {
}
/** a flag for inhibiting the use of StateFlushOperation before gii */
- private static final ThreadLocal inhibitStateFlush = new ThreadLocal() {
- @Override
- protected Object initialValue() {
- return Boolean.valueOf(false);
- }
- };
-
+ private static final ThreadLocal<Boolean> inhibitStateFlush =
+ ThreadLocal.withInitial(() -> Boolean.FALSE);
/** inhibit use of StateFlush for the current thread */
public static void setInhibitStateFlush(boolean inhibitIt) {
- inhibitStateFlush.set(Boolean.valueOf(inhibitIt));
+ inhibitStateFlush.set(inhibitIt);
}
public enum GIIStatus {
@@ -237,11 +230,11 @@ public class InitialImageOperation {
}
private GIIStatus reportGIIStatus() {
- if (!this.gotImage) {
+ if (!gotImage) {
return GIIStatus.NO_GII;
} else {
// got image
- if (this.isDeltaGII) {
+ if (isDeltaGII) {
return GIIStatus.GOTIMAGE_BY_DELTAGII;
} else {
return GIIStatus.GOTIMAGE_BY_FULLGII;
@@ -259,7 +252,7 @@ public class InitialImageOperation {
* @throws org.apache.geode.cache.TimeoutException when it is unable to get a reply within the
* limit.
*/
- GIIStatus getFromOne(Set recipientSet, boolean targetReinitialized,
+ GIIStatus getFromOne(Set<InternalDistributedMember> recipientSet, boolean targetReinitialized,
CacheDistributionAdvisor.InitialImageAdvice advice, boolean recoveredFromDisk,
RegionVersionVector recoveredRVV) throws org.apache.geode.cache.TimeoutException {
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -269,13 +262,13 @@ public class InitialImageOperation {
* TODO (ashetkar): recipientSet may contain more than one member. Ensure only the gii-source
* member is vMotioned. The test hook may need to be placed at another point.
*/
- VMotionObserverHolder.getInstance().vMotionDuringGII(recipientSet, this.region);
+ VMotionObserverHolder.getInstance().vMotionDuringGII(recipientSet, region);
}
// Make sure that candidates are regarded in random order
- ArrayList recipients = new ArrayList(recipientSet);
+ ArrayList<InternalDistributedMember> recipients = new ArrayList<>(recipientSet);
- if (this.region.isUsedForSerialGatewaySenderQueue()) {
- AbstractGatewaySender sender = this.region.getSerialGatewaySender();
+ if (region.isUsedForSerialGatewaySenderQueue()) {
+ AbstractGatewaySender sender = region.getSerialGatewaySender();
if (sender != null) {
InternalDistributedMember primary = sender.getSenderAdvisor().advisePrimaryGatewaySender();
if (primary != null) {
@@ -288,13 +281,14 @@ public class InitialImageOperation {
Collections.shuffle(recipients);
}
}
- long giiStart = this.region.getCachePerfStats().startGetInitialImage();
+ long giiStart = region.getCachePerfStats().startGetInitialImage();
InternalDistributedMember provider = null;
- for (Iterator itr = recipients.iterator(); !this.gotImage && itr.hasNext();) {
+ for (Iterator<InternalDistributedMember> itr = recipients.iterator(); !gotImage
+ && itr.hasNext();) {
// if we got a partial image from the previous recipient, then clear it
- InternalDistributedMember recipient = (InternalDistributedMember) itr.next();
+ InternalDistributedMember recipient = itr.next();
provider = recipient;
// In case of HARegion, before getting the region snapshot(image) get the filters
@@ -307,7 +301,7 @@ public class InitialImageOperation {
try {
// HARegion r = (HARegion)region;
// if (!r.isPrimaryQueue()) {
- if (!this.requestFilterInfo(recipient)) {
+ if (!requestFilterInfo(recipient)) {
if (isDebugEnabled) {
logger.debug("Failed to receive interest and CQ information from {}", recipient);
}
@@ -324,11 +318,11 @@ public class InitialImageOperation {
}
}
- PersistenceAdvisor persistenceAdvisor = this.region.getPersistenceAdvisor();
+ PersistenceAdvisor persistenceAdvisor = region.getPersistenceAdvisor();
if (persistenceAdvisor != null) {
try {
persistenceAdvisor.updateMembershipView(recipient, targetReinitialized);
- persistenceAdvisor.setInitializing(this.region.getPersistentID());
+ persistenceAdvisor.setInitializing(region.getPersistentID());
} catch (ReplyException e) {
if (isDebugEnabled) {
logger.debug("Failed to get membership view", e);
@@ -337,14 +331,14 @@ public class InitialImageOperation {
}
}
final ClusterDistributionManager dm =
- (ClusterDistributionManager) this.region.getDistributionManager();
+ (ClusterDistributionManager) region.getDistributionManager();
final boolean allowDeltaGII = !FORCE_FULL_GII;
- Set keysOfUnfinishedOps = null;
+ Set<Object> keysOfUnfinishedOps = null;
RegionVersionVector received_rvv = null;
RegionVersionVector remote_rvv = null;
- if (this.region.getConcurrencyChecksEnabled()) {
+ if (region.getConcurrencyChecksEnabled()) {
if (internalBeforeRequestRVV != null
- && internalBeforeRequestRVV.getRegionName().equals(this.region.getName())) {
+ && internalBeforeRequestRVV.getRegionName().equals(region.getName())) {
internalBeforeRequestRVV.run();
}
// Request the RVV from the provider and discover any operations on this
@@ -364,7 +358,7 @@ public class InitialImageOperation {
remote_rvv = received_rvv.getCloneForTransmission();
keysOfUnfinishedOps = processReceivedRVV(remote_rvv, recoveredRVV);
if (internalAfterCalculatedUnfinishedOps != null
- && internalAfterCalculatedUnfinishedOps.getRegionName().equals(this.region.getName())) {
+ && internalAfterCalculatedUnfinishedOps.getRegionName().equals(region.getName())) {
internalAfterCalculatedUnfinishedOps.run();
}
if (keysOfUnfinishedOps == null) {
@@ -373,15 +367,15 @@ public class InitialImageOperation {
}
}
- Boolean inhibitFlush = (Boolean) inhibitStateFlush.get();
- if (!inhibitFlush && !this.region.doesNotDistribute()) {
+ boolean inhibitFlush = inhibitStateFlush.get();
+ if (!inhibitFlush && !region.doesNotDistribute()) {
if (region instanceof BucketRegionQueue) {
// get the corresponding userPRs and do state flush on all of them
// TODO we should be able to do this state flush with a single
// message, but that will require changing the messaging layer,
// which has implications for a rolling upgrade.
Collection<BucketRegion> userPRBuckets =
- ((BucketRegionQueue) (this.region)).getCorrespondingUserPRBuckets();
+ ((BucketRegionQueue) (region)).getCorrespondingUserPRBuckets();
if (isDebugEnabled) {
logger.debug("The parent buckets of this shadowPR region are {}", userPRBuckets);
}
@@ -392,7 +386,7 @@ public class InitialImageOperation {
}
final StateFlushOperation sf;
sf = new StateFlushOperation(parentBucket);
- final Set<InternalDistributedMember> r = new HashSet<InternalDistributedMember>();
+ final Set<InternalDistributedMember> r = new HashSet<>();
r.addAll(advice.replicates);
r.addAll(advice.preloaded);
r.addAll(advice.others);
@@ -417,8 +411,8 @@ public class InitialImageOperation {
}
}
final StateFlushOperation sf;
- sf = new StateFlushOperation(this.region);
- final Set<InternalDistributedMember> r = new HashSet<InternalDistributedMember>();
+ sf = new StateFlushOperation(region);
+ final Set<InternalDistributedMember> r = new HashSet<>();
r.addAll(advice.replicates);
r.addAll(advice.preloaded);
r.addAll(advice.others);
@@ -434,23 +428,23 @@ public class InitialImageOperation {
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
region.getCancelCriterion().checkCancelInProgress(ie);
- this.region.getCachePerfStats().endNoGIIDone(giiStart);
+ region.getCachePerfStats().endNoGIIDone(giiStart);
return GIIStatus.NO_GII;
}
}
RequestImageMessage m = new RequestImageMessage();
- m.regionPath = this.region.getFullPath();
+ m.regionPath = region.getFullPath();
m.keysOnly = false;
m.targetReinitialized = targetReinitialized;
m.setRecipient(recipient);
- if (this.region.getConcurrencyChecksEnabled()) {
+ if (region.getConcurrencyChecksEnabled()) {
if (allowDeltaGII && recoveredFromDisk) {
- if (!this.region.getDiskRegion().getRVVTrusted()) {
+ if (!region.getDiskRegion().getRVVTrusted()) {
if (isDebugEnabled) {
logger.debug("Region {} recovered without EndGII flag, do full GII",
- this.region.getFullPath());
+ region.getFullPath());
}
m.versionVector = null;
} else if (keysOfUnfinishedOps != null
@@ -458,7 +452,7 @@ public class InitialImageOperation {
if (isDebugEnabled) {
logger.debug(
"Region {} has {} unfinished operations, which exceeded threshold {}, do full GII instead",
- this.region.getFullPath(), keysOfUnfinishedOps.size(),
+ region.getFullPath(), keysOfUnfinishedOps.size(),
MAXIMUM_UNFINISHED_OPERATIONS);
}
m.versionVector = null;
@@ -468,7 +462,7 @@ public class InitialImageOperation {
if (isDebugEnabled) {
logger.debug(
"Region {}: after filled versions of unfinished keys, recovered rvv is still newer than remote rvv:{}. recovered rvv is {}. Do full GII",
- this.region.getFullPath(), remote_rvv, recoveredRVV);
+ region.getFullPath(), remote_rvv, recoveredRVV);
}
} else {
m.versionVector = recoveredRVV;
@@ -476,7 +470,7 @@ public class InitialImageOperation {
if (isDebugEnabled) {
logger.debug(
"Region {} recovered with EndGII flag, rvv is {}. recovered rvv is {}. Do delta GII",
- this.region.getFullPath(), m.versionVector, recoveredRVV);
+ region.getFullPath(), m.versionVector, recoveredRVV);
}
}
}
@@ -485,18 +479,18 @@ public class InitialImageOperation {
if (received_rvv != null) {
// pack the original RVV, then save the received one
if (internalBeforeSavedReceivedRVV != null
- && internalBeforeSavedReceivedRVV.getRegionName().equals(this.region.getName())) {
+ && internalBeforeSavedReceivedRVV.getRegionName().equals(region.getName())) {
internalBeforeSavedReceivedRVV.run();
}
saveReceivedRVV(received_rvv);
if (internalAfterSavedReceivedRVV != null
- && internalAfterSavedReceivedRVV.getRegionName().equals(this.region.getName())) {
+ && internalAfterSavedReceivedRVV.getRegionName().equals(region.getName())) {
internalAfterSavedReceivedRVV.run();
}
}
}
- ImageProcessor processor = new ImageProcessor(this.region.getSystem(), recipient);
+ ImageProcessor processor = new ImageProcessor(region.getSystem(), recipient);
dm.acquireGIIPermitUninterruptibly();
try {
m.processorId = processor.getProcessorId();
@@ -508,24 +502,24 @@ public class InitialImageOperation {
// do not remove the following log statement
logger.info("Region {} requesting initial image from {}",
- new Object[] {this.region.getName(), recipient});
+ new Object[] {region.getName(), recipient});
dm.putOutgoing(m);
- this.region.cache.getCancelCriterion().checkCancelInProgress(null);
+ region.cache.getCancelCriterion().checkCancelInProgress(null);
if (internalAfterSentRequestImage != null
- && internalAfterSentRequestImage.getRegionName().equals(this.region.getName())) {
+ && internalAfterSentRequestImage.getRegionName().equals(region.getName())) {
internalAfterSentRequestImage.run();
}
try {
processor.waitForRepliesUninterruptibly();
// review unfinished keys and remove untouched entries
- if (this.region.getDataPolicy().withPersistence() && keysOfUnfinishedOps != null
+ if (region.getDataPolicy().withPersistence() && keysOfUnfinishedOps != null
&& !keysOfUnfinishedOps.isEmpty()) {
- final DiskRegion dr = this.region.getDiskRegion();
+ final DiskRegion dr = region.getDiskRegion();
assert dr != null;
for (Object key : keysOfUnfinishedOps) {
- RegionEntry re = this.entries.getEntry(key);
+ RegionEntry re = entries.getEntry(key);
if (re == null) {
continue;
}
@@ -537,7 +531,7 @@ public class InitialImageOperation {
synchronized (de) {
DiskId id = de.getDiskId();
if (id != null && EntryBits.isRecoveredFromDisk(id.getUserBits())) {
- this.region.destroyRecoveredEntry(key);
+ region.destroyRecoveredEntry(key);
if (isDebugEnabled) {
logger.debug("Deleted unfinished keys:key={}", key);
}
@@ -567,35 +561,35 @@ public class InitialImageOperation {
// Make sure we have applied the tombstone GC as seen on the GII
// source
- if (this.gcVersions != null) {
- region.getGemFireCache().getTombstoneService().gcTombstones(region, this.gcVersions,
+ if (gcVersions != null) {
+ region.getGemFireCache().getTombstoneService().gcTombstones(region, gcVersions,
false);
}
- if (this.gotImage) {
- RegionLogger.logGII(this.region.getFullPath(), recipient,
+ if (gotImage) {
+ RegionLogger.logGII(region.getFullPath(), recipient,
region.getDistributionManager().getDistributionManagerId(),
region.getPersistentID());
}
- if (this.gotImage) {
+ if (gotImage) {
// TODO add localizedString
- logger.info("{} is done getting image from {}. isDeltaGII is {}", this.region.getName(),
- recipient, this.isDeltaGII);
+ logger.info("{} is done getting image from {}. isDeltaGII is {}", region.getName(),
+ recipient, isDeltaGII);
} else {
// TODO add localizedString
- logger.info("{} failed to get image from {}", this.region.getName(), recipient);
+ logger.info("{} failed to get image from {}", region.getName(), recipient);
}
- if (this.region.getDataPolicy().withPersistence()) {
+ if (region.getDataPolicy().withPersistence()) {
logger.info("Region {} initialized persistent id: {} with data from {}.",
- new Object[] {this.region.getName(), this.region.getPersistentID(),
+ new Object[] {region.getName(), region.getPersistentID(),
recipient});
}
// bug 39050 - no partial images after GII when network partition
// detection is enabled
- if (!this.gotImage) {
- this.region.cleanUpAfterFailedGII(recoveredFromDisk);
+ if (!gotImage) {
+ region.cleanUpAfterFailedGII(recoveredFromDisk);
} else if (received_rvv != null) {
- checkForUnrecordedOperations(recipient);
+ checkForUnrecordedOperations();
}
}
} finally {
@@ -604,14 +598,14 @@ public class InitialImageOperation {
}
} // for
- if (this.gotImage) {
- this.region.recordEventStateFromImageProvider(provider);
- this.region.getCachePerfStats().endGetInitialImage(giiStart);
- if (this.isDeltaGII) {
- this.region.getCachePerfStats().incDeltaGIICompleted();
+ if (gotImage) {
+ region.recordEventStateFromImageProvider(provider);
+ region.getCachePerfStats().endGetInitialImage(giiStart);
+ if (isDeltaGII) {
+ region.getCachePerfStats().incDeltaGIICompleted();
}
} else {
- this.region.getCachePerfStats().endNoGIIDone(giiStart);
+ region.getCachePerfStats().endNoGIIDone(giiStart);
}
return reportGIIStatus();
}
@@ -624,22 +618,22 @@ public class InitialImageOperation {
public void synchronizeWith(InternalDistributedMember target, VersionSource lostMemberVersionID,
InternalDistributedMember lostMember) {
final ClusterDistributionManager dm =
- (ClusterDistributionManager) this.region.getDistributionManager();
+ (ClusterDistributionManager) region.getDistributionManager();
- this.isSynchronizing = true;
+ isSynchronizing = true;
RequestImageMessage m = new RequestImageMessage();
- m.regionPath = this.region.getFullPath();
+ m.regionPath = region.getFullPath();
m.keysOnly = false;
if (lostMemberVersionID != null) {
- m.versionVector = this.region.getVersionVector().getCloneForTransmission(lostMemberVersionID);
+ m.versionVector = region.getVersionVector().getCloneForTransmission(lostMemberVersionID);
m.lostMemberVersionID = lostMemberVersionID;
m.lostMemberID = lostMember;
} else {
- m.versionVector = this.region.getVersionVector().getCloneForTransmission();
+ m.versionVector = region.getVersionVector().getCloneForTransmission();
}
m.setRecipient(target);
- ImageProcessor processor = new ImageProcessor(this.region.getSystem(), target);
+ ImageProcessor processor = new ImageProcessor(region.getSystem(), target);
dm.acquireGIIPermitUninterruptibly();
try {
m.processorId = processor.getProcessorId();
@@ -649,13 +643,13 @@ public class InitialImageOperation {
m.severeAlertEnabled = true;
}
- logger.info("Region {} is requesting synchronization with {} for {}", this.region.getName(),
+ logger.info("Region {} is requesting synchronization with {} for {}", region.getName(),
target, lostMember);
- long hisVersion = this.region.getVersionVector().getVersionForMember(lostMemberVersionID);
+ long hisVersion = region.getVersionVector().getVersionForMember(lostMemberVersionID);
dm.putOutgoing(m);
- this.region.cache.getCancelCriterion().checkCancelInProgress(null);
+ region.cache.getCancelCriterion().checkCancelInProgress(null);
try {
processor.waitForRepliesUninterruptibly();
ImageState imgState = region.getImageState();
@@ -673,30 +667,30 @@ public class InitialImageOperation {
e.handleCause();
}
} finally {
- if (this.gotImage) {
- this.region.getVersionVector().removeExceptionsFor(target, hisVersion);
+ if (gotImage) {
+ region.getVersionVector().removeExceptionsFor(target, hisVersion);
RegionVersionHolder holder =
- this.region.getVersionVector().getHolderForMember(lostMemberVersionID);
- if (this.rcvd_holderToSync != null
- && this.rcvd_holderToSync.isNewerThanOrCanFillExceptionsFor(holder)) {
+ region.getVersionVector().getHolderForMember(lostMemberVersionID);
+ if (rcvd_holderToSync != null
+ && rcvd_holderToSync.isNewerThanOrCanFillExceptionsFor(holder)) {
logger.info(
"synchronizeWith detected mismatch region version holder for lost member {}. Old is {}, new is {}",
- lostMemberVersionID, holder, this.rcvd_holderToSync);
- this.region.getVersionVector().initializeVersionHolder(lostMemberVersionID,
- this.rcvd_holderToSync);
+ lostMemberVersionID, holder, rcvd_holderToSync);
+ region.getVersionVector().initializeVersionHolder(lostMemberVersionID,
+ rcvd_holderToSync);
}
- RegionLogger.logGII(this.region.getFullPath(), target,
+ RegionLogger.logGII(region.getFullPath(), target,
region.getDistributionManager().getDistributionManagerId(), region.getPersistentID());
}
- if (this.gotImage) {
+ if (gotImage) {
if (logger.isDebugEnabled()) {
- logger.debug("{} is done synchronizing with {}", this.region.getName(), target);
+ logger.debug("{} is done synchronizing with {}", region.getName(), target);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"{} received no synchronization data from {} which could mean that we are already synchronized",
- this.region.getName(), target);
+ region.getName(), target);
}
}
}
@@ -706,7 +700,7 @@ public class InitialImageOperation {
}
}
- private void checkForUnrecordedOperations(InternalDistributedMember imageProvider) {
+ private void checkForUnrecordedOperations() {
final boolean isTraceEnabled = logger.isTraceEnabled();
// bug #48962 - a change could have been received from a member
@@ -714,9 +708,9 @@ public class InitialImageOperation {
// image provider is creating the region in parallel with this member.
// We have to check all of the received versions for members that
// left during GII to see if the RVV contains them.
- RegionVersionVector rvv = this.region.getVersionVector();
- if (this.region.getConcurrencyChecksEnabled() && rvv != null) {
- ImageState state = this.region.getImageState();
+ RegionVersionVector rvv = region.getVersionVector();
+ if (region.getConcurrencyChecksEnabled() && rvv != null) {
+ ImageState state = region.getImageState();
if (state.hasLeftMembers()) {
Set<VersionSource> needsSync = null;
Set<VersionSource> leftMembers = state.getLeftMembers();
@@ -729,7 +723,7 @@ public class InitialImageOperation {
if (leftMembers.contains(tag.getMemberID())
&& !rvv.contains(tag.getMemberID(), tag.getRegionVersion())) {
if (needsSync == null) {
- needsSync = new HashSet<VersionSource>();
+ needsSync = new HashSet<>();
}
needsSync.add(tag.getMemberID());
rvv.recordVersion(tag.getMemberID(), tag.getRegionVersion());
@@ -739,16 +733,17 @@ public class InitialImageOperation {
// we need to tell the image provider to request syncs on the given
// member(s) These will either be DistributedMember IDs or DiskStore IDs
RequestSyncMessage msg = new RequestSyncMessage();
- msg.regionPath = this.region.getFullPath();
+ msg.regionPath = region.getFullPath();
msg.lostVersionSources = needsSync.toArray(new VersionSource[0]);
- Set recipients = this.region.getCacheDistributionAdvisor().adviseReplicates();
+ Set<InternalDistributedMember> recipients =
+ region.getCacheDistributionAdvisor().adviseReplicates();
if (!recipients.isEmpty()) {
msg.setRecipients(recipients);
if (logger.isDebugEnabled()) {
logger.debug("Local versions were found that the image provider has not seen for {}",
needsSync);
}
- this.region.getDistributionManager().putOutgoing(msg);
+ region.getDistributionManager().putOutgoing(msg);
}
}
}
@@ -772,14 +767,13 @@ public class InitialImageOperation {
*
* @return whether the operation succeeded in transferring anything
*/
- private boolean requestFilterInfo(InternalDistributedMember recipient) {
- // Request for Filter Information before getting the
- // HARegion snapshot.
- final DistributionManager dm = this.region.getDistributionManager();
- RequestFilterInfoMessage filterInfoMsg = new RequestFilterInfoMessage();
- filterInfoMsg.regionPath = this.region.getFullPath();
+ private boolean requestFilterInfo(final InternalDistributedMember recipient) {
+ // Request for Filter Information before getting the HARegion snapshot.
+ final DistributionManager dm = region.getDistributionManager();
+ final RequestFilterInfoMessage filterInfoMsg = new RequestFilterInfoMessage();
+ filterInfoMsg.regionPath = region.getFullPath();
filterInfoMsg.setRecipient(recipient);
- FilterInfoProcessor processor = new FilterInfoProcessor(this.region.getSystem(), recipient);
+ final FilterInfoProcessor processor = new FilterInfoProcessor(region.getSystem(), recipient);
filterInfoMsg.processorId = processor.getProcessorId();
dm.putOutgoing(filterInfoMsg);
@@ -787,7 +781,7 @@ public class InitialImageOperation {
processor.waitForRepliesUninterruptibly();
return processor.filtersReceived;
} catch (InternalGemFireException ex) {
- Throwable cause = ex.getCause();
+ final Throwable cause = ex.getCause();
if (cause instanceof org.apache.geode.cache.TimeoutException) {
throw (org.apache.geode.cache.TimeoutException) cause;
}
@@ -807,15 +801,15 @@ public class InitialImageOperation {
* @param entries entries to add to the region
* @return false if should abort (region was destroyed or cache was closed)
*/
- boolean processChunk(List entries, InternalDistributedMember sender)
+ boolean processChunk(List<Entry> entries, InternalDistributedMember sender)
throws IOException, ClassNotFoundException {
final boolean isDebugEnabled = logger.isDebugEnabled();
final boolean isTraceEnabled = logger.isTraceEnabled();
// one volatile read of test flag
int slow = slowImageProcessing;
- final CachePerfStats stats = this.region.getCachePerfStats();
- ImageState imgState = this.region.getImageState();
+ final CachePerfStats stats = region.getCachePerfStats();
+ ImageState imgState = region.getImageState();
// Asif : Can the image state be null here. Don't think so
// Assert.assertTrue(imgState != null, "processChunk :ImageState should not have been null ");
// Asif: Set the Htree Reference in Thread Local before the iteration begins so as
@@ -824,22 +818,22 @@ public class InitialImageOperation {
// only once during GII life cycle & so it does not matter if the HTree ref changes after the
// clear
// whenever a conflict is detected in DiskRegion it is Ok to abort the operation
- final DiskRegion diskRegion = this.region.getDiskRegion();
+ final DiskRegion diskRegion = region.getDiskRegion();
if (diskRegion != null) {
diskRegion.setClearCountReference();
}
try {
int entryCount = entries.size();
- Set keys = null;
+ Set<Object> keys = null;
if (entryCount <= 1000 && isDebugEnabled) {
- keys = new HashSet();
+ keys = new HashSet<>();
}
List<Entry> entriesToSynchronize = new ArrayList<>();
for (int i = 0; i < entryCount; i++) {
// stream is null-terminated
if (internalDuringApplyDelta != null && !internalDuringApplyDelta.isRunning
- && internalDuringApplyDelta.getRegionName().equals(this.region.getName())) {
+ && internalDuringApplyDelta.getRegionName().equals(region.getName())) {
internalDuringApplyDelta.run();
}
if (slow > 0) {
@@ -850,10 +844,10 @@ public class InitialImageOperation {
try {
if (isDebugEnabled) {
logger.debug("processChunk: Sleeping for {} ms for rgn {}", slow,
- this.region.getFullPath());
+ region.getFullPath());
}
Thread.sleep(slow);
- slowImageSleeps++;
+ slowImageSleeps.getAndIncrement();
} catch (InterruptedException e) {
interrupted = true;
region.getCancelCriterion().checkCancelInProgress(e);
@@ -865,18 +859,18 @@ public class InitialImageOperation {
}
}
try {
- if (this.region.isDestroyed() || imgState.getClearRegionFlag()) {
+ if (region.isDestroyed() || imgState.getClearRegionFlag()) {
return false;
}
} catch (CancelException e) {
return false;
}
- Entry entry = (Entry) entries.get(i);
+ Entry entry = entries.get(i);
stats.incGetInitialImageKeysReceived();
- final long lastModified = entry.getLastModified(this.region.getDistributionManager());
+ final long lastModified = entry.getLastModified();
Object tmpValue = entry.value;
@@ -893,7 +887,7 @@ public class InitialImageOperation {
}
// re will be null if the gii chunk gives us a create
if (regionEntry != null) {
- synchronized (regionEntry) { // fixes bug 41409
+ synchronized (regionEntry) {
if (diskRegion.testIsRecoveredAndClear(regionEntry)) {
wasRecovered = true;
if (tmpValue == null) {
@@ -920,17 +914,16 @@ public class InitialImageOperation {
tag.replaceNullIDs(sender);
}
boolean record;
- if (this.region.getVersionVector() != null && tag != null) {
- this.region.getVersionVector().recordVersion(tag.getMemberID(), tag);
+ if (region.getVersionVector() != null && tag != null) {
+ region.getVersionVector().recordVersion(tag.getMemberID(), tag);
record = true;
} else {
- // bug #50992
record = (tmpValue != Token.TOMBSTONE);
}
if (record) {
this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered,
- true, tag, sender, this.isSynchronizing);
- if (this.isSynchronizing) {
+ true, tag, sender, isSynchronizing);
+ if (isSynchronizing) {
entriesToSynchronize.add(entry);
}
}
@@ -940,7 +933,6 @@ public class InitialImageOperation {
didIIP = true;
}
}
- // fix for 41814, java level deadlock
this.entries.lruUpdateCallback();
}
}
@@ -949,7 +941,7 @@ public class InitialImageOperation {
if (tag == null) {
keys.add(entry.key);
} else {
- keys.add(String.valueOf(entry.key) + ",v=" + tag);
+ keys.add(entry.key + ",v=" + tag);
}
}
if (!didIIP) {
@@ -969,12 +961,12 @@ public class InitialImageOperation {
"processChunk:initialImagePut:key={},lastModified={},tmpValue={},wasRecovered={},tag={}",
entry.key, lastModified, tmpValue, wasRecovered, tag);
}
- if (this.region.getVersionVector() != null && tag != null) {
- this.region.getVersionVector().recordVersion(tag.getMemberID(), tag);
+ if (region.getVersionVector() != null && tag != null) {
+ region.getVersionVector().recordVersion(tag.getMemberID(), tag);
}
this.entries.initialImagePut(entry.key, lastModified, tmpValue, wasRecovered, false,
- tag, sender, this.isSynchronizing);
- if (this.isSynchronizing) {
+ tag, sender, isSynchronizing);
+ if (isSynchronizing) {
entriesToSynchronize.add(entry);
}
} catch (RegionDestroyedException | CancelException e) {
@@ -982,7 +974,7 @@ public class InitialImageOperation {
}
}
}
- if (this.isSynchronizing && !entriesToSynchronize.isEmpty()) {
+ if (isSynchronizing && !entriesToSynchronize.isEmpty()) {
LocalRegion owner = ((AbstractRegionMap) this.entries)._getOwner();
LocalRegion region = owner instanceof BucketRegion ? owner.getPartitionedRegion() : owner;
owner.getCache().invokeRegionEntrySynchronizationListenersAfterSynchronization(sender,
@@ -994,11 +986,11 @@ public class InitialImageOperation {
}
}
if (internalBeforeCleanExpiredTombstones != null
- && internalBeforeCleanExpiredTombstones.getRegionName().equals(this.region.getName())) {
+ && internalBeforeCleanExpiredTombstones.getRegionName().equals(region.getName())) {
internalBeforeCleanExpiredTombstones.run();
}
if (internalAfterSavedRVVEnd != null
- && internalAfterSavedRVVEnd.getRegionName().equals(this.region.getName())) {
+ && internalAfterSavedRVVEnd.getRegionName().equals(region.getName())) {
internalAfterSavedRVVEnd.run();
}
return true;
@@ -1015,15 +1007,15 @@ public class InitialImageOperation {
// RequestRVVMessage is to send rvv of gii provider for both persistent and non-persistent
// region
RequestRVVMessage rrm = new RequestRVVMessage();
- rrm.regionPath = this.region.getFullPath();
+ rrm.regionPath = region.getFullPath();
rrm.targetReinitialized = targetReinitialized;
rrm.setRecipient(recipient);
- RequestRVVProcessor rvv_processor = new RequestRVVProcessor(this.region.getSystem(), recipient);
+ RequestRVVProcessor rvv_processor = new RequestRVVProcessor(region.getSystem(), recipient);
rrm.processorId = rvv_processor.getProcessorId();
dm.putOutgoing(rrm);
if (internalAfterRequestRVV != null
- && internalAfterRequestRVV.getRegionName().equals(this.region.getName())) {
+ && internalAfterRequestRVV.getRegionName().equals(region.getName())) {
internalAfterRequestRVV.run();
}
@@ -1051,21 +1043,22 @@ public class InitialImageOperation {
* @param localRVV RVV recovered from disk
* @return set for keys of unfinished operations.
*/
- protected Set processReceivedRVV(RegionVersionVector remoteRVV, RegionVersionVector localRVV) {
+ protected Set<Object> processReceivedRVV(RegionVersionVector remoteRVV,
+ RegionVersionVector localRVV) {
if (remoteRVV == null) {
return null;
}
// calculate keys for unfinished ops
- HashSet keys = new HashSet();
- if (this.region.getDataPolicy().withPersistence()
+ HashSet<Object> keys = new HashSet<>();
+ if (region.getDataPolicy().withPersistence()
&& localRVV.isNewerThanOrCanFillExceptionsFor(remoteRVV)) {
// only search for unfinished keys when localRVV has something newer
// and the region is persistent region
- Iterator it = this.region.getBestIterator(false);
+ Iterator<RegionEntry> it = region.getBestIterator(false);
int count = 0;
- VersionSource<?> myId = this.region.getVersionMember();
+ VersionSource<?> myId = region.getVersionMember();
while (it.hasNext()) {
- RegionEntry mapEntry = (RegionEntry) it.next();
+ RegionEntry mapEntry = it.next();
VersionStamp<?> stamp = mapEntry.getVersionStamp();
VersionSource<?> id = stamp.getMemberID();
if (id == null) {
@@ -1144,7 +1137,7 @@ public class InitialImageOperation {
* Keys are the senders (@link {@link InternalDistributedMember}), and values are instances of
* {@link Status}.
*/
- private final Map statusMap = new HashMap();
+ private final Map<InternalDistributedMember, Status> statusMap = new HashMap<>();
/**
* number of outstanding executors currently in-flight on this request
@@ -1201,22 +1194,22 @@ public class InitialImageOperation {
/** Return true if this is the very last reply for this member */
protected synchronized boolean trackMessage(ImageReplyMessage m) {
- if (this.msgsProcessed == null) {
- this.msgsProcessed = new int[m.numSeries];
+ if (msgsProcessed == null) {
+ msgsProcessed = new int[m.numSeries];
}
- if (this.numInSeries == null) {
- this.numInSeries = new int[m.numSeries];
+ if (numInSeries == null) {
+ numInSeries = new int[m.numSeries];
}
- this.msgsProcessed[m.seriesNum]++;
+ msgsProcessed[m.seriesNum]++;
if (m.lastInSeries) {
- this.numInSeries[m.seriesNum] = m.msgNum + 1;
+ numInSeries[m.seriesNum] = m.msgNum + 1;
}
if (logger.isDebugEnabled()) {
logger.debug(
"InitialImage Message Tracking Status: Processor id: {}; Sender: {}; Messages Processed: {}; NumInSeries:{}",
- getProcessorId(), m.getSender(), arrayToString(this.msgsProcessed),
- arrayToString(this.numInSeries));
+ getProcessorId(), m.getSender(), arrayToString(msgsProcessed),
+ arrayToString(numInSeries));
}
// this.numInSeries starts out as zeros and gets initialized
@@ -1224,8 +1217,8 @@ public class InitialImageOperation {
// Since we increment msgsProcessed, the following condition
// cannot be true until sometime after we've received the
// lastInSeries for a given series.
- this.allChunksReceived = Arrays.equals(this.msgsProcessed, this.numInSeries);
- return (this.allChunksReceived);
+ allChunksReceived = Arrays.equals(msgsProcessed, numInSeries);
+ return (allChunksReceived);
}
}
@@ -1235,15 +1228,11 @@ public class InitialImageOperation {
super(system, member);
}
- public ImageProcessor(InternalDistributedSystem system, Set members) {
- super(system, members);
- }
-
@Override
protected boolean processTimeout() {
// if chunk received then no need to process timeout
- boolean ret = this.receivedChunk;
- this.receivedChunk = false;
+ boolean ret = receivedChunk;
+ receivedChunk = false;
return !ret;
}
@@ -1261,7 +1250,7 @@ public class InitialImageOperation {
return;
}
Status status = getStatus(msg.getSender());
- this.msgsBeingProcessed.incrementAndGet();
+ msgsBeingProcessed.incrementAndGet();
EntryLogger.setSource(msg.getSender(), "gii");
try {
boolean isDone;
@@ -1273,42 +1262,43 @@ public class InitialImageOperation {
isDone = true;
ImageReplyMessage m = (ImageReplyMessage) msg;
- boolean isLast = true; // is last message for this member?
if (m.entries != null) {
try {
if (internalAfterReceivedImageReply != null
&& internalAfterReceivedImageReply.getRegionName().equals(region.getName())) {
internalAfterReceivedImageReply.run();
}
- // bug 37461: don't allow abort flag to be reset
- boolean isAborted = this.abort; // volatile fetch
+ // don't allow abort flag to be reset
+ boolean isAborted = abort; // volatile fetch
if (!isAborted) {
isAborted = !processChunk(m.entries, m.getSender());
if (isAborted) {
- this.abort = true; // volatile store
+ abort = true; // volatile store
} else {
- this.receivedChunk = true;
+ receivedChunk = true;
}
}
- isLast = trackMessage(m); // interpret series/msgNum
+ // interpret series/msgNum
+ // is last message for this member?
+ boolean isLast = trackMessage(m);
isDone = isAborted || isLast;
// @todo ericz send an abort message to image provider if
// !doContinue (region was destroyed or cache closed)
if (isDone) {
- if (this.abort) {
+ if (abort) {
// Bug 48578: In deltaGII, if abort in processChunk, we should mark trustRVV=false
// to force full GII next time.
- InitialImageOperation.this.gotImage = false;
+ gotImage = false;
logger.debug(
"processChunk is aborted for region {}, rvv is {}. Do full gii next time.",
- InitialImageOperation.this.region.getFullPath(),
- InitialImageOperation.this.region.getVersionVector());
+ region.getFullPath(),
+ region.getVersionVector());
} else {
- InitialImageOperation.this.gotImage = true;
+ gotImage = true;
}
if (m.isDeltaGII) {
- InitialImageOperation.this.isDeltaGII = true;
+ isDeltaGII = true;
}
}
} catch (DiskAccessException dae) {
@@ -1337,16 +1327,16 @@ public class InitialImageOperation {
// if a null entries was received (no image was found), then
// we're done with that member
if (isDone && m.isDeltaGII) {
- InitialImageOperation.this.gotImage = true;
- InitialImageOperation.this.isDeltaGII = true;
+ gotImage = true;
+ isDeltaGII = true;
}
}
if (m.holderToSend != null) {
- InitialImageOperation.this.rcvd_holderToSync = m.holderToSend;
+ rcvd_holderToSync = m.holderToSend;
}
if (m.gcVersions != null) {
- InitialImageOperation.this.gcVersions = m.gcVersions;
+ gcVersions = m.gcVersions;
}
}
if (isDone) {
@@ -1355,9 +1345,9 @@ public class InitialImageOperation {
}
} catch (RegionDestroyedException e) {
// bug #46135 - disk store can throw this exception
- InitialImageOperation.this.region.getCancelCriterion().checkCancelInProgress(e);
+ region.getCancelCriterion().checkCancelInProgress(e);
} finally {
- this.msgsBeingProcessed.decrementAndGet();
+ msgsBeingProcessed.decrementAndGet();
checkIfDone(); // check to see if decrementing msgsBeingProcessed requires signaling to
// proceed
EntryLogger.clearSource();
@@ -1381,13 +1371,13 @@ public class InitialImageOperation {
if (finishedWaiting) { // volatile fetch
return false;
}
- if (this.msgsBeingProcessed.get() > 0) {
+ if (msgsBeingProcessed.get() > 0) {
// to fix bug 37391 always wait for msgsBeingProcessed to go to 0;
// even if abort is true.
return true;
}
// Volatile fetches and volatile store:
- if (this.abort || !super.stillWaiting()) {
+ if (abort || !super.stillWaiting()) {
finishedWaiting = true;
return false;
} else {
@@ -1400,25 +1390,25 @@ public class InitialImageOperation {
public String toString() {
// bug 37189 These strings are a work-around for an escaped reference
// in ReplyProcessor21 constructor
- String msgsBeingProcessedStr = String.valueOf(this.msgsBeingProcessed.get());
- String regionStr = (InitialImageOperation.this.region == null) ? "nullRef"
- : InitialImageOperation.this.region.getFullPath();
- String numMembersStr = (this.members == null) ? "nullRef" : String.valueOf(numMembers());
+ String msgsBeingProcessedStr = String.valueOf(msgsBeingProcessed.get());
+ String regionStr = (region == null) ? "nullRef"
+ : region.getFullPath();
+ String numMembersStr = (members == null) ? "nullRef" : String.valueOf(numMembers());
// String membersToStr = (this.members == null) ? "nullRef" : membersToString();
- return "<" + this.getClass().getName() + " " + this.getProcessorId() + " waiting for "
+ return "<" + getClass().getName() + " " + getProcessorId() + " waiting for "
+ numMembersStr + " replies" + (exception == null ? "" : (" exception: " + exception))
+ " from " + membersToString() + "; waiting for " + msgsBeingProcessedStr
- + " messages in-flight; " + "region=" + regionStr + "; abort=" + this.abort + ">";
+ + " messages in-flight; " + "region=" + regionStr + "; abort=" + abort + ">";
}
private Status getStatus(InternalDistributedMember sender) {
Status status;
synchronized (this) {
- status = (Status) this.statusMap.get(sender);
+ status = statusMap.get(sender);
if (status == null) {
status = new Status();
- this.statusMap.put(sender, status);
+ statusMap.put(sender, status);
}
}
return status;
@@ -1431,10 +1421,10 @@ public class InitialImageOperation {
}
protected static String arrayToString(int[] a) {
- StringBuffer buf = new StringBuffer();
+ StringBuilder buf = new StringBuilder();
buf.append("[");
for (int i = 0; i < a.length; i++) {
- buf.append(String.valueOf(a[i]));
+ buf.append(a[i]);
if (i < (a.length - 1)) {
buf.append(",");
}
@@ -1448,7 +1438,7 @@ public class InitialImageOperation {
final boolean isDebugEnabled = logger.isDebugEnabled();
- LocalRegion lclRgn = null;
+ LocalRegion localRegion;
final InitializationLevel initLevel = targetReinitialized ? AFTER_INITIAL_IMAGE : ANY_INIT;
final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(initLevel);
try {
@@ -1457,12 +1447,13 @@ public class InitialImageOperation {
regionPath, initLevel);
}
InternalCache cache = dm.getExistingCache();
- lclRgn = cache == null ? null : (LocalRegion) cache.getRegion(regionPath);
+ localRegion = cache == null ? null : (LocalRegion) cache.getRegion(regionPath);
// if this is a targeted getInitialImage after a region was initialized,
// make sure this is the region that was reinitialized.
- if (lclRgn != null && !lclRgn.isUsedForPartitionedRegionBucket() && targetReinitialized
- && !lclRgn.reinitialized_new()) {
- lclRgn = null; // got a region that wasn't reinitialized, so must not be the right one
+ if (localRegion != null && !localRegion.isUsedForPartitionedRegionBucket()
+ && targetReinitialized
+ && !localRegion.reinitialized_new()) {
+ localRegion = null; // got a region that wasn't reinitialized, so must not be the right one
if (isDebugEnabled) {
logger.debug(
"GII message process: Found region, but wasn't reinitialized, so assuming region destroyed and recreated");
@@ -1471,23 +1462,23 @@ public class InitialImageOperation {
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
- if (lclRgn == null || !lclRgn.isInitialized()) {
+ if (localRegion == null || !localRegion.isInitialized()) {
if (isDebugEnabled) {
logger.debug("{}, nothing to do",
- (lclRgn == null ? "region not found" : "region not initialized yet"));
+ (localRegion == null ? "region not found" : "region not initialized yet"));
}
// allow finally block to send a failure message
return null;
}
- if (lclRgn.getScope().isLocal()) {
+ if (localRegion.getScope().isLocal()) {
if (isDebugEnabled) {
logger.debug("local scope region, nothing to do");
}
// allow finally block to send a failure message
return null;
}
- return lclRgn;
+ return localRegion;
}
/**
@@ -1543,7 +1534,7 @@ public class InitialImageOperation {
protected transient boolean severeAlertEnabled;
/* key list for unfinished operations */
- protected Set unfinishedKeys;
+ protected Set<Object> unfinishedKeys;
/** The versions in which this message was modified */
@Immutable
@@ -1551,12 +1542,12 @@ public class InitialImageOperation {
@Override
public int getProcessorId() {
- return this.processorId;
+ return processorId;
}
@Override
public int getProcessorType() {
- return this.targetReinitialized ? OperationExecutors.WAITING_POOL_EXECUTOR
+ return targetReinitialized ? OperationExecutors.WAITING_POOL_EXECUTOR
: OperationExecutors.HIGH_PRIORITY_EXECUTOR;
}
@@ -1595,9 +1586,9 @@ public class InitialImageOperation {
DistributedRegion targetRegion = null;
boolean sendFailureMessage = true;
try {
- Assert.assertTrue(this.regionPath != null, "Region path is null.");
+ Assert.assertTrue(regionPath != null, "Region path is null.");
final DistributedRegion rgn =
- (DistributedRegion) getGIIRegion(dm, this.regionPath, this.targetReinitialized);
+ (DistributedRegion) getGIIRegion(dm, regionPath, targetReinitialized);
if (lostMemberID != null) {
targetRegion = rgn;
}
@@ -1611,13 +1602,13 @@ public class InitialImageOperation {
internalAfterReceivedRequestImage.run();
}
- if (this.versionVector != null) {
- if (this.versionVector.isForSynchronization() && !rgn.getConcurrencyChecksEnabled()) {
+ if (versionVector != null) {
+ if (versionVector.isForSynchronization() && !rgn.getConcurrencyChecksEnabled()) {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE,
"ignoring synchronization request as this region has no version vector");
}
- replyNoData(dm, true, Collections.EMPTY_MAP);
+ replyNoData(dm, true, Collections.emptyMap());
sendFailureMessage = false;
return;
}
@@ -1628,11 +1619,11 @@ public class InitialImageOperation {
// [bruce] I suppose it's possible to have this check return a list of
// specific versions that the sender is missing. The current check
// just stops when it finds the first inconsistency
- if (!rgn.getVersionVector().isNewerThanOrCanFillExceptionsFor(this.versionVector)) {
+ if (!rgn.getVersionVector().isNewerThanOrCanFillExceptionsFor(versionVector)) {
// Delta GII might have unfinished operations to send. Otherwise,
// no need to send any data. This is a synchronization request and this region's
// vector doesn't have anything that the other region needs
- if (this.unfinishedKeys == null || this.unfinishedKeys.isEmpty()) {
+ if (unfinishedKeys == null || unfinishedKeys.isEmpty()) {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE,
"version vector reports that I have nothing that the requester hasn't already seen");
@@ -1645,7 +1636,7 @@ public class InitialImageOperation {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE,
"version vector reports that I have updates the requester hasn't seen, remote rvv is {}",
- this.versionVector);
+ versionVector);
}
}
}
@@ -1667,21 +1658,21 @@ public class InitialImageOperation {
}
boolean markedOngoingGII = false;
try {
- boolean recoveringForLostMember = (this.lostMemberVersionID != null);
+ boolean recoveringForLostMember = (lostMemberVersionID != null);
RegionVersionHolder holderToSync = null;
- if (recoveringForLostMember && this.lostMemberID != null) {
+ if (recoveringForLostMember && lostMemberID != null) {
// wait for the lost member to be gone from this VM's membership and all ops applied to
// the cache
try {
- dm.getDistribution().waitForDeparture(this.lostMemberID);
+ dm.getDistribution().waitForDeparture(lostMemberID);
RegionVersionHolder rvh =
- rgn.getVersionVector().getHolderForMember(this.lostMemberVersionID);
+ rgn.getVersionVector().getHolderForMember(lostMemberVersionID);
if (rvh != null) {
holderToSync = rvh.clone();
}
if (isGiiDebugEnabled) {
RegionVersionHolder holderOfRequest =
- this.versionVector.getHolderForMember(this.lostMemberVersionID);
+ versionVector.getHolderForMember(lostMemberVersionID);
if (holderToSync != null
&& holderToSync.isNewerThanOrCanFillExceptionsFor(holderOfRequest)) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE,
@@ -1693,7 +1684,7 @@ public class InitialImageOperation {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE,
"timed out waiting for the departure of {} before processing delta GII request",
- this.lostMemberID);
+ lostMemberID);
}
}
}
@@ -1702,17 +1693,17 @@ public class InitialImageOperation {
Map<? extends DataSerializable, ? extends DataSerializable> eventState =
rgn.getEventState();
if (eventState != null && eventState.size() > 0) {
- RegionStateMessage.send(dm, getSender(), this.processorId, eventState, true);
+ RegionStateMessage.send(dm, getSender(), processorId, eventState, true);
}
}
- if (this.checkTombstoneVersions && this.versionVector != null
+ if (checkTombstoneVersions && versionVector != null
&& rgn.getConcurrencyChecksEnabled()) {
synchronized (rgn.getCache().getTombstoneService().getBlockGCLock()) {
- if (goWithFullGII(rgn, this.versionVector)) {
+ if (goWithFullGII(rgn, versionVector)) {
if (isGiiDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERBOSE, "have to do fullGII");
}
- this.versionVector = null; // full GII
+ versionVector = null; // full GII
} else {
// lock GIILock only for deltaGII
int count = rgn.getCache().getTombstoneService().incrementGCBlockCount();
@@ -1726,7 +1717,7 @@ public class InitialImageOperation {
}
final RegionVersionHolder holderToSend = holderToSync;
boolean finished = chunkEntries(rgn, CHUNK_SIZE_IN_BYTES, !keysOnly, versionVector,
- (HashSet) this.unfinishedKeys, flowControl, new ObjectIntProcedure() {
+ unfinishedKeys, flowControl, new ObjectIntProcedure() {
int msgNum = 0;
boolean last = false;
@@ -1742,25 +1733,23 @@ public class InitialImageOperation {
return false;
}
- if (this.last) {
- throw new InternalGemFireError(
- "Already processed last chunk");
+ if (last) {
+ throw new InternalGemFireError("Already processed last chunk");
}
- List entries = (List) entList;
- this.last = b > 0 && !lclAbortTest; // if abortTest, then never send last flag set
- // to true
+ List<Entry> entries = uncheckedCast(entList);
+ // if abortTest, then never send last flag set to true
+ last = b > 0 && !lclAbortTest;
try {
boolean abort = rgn.isDestroyed();
if (!abort) {
int flowControlId = flowControl.getId();
Map<VersionSource<?>, Long> gcVersions = null;
- if (this.last && rgn.getVersionVector() != null) {
+ if (last && rgn.getVersionVector() != null) {
gcVersions = rgn.getVersionVector().getMemberToGCVersion();
}
- replyWithData(dm, entries, seriesNum, msgNum++, numSeries, this.last,
- flowControlId,
- versionVector != null, holderToSend, gcVersions);
+ replyWithData(dm, entries, seriesNum, msgNum++, numSeries, last,
+ flowControlId, versionVector != null, holderToSend, gcVersions);
}
return !abort;
} catch (CancelException e) {
@@ -1910,35 +1899,37 @@ public class InitialImageOperation {
* @return true if finished all chunks, false if stopped early
*/
protected boolean chunkEntries(DistributedRegion rgn, int chunkSizeInBytes,
- boolean includeValues, RegionVersionVector versionVector, HashSet unfinishedKeys,
+ boolean includeValues, RegionVersionVector versionVector, Set<Object> unfinishedKeys,
InitialImageFlowControl flowControl, ObjectIntProcedure proc) throws IOException {
- boolean keepGoing = true;
- boolean sentLastChunk = false;
int MAX_ENTRIES_PER_CHUNK = chunkSizeInBytes / 100;
if (MAX_ENTRIES_PER_CHUNK < 1000) {
MAX_ENTRIES_PER_CHUNK = 1000;
}
- ByteArrayDataInput in = null;
- ClusterDistributionManager dm = (ClusterDistributionManager) rgn.getDistributionManager();
- List chunkEntries = null;
- chunkEntries = new InitialImageVersionedEntryList(rgn.getConcurrencyChecksEnabled(),
- MAX_ENTRIES_PER_CHUNK);
- DiskRegion dr = rgn.getDiskRegion();
+ final List<Entry> chunkEntries =
+ new InitialImageVersionedEntryList(rgn.getConcurrencyChecksEnabled(),
+ MAX_ENTRIES_PER_CHUNK);
+
+ final DiskRegion dr = rgn.getDiskRegion();
+ final ByteArrayDataInput in;
if (dr != null) {
dr.setClearCountReference();
in = new ByteArrayDataInput();
+ } else {
+ in = null;
}
+
VersionSource myId = rgn.getVersionMember();
- Set<VersionSource> foundIds = new HashSet<VersionSource>();
+ Set<VersionSource> foundIds = new HashSet<>();
+
if (internalDuringPackingImage != null
- && this.regionPath.endsWith(internalDuringPackingImage.getRegionName())) {
+ && regionPath.endsWith(internalDuringPackingImage.getRegionName())) {
internalDuringPackingImage.run();
}
try {
- Iterator it = null;
+ final Iterator<?> it;
if (versionVector != null) {
// deltaGII
it = rgn.entries.regionEntries().iterator();
@@ -1949,6 +1940,8 @@ public class InitialImageOperation {
final KnownVersion knownVersion = Versioning
.getKnownVersionOrDefault(sender.getVersion(), KnownVersion.CURRENT);
+ boolean keepGoing;
+ boolean sentLastChunk;
do {
flowControl.acquirePermit();
int currentChunkSize = 0;
@@ -1968,15 +1961,14 @@ public class InitialImageOperation {
}
}
}
- InitialImageOperation.Entry entry = null;
+ final InitialImageOperation.Entry entry;
if (includeValues) {
- boolean fillRes = false;
+ final boolean fillRes;
try {
// also fills in lastModifiedTime
VersionStamp<?> stamp = mapEntry.getVersionStamp();
if (stamp != null) {
- synchronized (mapEntry) { // bug #46042 must sync to make sure the tag goes with
- // the value
+ synchronized (mapEntry) { // must sync to make sure the tag goes with the value
VersionSource<?> id = stamp.getMemberID();
if (id == null) {
id = myId;
@@ -2021,7 +2013,7 @@ public class InitialImageOperation {
entry = new InitialImageOperation.Entry();
entry.key = key;
entry.setLocalInvalid();
- entry.setLastModified(rgn.getDistributionManager(), mapEntry.getLastModified());
+ entry.setLastModified(mapEntry.getLastModified());
}
chunkEntries.add(entry);
@@ -2054,14 +2046,14 @@ public class InitialImageOperation {
private void replyNoData(ClusterDistributionManager dm, boolean isDeltaGII,
Map<VersionSource<?>, Long> gcVersions) {
- ImageReplyMessage.send(getSender(), this.processorId, null, dm, null, 0, 0, 1, true, 0,
+ ImageReplyMessage.send(getSender(), processorId, null, dm, null, 0, 0, 1, true, 0,
isDeltaGII, null, gcVersions);
}
- protected void replyWithData(ClusterDistributionManager dm, List entries, int seriesNum,
+ protected void replyWithData(ClusterDistributionManager dm, List<Entry> entries, int seriesNum,
int msgNum, int numSeries, boolean lastInSeries, int flowControlId, boolean isDeltaGII,
RegionVersionHolder holderToSend, Map<VersionSource<?>, Long> gcVersions) {
- ImageReplyMessage.send(getSender(), this.processorId, null, dm, entries, seriesNum, msgNum,
+ ImageReplyMessage.send(getSender(), processorId, null, dm, entries, seriesNum, msgNum,
numSeries, lastInSeries, flowControlId, isDeltaGII, holderToSend, gcVersions);
}
@@ -2092,30 +2084,30 @@ public class InitialImageOperation {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.regionPath = DataSerializer.readString(in);
- this.processorId = in.readInt();
- this.keysOnly = in.readBoolean();
- this.targetReinitialized = in.readBoolean();
- this.checkTombstoneVersions = in.readBoolean();
- this.lostMemberVersionID = (VersionSource) context.getDeserializer().readObject(in);
- this.versionVector = (RegionVersionVector) context.getDeserializer().readObject(in);
- this.lostMemberID = (InternalDistributedMember) context.getDeserializer().readObject(in);
- this.unfinishedKeys = (Set) context.getDeserializer().readObject(in);
+ regionPath = DataSerializer.readString(in);
+ processorId = in.readInt();
+ keysOnly = in.readBoolean();
+ targetReinitialized = in.readBoolean();
+ checkTombstoneVersions = in.readBoolean();
+ lostMemberVersionID = context.getDeserializer().readObject(in);
+ versionVector = context.getDeserializer().readObject(in);
+ lostMemberID = context.getDeserializer().readObject(in);
+ unfinishedKeys = context.getDeserializer().readObject(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- DataSerializer.writeString(this.regionPath, out);
- out.writeInt(this.processorId);
- out.writeBoolean(this.keysOnly);
- out.writeBoolean(this.targetReinitialized);
- out.writeBoolean(this.checkTombstoneVersions);
- context.getSerializer().writeObject(this.lostMemberVersionID, out);
- context.getSerializer().writeObject(this.versionVector, out);
- context.getSerializer().writeObject(this.lostMemberID, out);
- context.getSerializer().writeObject(this.unfinishedKeys, out);
+ DataSerializer.writeString(regionPath, out);
+ out.writeInt(processorId);
+ out.writeBoolean(keysOnly);
+ out.writeBoolean(targetReinitialized);
+ out.writeBoolean(checkTombstoneVersions);
+ context.getSerializer().writeObject(lostMemberVersionID, out);
+ context.getSerializer().writeObject(versionVector, out);
+ context.getSerializer().writeObject(lostMemberID, out);
+ context.getSerializer().writeObject(unfinishedKeys, out);
}
@Override
@@ -2125,22 +2117,22 @@ public class InitialImageOperation {
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
buff.append(cname);
buff.append("(region path='"); // make sure this is the first one
- buff.append(this.regionPath);
+ buff.append(regionPath);
buff.append("'; sender=");
buff.append(getSender());
buff.append("; keysOnly=");
- buff.append(this.keysOnly);
+ buff.append(keysOnly);
buff.append("; processorId=");
- buff.append(this.processorId);
+ buff.append(processorId);
buff.append("; waitForInit=");
- buff.append(this.targetReinitialized);
+ buff.append(targetReinitialized);
buff.append("; checkTombstoneVersions=");
- buff.append(this.checkTombstoneVersions);
- if (this.lostMemberVersionID != null) {
+ buff.append(checkTombstoneVersions);
+ if (lostMemberVersionID != null) {
buff.append("; lostMember=").append(lostMemberVersionID);
}
buff.append("; versionVector=").append(versionVector);
@@ -2166,10 +2158,6 @@ public class InitialImageOperation {
super(system, member);
}
- public FilterInfoProcessor(InternalDistributedSystem system, Set members) {
- super(system, members);
- }
-
@Override
public void process(DistributionMessage msg) {
// ignore messages from members not in the wait list
@@ -2204,7 +2192,7 @@ public class InitialImageOperation {
logger.info("Exception while registering filters during GII: {}", ex.getMessage(), ex);
}
- this.filtersReceived = true;
+ filtersReceived = true;
} finally {
super.process(msg);
@@ -2214,7 +2202,7 @@ public class InitialImageOperation {
@Override
public String toString() {
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
- return "<" + cname + " " + this.getProcessorId() + " replies"
+ return "<" + cname + " " + getProcessorId() + " replies"
+ (exception == null ? "" : (" exception: " + exception)) + " from " + membersToString()
+ ">";
}
@@ -2243,7 +2231,7 @@ public class InitialImageOperation {
@Override
public int getProcessorId() {
- return this.processorId;
+ return processorId;
}
@Override
@@ -2258,7 +2246,7 @@ public class InitialImageOperation {
InternalRegion lclRgn = null;
ReplyException rex = null;
try {
- Assert.assertTrue(this.regionPath != null, "Region path is null.");
+ Assert.assertTrue(regionPath != null, "Region path is null.");
InternalCache cache = dm.getCache();
lclRgn = cache == null ? null : cache.getInternalRegionByPath(regionPath);
@@ -2277,7 +2265,7 @@ public class InitialImageOperation {
}
final DistributedRegion rgn = (DistributedRegion) lclRgn;
- FilterInfoMessage.send(dm, getSender(), this.processorId, rgn, null);
+ FilterInfoMessage.send(dm, getSender(), processorId, rgn, null);
sendFailureMessage = false;
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
@@ -2308,7 +2296,7 @@ public class InitialImageOperation {
if (rex == null) {
rex = new ReplyException("Failed to process filter info request.");
}
- FilterInfoMessage.send(dm, getSender(), this.processorId, (LocalRegion) lclRgn, rex);
+ FilterInfoMessage.send(dm, getSender(), processorId, (LocalRegion) lclRgn, rex);
} // !success
}
}
@@ -2322,29 +2310,29 @@ public class InitialImageOperation {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.regionPath = DataSerializer.readString(in);
- this.processorId = in.readInt();
+ regionPath = DataSerializer.readString(in);
+ processorId = in.readInt();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- DataSerializer.writeString(this.regionPath, out);
- out.writeInt(this.processorId);
+ DataSerializer.writeString(regionPath, out);
+ out.writeInt(processorId);
}
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
buff.append(cname);
buff.append("(region path='");
- buff.append(this.regionPath);
+ buff.append(regionPath);
buff.append("'; sender=");
buff.append(getSender());
buff.append("; processorId=");
- buff.append(this.processorId);
+ buff.append(processorId);
buff.append(")");
return buff.toString();
}
@@ -2363,10 +2351,6 @@ public class InitialImageOperation {
super(system, member);
}
- public RequestRVVProcessor(InternalDistributedSystem system, Set members) {
- super(system, members);
- }
-
@Override
public void process(DistributionMessage msg) {
final boolean isGiiDebugEnabled = logger.isTraceEnabled(LogMarker.INITIAL_IMAGE_VERBOSE);
@@ -2411,10 +2395,8 @@ public class InitialImageOperation {
@Override
public String toString() {
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
- StringBuffer sb = new StringBuffer();
- sb.append("<" + cname + " " + this.getProcessorId());
- sb.append(" ,from " + membersToString() + ">");
- return sb.toString();
+ return "<" + cname + " " + getProcessorId()
+ + " ,from " + membersToString() + ">";
}
@Override
@@ -2442,7 +2424,7 @@ public class InitialImageOperation {
RegionVersionVector rvv) {
setRecipient(mbr);
setProcessorId(processorId);
- this.versionVector = rvv;
+ versionVector = rvv;
}
public static void send(DistributionManager dm, InternalDistributedMember dest, int processorId,
@@ -2521,12 +2503,12 @@ public class InitialImageOperation {
@Override
public int getProcessorId() {
- return this.processorId;
+ return processorId;
}
@Override
public int getProcessorType() {
- return this.targetReinitialized ? OperationExecutors.WAITING_POOL_EXECUTOR
+ return targetReinitialized ? OperationExecutors.WAITING_POOL_EXECUTOR
: OperationExecutors.HIGH_PRIORITY_EXECUTOR;
}
@@ -2534,12 +2516,11 @@ public class InitialImageOperation {
protected void process(final ClusterDistributionManager dm) {
Throwable thr = null;
boolean sendFailureMessage = true;
- LocalRegion lclRgn = null;
ReplyException rex = null;
try {
- Assert.assertTrue(this.regionPath != null, "Region path is null.");
+ Assert.assertTrue(regionPath != null, "Region path is null.");
final DistributedRegion rgn =
- (DistributedRegion) getGIIRegion(dm, this.regionPath, this.targetReinitialized);
+ (DistributedRegion) getGIIRegion(dm, regionPath, targetReinitialized);
if (rgn == null) {
return;
}
@@ -2549,13 +2530,11 @@ public class InitialImageOperation {
}
// allow finally block to send a failure message
RVVReplyMessage.send(dm, getSender(), processorId, null, null);
- sendFailureMessage = false;
- return;
} else {
RegionVersionVector rvv = rgn.getVersionVector().getCloneForTransmission();
RVVReplyMessage.send(dm, getSender(), processorId, rvv, null);
- sendFailureMessage = false;
}
+ sendFailureMessage = false;
} catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
logger.debug("{}; Region destroyed: Request RVV aborting.", this);
@@ -2599,33 +2578,33 @@ public class InitialImageOperation {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.regionPath = DataSerializer.readString(in);
- this.processorId = in.readInt();
- this.targetReinitialized = in.readBoolean();
+ regionPath = DataSerializer.readString(in);
+ processorId = in.readInt();
+ targetReinitialized = in.readBoolean();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- DataSerializer.writeString(this.regionPath, out);
- out.writeInt(this.processorId);
- out.writeBoolean(this.targetReinitialized);
+ DataSerializer.writeString(regionPath, out);
+ out.writeInt(processorId);
+ out.writeBoolean(targetReinitialized);
}
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
buff.append(cname);
buff.append("(region path='");
- buff.append(this.regionPath);
+ buff.append(regionPath);
buff.append("'; sender=");
buff.append(getSender());
buff.append("; processorId=");
- buff.append(this.processorId);
+ buff.append(processorId);
buff.append("; targetReinitalized=");
- buff.append(this.targetReinitialized);
+ buff.append(targetReinitialized);
buff.append(")");
return buff.toString();
}
@@ -2651,15 +2630,14 @@ public class InitialImageOperation {
@Override
protected void process(final ClusterDistributionManager dm) {
- LocalRegion lclRgn = null;
try {
- Assert.assertTrue(this.regionPath != null, "Region path is null.");
- final DistributedRegion rgn = (DistributedRegion) getGIIRegion(dm, this.regionPath, false);
+ Assert.assertTrue(regionPath != null, "Region path is null.");
+ final DistributedRegion rgn = (DistributedRegion) getGIIRegion(dm, regionPath, false);
if (rgn != null) {
if (logger.isDebugEnabled()) {
logger.debug("synchronizing region with {}", Arrays.toString(lostVersionSources));
}
- for (VersionSource lostSource : this.lostVersionSources) {
+ for (VersionSource lostSource : lostVersionSources) {
InternalDistributedMember mbr = null;
if (lostSource instanceof InternalDistributedMember) {
mbr = (InternalDistributedMember) lostSource;
@@ -2693,10 +2671,10 @@ public class InitialImageOperation {
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- DataSerializer.writeString(this.regionPath, out);
- out.writeBoolean(this.lostVersionSources[0] instanceof DiskStoreID);
- out.writeInt(this.lostVersionSources.length);
- for (VersionSource id : this.lostVersionSources) {
+ DataSerializer.writeString(regionPath, out);
+ out.writeBoolean(lostVersionSources[0] instanceof DiskStoreID);
+ out.writeInt(lostVersionSources.length);
+ for (VersionSource id : lostVersionSources) {
id.writeEssentialData(out);
}
}
@@ -2705,26 +2683,26 @@ public class InitialImageOperation {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.regionPath = DataSerializer.readString(in);
+ regionPath = DataSerializer.readString(in);
boolean persistentIDs = in.readBoolean();
int len = in.readInt();
- this.lostVersionSources = new VersionSource[len];
+ lostVersionSources = new VersionSource[len];
for (int i = 0; i < len; i++) {
- this.lostVersionSources[i] = (persistentIDs ? DiskStoreID.readEssentialData(in)
+ lostVersionSources[i] = (persistentIDs ? DiskStoreID.readEssentialData(in)
: InternalDistributedMember.readEssentialData(in));
}
}
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
buff.append(cname);
buff.append("(region path='");
- buff.append(this.regionPath);
+ buff.append(regionPath);
buff.append("'; sender=");
buff.append(getSender());
- buff.append("; sources=").append(Arrays.toString(this.lostVersionSources));
+ buff.append("; sources=").append(Arrays.toString(lostVersionSources));
buff.append(")");
return buff.toString();
}
@@ -2733,7 +2711,7 @@ public class InitialImageOperation {
public static class ImageReplyMessage extends ReplyMessage {
/** the next entries in this chunk. Null means abort. */
- protected List entries;
+ protected List<Entry> entries;
/** total number of series, duplicated in each message */
protected int numSeries;
@@ -2785,7 +2763,7 @@ public class InitialImageOperation {
* @param holderToSend higher version holder to sync for the lost member
*/
public static void send(InternalDistributedMember recipient, int processorId,
- ReplyException exception, ClusterDistributionManager dm, List entries, int seriesNum,
+ ReplyException exception, ClusterDistributionManager dm, List<Entry> entries, int seriesNum,
int msgNum, int numSeries, boolean lastInSeries, int flowControlId, boolean isDeltaGII,
RegionVersionHolder holderToSend, Map<VersionSource<?>, Long> gcVersions) {
ImageReplyMessage m = new ImageReplyMessage();
@@ -2823,8 +2801,8 @@ public class InitialImageOperation {
// TODO we probably should send an abort message to the sender
// if we have aborted, but at the very least we need to keep
// the permits going.
- if (this.flowControlId != 0) {
- FlowControlPermitMessage.send(dm, getSender(), this.flowControlId);
+ if (flowControlId != 0) {
+ FlowControlPermitMessage.send(dm, getSender(), flowControlId);
}
}
}
@@ -2844,26 +2822,26 @@ public class InitialImageOperation {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- ArrayList list = DataSerializer.readArrayList(in);
+ ArrayList<?> list = DataSerializer.readArrayList(in);
Object listData = null;
- if (list != null /* fix bug 46874 */ && list.size() > 0) {
+ if (list != null && list.size() > 0) {
listData = list.get(0);
}
if (listData instanceof InitialImageVersionedEntryList) {
- this.entries = (List) listData;
+ entries = (InitialImageVersionedEntryList) listData;
} else {
- this.entries = list;
+ entries = uncheckedCast(list);
}
- this.seriesNum = in.readInt();
- this.msgNum = in.readInt();
- this.numSeries = in.readInt();
- this.lastInSeries = in.readBoolean();
- this.flowControlId = in.readInt();
- this.remoteVersion = StaticSerialization.getVersionForDataStreamOrNull(in);
- this.isDeltaGII = in.readBoolean();
- this.hasHolderToSend = in.readBoolean();
- if (this.hasHolderToSend) {
- this.holderToSend = new RegionVersionHolder(in);
+ seriesNum = in.readInt();
+ msgNum = in.readInt();
+ numSeries = in.readInt();
+ lastInSeries = in.readBoolean();
+ flowControlId = in.readInt();
+ remoteVersion = StaticSerialization.getVersionForDataStreamOrNull(in);
+ isDeltaGII = in.readBoolean();
+ hasHolderToSend = in.readBoolean();
+ if (hasHolderToSend) {
+ holderToSend = new RegionVersionHolder(in);
}
int gcVersionsLength = in.readShort();
@@ -2881,22 +2859,22 @@ public class InitialImageOperation {
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
- if (this.entries instanceof InitialImageVersionedEntryList) {
- ArrayList list = new ArrayList(1);
- list.add(this.entries);
+ if (entries instanceof InitialImageVersionedEntryList) {
+ ArrayList<List<Entry>> list = new ArrayList<>(1);
+ list.add(entries);
DataSerializer.writeArrayList(list, out);
} else {
- DataSerializer.writeArrayList((ArrayList) this.entries, out);
+ DataSerializer.writeArrayList((ArrayList<Entry>) entries, out);
}
- out.writeInt(this.seriesNum);
- out.writeInt(this.msgNum);
- out.writeInt(this.numSeries);
- out.writeBoolean(this.lastInSeries);
- out.writeInt(this.flowControlId);
- out.writeBoolean(this.isDeltaGII);
- out.writeBoolean(this.hasHolderToSend);
- if (this.hasHolderToSend) {
- InternalDataSerializer.invokeToData(this.holderToSend, out);
+ out.writeInt(seriesNum);
+ out.writeInt(msgNum);
+ out.writeInt(numSeries);
+ out.writeBoolean(lastInSeries);
+ out.writeInt(flowControlId);
+ out.writeBoolean(isDeltaGII);
+ out.writeBoolean(hasHolderToSend);
+ if (hasHolderToSend) {
+ InternalDataSerializer.invokeToData(holderToSend, out);
}
out.writeShort(gcVersions == null ? -1 : gcVersions.size());
if (gcVersions != null) {
@@ -2909,14 +2887,14 @@ public class InitialImageOperation {
@Override
public String toString() {
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
String cname = getClass().getName().substring(getClass().getPackage().getName().length() + 1);
buff.append(cname);
buff.append("(processorId=");
- buff.append(this.processorId);
+ buff.append(processorId);
buff.append(" from ");
- buff.append(this.getSender());
- ReplyException ex = this.getException();
+ buff.append(getSender());
+ ReplyException ex = getException();
if (ex != null) {
buff.append(" with exception ");
buff.append(ex);
@@ -2925,25 +2903,25 @@ public class InitialImageOperation {
buff.append("; with no data - abort");
} else {
buff.append("; entryCount=");
- buff.append(this.entries.size());
+ buff.append(entries.size());
buff.append("; msgNum=");
- buff.append(this.msgNum);
+ buff.append(msgNum);
buff.append("; Series=");
- buff.append(this.seriesNum);
+ buff.append(seriesNum);
buff.append("/");
- buff.append(this.numSeries);
+ buff.append(numSeries);
buff.append("; lastInSeries=");
- buff.append(this.lastInSeries);
+ buff.append(lastInSeries);
buff.append("; flowControlId=");
- buff.append(this.flowControlId);
+ buff.append(flowControlId);
buff.append("; isDeltaGII=");
- buff.append(this.isDeltaGII);
+ buff.append(isDeltaGII);
}
- if (this.remoteVersion != null) {
- buff.append("; remoteVersion=").append(this.remoteVersion);
+ if (remoteVersion != null) {
+ buff.append("; remoteVersion=").append(remoteVersion);
}
- if (this.holderToSend != null) {
- buff.append("; holderToSend=").append(this.holderToSend);
+ if (holderToSend != null) {
+ buff.append("; holderToSend=").append(holderToSend);
}
buff.append(")");
return buff.toString();
@@ -2988,43 +2966,43 @@ public class InitialImageOperation {
private VersionTag versionTag;
/** Given local milliseconds, store as cache milliseconds */
- public void setLastModified(DistributionManager dm, long localMillis) {
- this.lastModified = localMillis;
+ public void setLastModified(long localMillis) {
+ lastModified = localMillis;
}
/** Return lastModified as local milliseconds */
- public long getLastModified(DistributionManager dm) {
- return this.lastModified;
+ public long getLastModified() {
+ return lastModified;
}
public boolean isSerialized() {
- return EntryBits.isSerialized(this.entryBits);
+ return EntryBits.isSerialized(entryBits);
}
public void setSerialized(boolean isSerialized) {
- this.entryBits = EntryBits.setSerialized(this.entryBits, isSerialized);
+ entryBits = EntryBits.setSerialized(entryBits, isSerialized);
}
public boolean isInvalid() {
- return (this.value == null) && !EntryBits.isLocalInvalid(this.entryBits);
+ return (value == null) && !EntryBits.isLocalInvalid(entryBits);
}
public void setInvalid() {
- this.entryBits = EntryBits.setLocalInvalid(this.entryBits, false);
- this.value = null;
+ entryBits = EntryBits.setLocalInvalid(entryBits, false);
+ value = null;
}
public boolean isLocalInvalid() {
- return EntryBits.isLocalInvalid(this.entryBits);
+ return EntryBits.isLocalInvalid(entryBits);
}
public void setLocalInvalid() {
- this.entryBits = EntryBits.setLocalInvalid(this.entryBits, true);
- this.value = null;
+ entryBits = EntryBits.setLocalInvalid(entryBits, true);
+ value = null;
}
public void setTombstone() {
- this.entryBits = EntryBits.setTombstone(this.entryBits, true);
+ entryBits = EntryBits.setTombstone(entryBits, true);
}
public Object getKey() {
@@ -3036,7 +3014,7 @@ public class InitialImageOperation {
}
public void setVersionTag(VersionTag tag) {
- this.versionTag = tag;
+ versionTag = tag;
}
@Override
@@ -3047,17 +3025,17 @@ public class InitialImageOperation {
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
- out.writeByte(this.entryBits);
- byte flags = (this.versionTag != null) ? HAS_VERSION : 0;
- flags |= (this.versionTag instanceof DiskVersionTag) ? PERSISTENT_VERSION : 0;
+ out.writeByte(entryBits);
+ byte flags = (versionTag != null) ? HAS_VERSION : 0;
+ flags |= (versionTag instanceof DiskVersionTag) ? PERSISTENT_VERSION : 0;
out.writeByte(flags);
- context.getSerializer().writeObject(this.key, out);
- if (!EntryBits.isTombstone(this.entryBits)) {
- DataSerializer.writeObjectAsByteArray(this.value, out);
+ context.getSerializer().writeObject(key, out);
+ if (!EntryBits.isTombstone(entryBits)) {
+ DataSerializer.writeObjectAsByteArray(value, out);
}
- out.writeLong(this.lastModified);
- if (this.versionTag != null) {
- InternalDataSerializer.invokeToData(this.versionTag, out);
+ out.writeLong(lastModified);
+ if (versionTag != null) {
+ InternalDataSerializer.invokeToData(versionTag, out);
}
}
@@ -3067,19 +3045,19 @@ public class InitialImageOperation {
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
- this.entryBits = in.readByte();
+ entryBits = in.readByte();
byte flags = in.readByte();
- this.key = context.getDeserializer().readObject(in);
+ key = context.getDeserializer().readObject(in);
- if (EntryBits.isTombstone(this.entryBits)) {
- this.value = Token.TOMBSTONE;
+ if (EntryBits.isTombstone(entryBits)) {
+ value = Token.TOMBSTONE;
} else {
- this.value = DataSerializer.readByteArray(in);
+ value = DataSerializer.readByteArray(in);
}
- this.lastModified = in.readLong();
+ lastModified = in.readLong();
if ((flags & HAS_VERSION) != 0) {
// note that null IDs must be later replaced with the image provider's ID
- this.versionTag = VersionTag.create((flags & PERSISTENT_VERSION) != 0, in);
+ versionTag = VersionTag.create((flags & PERSISTENT_VERSION) != 0, in);
}
}
@@ -3089,16 +3067,13 @@ public class InitialImageOperation {
toData(dos, InternalDataSerializer.createSerializationContext(dos));
return dos.size();
} catch (IOException ex) {
- RuntimeException ex2 = new IllegalArgumentException(
- "Could not calculate size of object");
- ex2.initCause(ex);
- throw ex2;
+ throw new IllegalArgumentException("Could not calculate size of object", ex);
}
}
@Override
public String toString() {
- return "GIIEntry[key=" + this.key + "]";
+ return "GIIEntry[key=" + key + "]";
}
@Override
@@ -3137,16 +3112,16 @@ public class InitialImageOperation {
public InitialImageVersionedEntryList() {
super();
- this.versionTags = new ArrayList();
+ versionTags = new ArrayList<>();
}
public InitialImageVersionedEntryList(boolean isRegionVersioned, int size) {
super(size);
this.isRegionVersioned = isRegionVersioned;
if (isRegionVersioned) {
- this.versionTags = new ArrayList(size);
+ versionTags = new ArrayList<>(size);
} else {
- this.versionTags = Collections.EMPTY_LIST;
+ versionTags = Collections.emptyList();
}
}
@@ -3168,13 +3143,13 @@ public class InitialImageOperation {
private boolean addEntryAndVersion(Entry entry, VersionTag versionTag) {
// version tag can be null if only keys are sent in InitialImage.
- if (this.isRegionVersioned && versionTag != null) {
- int tagsSize = this.versionTags.size();
+ if (isRegionVersioned && versionTag != null) {
+ int tagsSize = versionTags.size();
if (tagsSize != super.size()) {
// this should not happen - either all or none of the entries should have tags
throw new InternalGemFireException();
}
- this.versionTags.add(versionTag);
+ versionTags.add(versionTag);
}
// Add entry without version tag in top-level ArrayList.
@@ -3196,8 +3171,8 @@ public class InitialImageOperation {
}
private VersionTag<VersionSource> getVersionTag(int index) {
- VersionTag tag = null;
- if (isRegionVersioned && this.versionTags != null) {
+ VersionTag<VersionSource> tag = null;
+ if (isRegionVersioned && versionTags != null) {
tag = versionTags.get(index);
}
return tag;
@@ -3217,7 +3192,7 @@ public class InitialImageOperation {
@Override
public void clear() {
super.clear();
- this.versionTags.clear();
+ versionTags.clear();
}
/**
@@ -3225,7 +3200,7 @@ public class InitialImageOperation {
* @return whether the source region had concurrency checks enabled
*/
public boolean isRegionVersioned() {
- return this.isRegionVersioned;
+ return isRegionVersioned;
}
/**
@@ -3264,10 +3239,10 @@ public class InitialImageOperation {
flags |= 0x02;
hasEntries = true;
}
- if (this.versionTags.size() > 0) {
+ if (versionTags.size() > 0) {
flags |= 0x04;
hasTags = true;
- for (VersionTag tag : this.versionTags) {
+ for (VersionTag tag : versionTags) {
if (tag != null) {
if (tag instanceof DiskVersionTag) {
flags |= 0x20;
@@ -3276,7 +3251,7 @@ public class InitialImageOperation {
}
}
}
- if (this.isRegionVersioned) {
+ if (isRegionVersioned) {
flags |= 0x08;
}
@@ -3294,10 +3269,10 @@ public class InitialImageOperation {
}
}
if (hasTags) {
- InternalDataSerializer.writeUnsignedVL(this.versionTags.size(), out);
- Map<VersionSource, Integer> ids = new HashMap<VersionSource, Integer>(versionTags.size());
+ InternalDataSerializer.writeUnsignedVL(versionTags.size(), out);
+ Map<VersionSource, Integer> ids = new HashMap<>(versionTags.size());
int idCount = 0;
- for (VersionTag tag : this.versionTags) {
+ for (VersionTag tag : versionTags) {
if (tag == null) {
out.writeByte(FLAG_NULL_TAG);
} else {
@@ -3309,7 +3284,7 @@ public class InitialImageOperation {
Integer idNumber = ids.get(id);
if (idNumber == null) {
out.writeByte(FLAG_TAG_WITH_NEW_ID);
- idNumber = Integer.valueOf(idCount++);
+ idNumber = idCount++;
ids.put(id, idNumber);
InternalDataSerializer.invokeToData(tag, out);
} else {
@@ -3333,7 +3308,7 @@ public class InitialImageOperation {
int flags = in.readByte();
boolean hasEntries = (flags & 0x02) == 0x02;
boolean hasTags = (flags & 0x04) == 0x04;
- this.isRegionVersioned = (flags & 0x08) == 0x08;
+ isRegionVersioned = (flags & 0x08) == 0x08;
boolean persistent = (flags & 0x20) == 0x20;
if (isGiiVersionEntryDebugEnabled) {
@@ -3347,7 +3322,7 @@ public class InitialImageOperation {
logger.trace(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE, "reading {} keys", size);
}
for (int i = 0; i < size; i++) {
- super.add((Entry) context.getDeserializer().readObject(in));
+ super.add(context.getDeserializer().readObject(in));
}
}
@@ -3356,32 +3331,32 @@ public class InitialImageOperation {
if (isGiiVersionEntryDebugEnabled) {
logger.trace(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE, "reading {} version tags", size);
}
- this.versionTags = new ArrayList<VersionTag>(size);
- List<VersionSource> ids = new ArrayList<VersionSource>(size);
+ versionTags = new ArrayList<>(size);
+ List<VersionSource> ids = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
byte entryType = in.readByte();
switch (entryType) {
case FLAG_NULL_TAG:
- this.versionTags.add(null);
+ versionTags.add(null);
break;
case FLAG_FULL_TAG:
- this.versionTags.add(VersionTag.create(persistent, in));
+ versionTags.add(VersionTag.create(persistent, in));
break;
case FLAG_TAG_WITH_NEW_ID:
VersionTag tag = VersionTag.create(persistent, in);
ids.add(tag.getMemberID());
- this.versionTags.add(tag);
+ versionTags.add(tag);
break;
case FLAG_TAG_WITH_NUMBER_ID:
tag = VersionTag.create(persistent, in);
int idNumber = (int) InternalDataSerializer.readUnsignedVL(in);
tag.setMemberID(ids.get(idNumber));
- this.versionTags.add(tag);
+ versionTags.add(tag);
break;
}
}
} else {
- this.versionTags = new ArrayList<VersionTag>();
+ versionTags = new ArrayList<>();
}
}
@@ -3415,13 +3390,14 @@ public class InitialImageOperation {
return true;
}
- Map eventState;
+ Map<? extends DataSerializable, ? extends DataSerializable> eventState;
private boolean isHARegion;
RegionVersionVector versionVector;
public RegionStateMessage() {}
- private RegionStateMessage(InternalDistributedMember mbr, int processorId, Map eventState,
+ private RegionStateMessage(InternalDistributedMember mbr, int processorId,
+ Map<? extends DataSerializable, ? extends DataSerializable> eventState,
boolean isHARegion) {
setRecipient(mbr);
setProcessorId(processorId);
@@ -3433,7 +3409,7 @@ public class InitialImageOperation {
RegionVersionVector rvv, boolean isHARegion) {
setRecipient(mbr);
setProcessorId(processorId);
- this.versionVector = rvv;
+ versionVector = rvv;
this.isHARegion = isHARegion;
}
@@ -3493,7 +3469,7 @@ public class InitialImageOperation {
isHARegion = dip.readBoolean();
boolean has = dip.readBoolean();
if (has) {
- eventState = EventStateHelper.deDataSerialize(dip, isHARegion);
+ eventState = uncheckedCast(EventStateHelper.deDataSerialize(dip, isHARegion));
}
has = dip.readBoolean();
if (has) {
@@ -3521,27 +3497,27 @@ public class InitialImageOperation {
private LocalRegion haRegion;
- private Map emptyRegionMap;
+ private Map<String, Integer> emptyRegionMap;
static class InterestMaps {
Map<String, String> allKeys;
Map<String, String> allKeysInv;
- Map<String, Set> keysOfInterest;
+ Map<String, Set<?>> keysOfInterest;
- Map<String, Set> keysOfInterestInv;
+ Map<String, Set<?>> keysOfInterestInv;
- Map<String, Set> patternsOfInterest;
+ Map<String, Set<?>> patternsOfInterest;
- Map<String, Set> patternsOfInterestInv;
+ Map<String, Set<?>> patternsOfInterestInv;
- Map<String, Set> filtersOfInterest;
+ Map<String, Set<?>> filtersOfInterest;
- Map<String, Set> filtersOfInterestInv;
+ Map<String, Set<?>> filtersOfInterestInv;
}
- private final InterestMaps interestMaps[] =
+ private final InterestMaps[] interestMaps =
new InterestMaps[] {new InterestMaps(), new InterestMaps()};
/** index values for interestMaps[] */
@@ -3570,7 +3546,7 @@ public class InitialImageOperation {
* Collects all the filters registered by this client on regions.
*/
public void fillInFilterInfo() {
- LocalRegion haReg = this.haRegion;
+ LocalRegion haReg = haRegion;
if (haReg == null || haReg.getName() == null) {
throw new ReplyException("HARegion for the proxy is Null.");
@@ -3583,13 +3559,12 @@ public class InitialImageOperation {
return;
}
- CacheClientProxy clientProxy = null;
- ClientProxyMembershipID clientID =
+ final ClientProxyMembershipID clientID =
((HAContainerWrapper) ccn.getHaContainer()).getProxyID(haReg.getName());
if (clientID == null) {
throw new ReplyException("Client proxy ID not found for queue " + haReg.getName());
}
- clientProxy = ccn.getClientProxy(clientID);
+ final CacheClientProxy clientProxy = ccn.getClientProxy(clientID);
if (clientProxy == null) {
throw new ReplyException("Client proxy not found for queue " + haReg.getName());
}
@@ -3598,7 +3573,7 @@ public class InitialImageOperation {
logger.debug("Gathering interest information for {}", clientProxy);
}
- this.emptyRegionMap = clientProxy.getRegionsWithEmptyDataPolicy();
+ emptyRegionMap = clientProxy.getRegionsWithEmptyDataPolicy();
Set<String> regions = clientProxy.getInterestRegisteredRegions();
// Get Filter Info from all regions.
@@ -3627,9 +3602,9 @@ public class InitialImageOperation {
try {
List<ServerCQ> cqsList = cqService.getAllClientCqs(clientID);
if (!cqsList.isEmpty()) {
- this.cqs = new HashMap<String, ServerCQ>();
+ cqs = new HashMap<>();
for (ServerCQ cq : cqsList) {
- this.cqs.put(cq.getName(), cq);
+ cqs.put(cq.getName(), cq);
}
}
} catch (Exception ex) {
@@ -3648,73 +3623,73 @@ public class InitialImageOperation {
// Check if interested in all keys.
boolean all = pf.isInterestedInAllKeys(interestID);
if (all) {
- if (this.interestMaps[mapIndex].allKeys == null) {
- this.interestMaps[mapIndex].allKeys = new HashMap<String, String>();
+ if (interestMaps[mapIndex].allKeys == null) {
+ interestMaps[mapIndex].allKeys = new HashMap<>();
}
- this.interestMaps[mapIndex].allKeys.put(rName, ".*");
+ interestMaps[mapIndex].allKeys.put(rName, ".*");
}
// Check if interested in all keys, for which updates are sent as invalidates.
all = pf.isInterestedInAllKeysInv(interestID);
if (all) {
- if (this.interestMaps[mapIndex].allKeysInv == null) {
- this.interestMaps[mapIndex].allKeysInv = new HashMap<String, String>();
+ if (interestMaps[mapIndex].allKeysInv == null) {
+ interestMaps[mapIndex].allKeysInv = new HashMap<>();
}
- this.interestMaps[mapIndex].allKeysInv.put(rName, ".*");
+ interestMaps[mapIndex].allKeysInv.put(rName, ".*");
}
// Collect interest of type keys.
- Set keys = pf.getKeysOfInterest(interestID);
+ Set<?> keys = pf.getKeysOfInterest(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].keysOfInterest == null) {
- this.interestMaps[mapIndex].keysOfInterest = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].keysOfInterest == null) {
+ interestMaps[mapIndex].keysOfInterest = new HashMap<>();
}
- this.interestMaps[mapIndex].keysOfInterest.put(rName, keys);
+ interestMaps[mapIndex].keysOfInterest.put(rName, keys);
}
// Collect interest of type keys, for which updates are sent as invalidates.
keys = pf.getKeysOfInterestInv(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].keysOfInterestInv == null) {
- this.interestMaps[mapIndex].keysOfInterestInv = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].keysOfInterestInv == null) {
+ interestMaps[mapIndex].keysOfInterestInv = new HashMap<>();
}
- this.interestMaps[mapIndex].keysOfInterestInv.put(rName, keys);
+ interestMaps[mapIndex].keysOfInterestInv.put(rName, keys);
}
// Collect interest of type expression.
keys = pf.getPatternsOfInterest(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].patternsOfInterest == null) {
- this.interestMaps[mapIndex].patternsOfInterest = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].patternsOfInterest == null) {
+ interestMaps[mapIndex].patternsOfInterest = new HashMap<>();
}
- this.interestMaps[mapIndex].patternsOfInterest.put(rName, keys);
+ interestMaps[mapIndex].patternsOfInterest.put(rName, keys);
}
// Collect interest of type expression, for which updates are sent as invalidates.
keys = pf.getPatternsOfInterestInv(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].patternsOfInterestInv == null) {
- this.interestMaps[mapIndex].patternsOfInterestInv = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].patternsOfInterestInv == null) {
+ interestMaps[mapIndex].patternsOfInterestInv = new HashMap<>();
}
- this.interestMaps[mapIndex].patternsOfInterestInv.put(rName, keys);
+ interestMaps[mapIndex].patternsOfInterestInv.put(rName, keys);
}
// Collect interest of type filter.
keys = pf.getFiltersOfInterest(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].filtersOfInterest == null) {
- this.interestMaps[mapIndex].filtersOfInterest = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].filtersOfInterest == null) {
+ interestMaps[mapIndex].filtersOfInterest = new HashMap<>();
}
- this.interestMaps[mapIndex].filtersOfInterest.put(rName, keys);
+ interestMaps[mapIndex].filtersOfInterest.put(rName, keys);
}
// Collect interest of type filter, for which updates are sent as invalidates.
keys = pf.getFiltersOfInterestInv(interestID);
if (keys != null) {
- if (this.interestMaps[mapIndex].filtersOfInterestInv == null) {
- this.interestMaps[mapIndex].filtersOfInterestInv = new HashMap<String, Set>();
+ if (interestMaps[mapIndex].filtersOfInterestInv == null) {
+ interestMaps[mapIndex].filtersOfInterestInv = new HashMap<>();
}
- this.interestMaps[mapIndex].filtersOfInterestInv.put(rName, keys);
+ interestMaps[mapIndex].filtersOfInterestInv.put(rName, keys);
}
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
@@ -3759,12 +3734,12 @@ public class InitialImageOperation {
}
// Register CQs.
- if (this.cqs != null && !this.cqs.isEmpty()) {
+ if (cqs != null && !cqs.isEmpty()) {
try {
CqService cqService =
((DefaultQueryService) (region.getCache().getQueryService())).getCqService();
- for (Map.Entry<String, ServerCQ> e : this.cqs.entrySet()) {
+ for (Map.Entry<String, ServerCQ> e : cqs.entrySet()) {
ServerCQ cq = e.getValue();
try {
// Passing regionDataPolicy as -1, the actual value is
@@ -3772,7 +3747,7 @@ public class InitialImageOperation {
// found.
cqService.executeCq(e.getKey(), cq.getQueryString(),
((CqStateImpl) cq.getState()).getState(), proxy.getProxyID(), ccn, cq.isDurable(),
- true, -1, this.emptyRegionMap);
+ true, -1, emptyRegionMap);
} catch (Exception ex) {
logger.info("Failed to register CQ during HARegion GII. CQ: {} {}", e.getKey(),
ex.getMessage(), ex);
@@ -3787,13 +3762,13 @@ public class InitialImageOperation {
private void registerFilters(LocalRegion region, CacheClientProxy proxy, boolean durable) {
CacheClientNotifier ccn = CacheClientNotifier.getInstance();
- Set<String> regionsWithInterest = new HashSet<String>();
+ Set<String> regionsWithInterest = new HashSet<>();
int mapIndex = durable ? DURABLE : NON_DURABLE;
// Register interest for all keys.
try {
- registerInterestKeys(this.interestMaps[mapIndex].allKeys, true, region, ccn, proxy, durable,
+ registerInterestKeys(interestMaps[mapIndex].allKeys, true, region, ccn, proxy, durable,
false, InterestType.REGULAR_EXPRESSION, regionsWithInterest);
} catch (Exception ex) {
logger.info("Failed to register interest of type keys during HARegion GII. {}",
@@ -3802,7 +3777,7 @@ public class InitialImageOperation {
// Register interest for all keys, for which updates are sent as invalidates.
try {
- registerInterestKeys(this.interestMaps[mapIndex].allKeysInv, true, region, ccn, proxy,
+ registerInterestKeys(interestMaps[mapIndex].allKeysInv, true, region, ccn, proxy,
durable, false, InterestType.REGULAR_EXPRESSION, regionsWithInterest);
} catch (Exception ex) {
logger.info("Failed to register interest of type keys during HARegion GII. {}",
@@ -3811,7 +3786,7 @@ public class InitialImageOperation {
// Register interest of type keys.
try {
- registerInterestKeys(this.interestMaps[mapIndex].keysOfInterest, false, region, ccn, proxy,
+ registerInterestKeys(interestMaps[mapIndex].keysOfInterest, false, region, ccn, proxy,
durable, false, InterestType.KEY, regionsWithInterest);
} catch (Exception ex) {
logger.info("Failed to register interest of type keys during HARegion GII. {}",
@@ -3820,7 +3795,7 @@ public class InitialImageOperation {
// Register interest of type keys, for which updates are sent as invalidates.
try {
- registerInterestKeys(this.interestMaps[mapIndex].keysOfInterestInv, false, region, ccn,
+ registerInterestKeys(interestMaps[mapIndex].keysOfInterestInv, false, region, ccn,
proxy, durable, true, InterestType.KEY, regionsWithInterest);
} catch (Exception ex) {
logger.info(
@@ -3830,7 +3805,7 @@ public class InitialImageOperation {
// Register interest of type expression.
try {
- registerInterestKeys(this.interestMaps[mapIndex].patternsOfInterest, false, region, ccn,
+ registerInterestKeys(interestMaps[mapIndex].patternsOfInterest, false, region, ccn,
proxy, durable, false, InterestType.REGULAR_EXPRESSION, regionsWithInterest);
} catch (Exception ex) {
logger.info("Failed to register interest of type expression during HARegion GII. {}",
@@ -3839,7 +3814,7 @@ public class InitialImageOperation {
// Register interest of type expression, for which updates are sent as invalidates.
try {
- registerInterestKeys(this.interestMaps[mapIndex].patternsOfInterestInv, false, region, ccn,
+ registerInterestKeys(interestMaps[mapIndex].patternsOfInterestInv, false, region, ccn,
proxy, durable, true, InterestType.REGULAR_EXPRESSION, regionsWithInterest);
} catch (Exception ex) {
logger.info(
@@ -3849,7 +3824,7 @@ public class InitialImageOperation {
// Register interest of type expression.
try {
- registerInterestKeys(this.interestMaps[mapIndex].filtersOfInterest, false, region, ccn,
+ registerInterestKeys(interestMaps[mapIndex].filtersOfInterest, false, region, ccn,
proxy, durable, false, InterestType.FILTER_CLASS, regionsWithInterest);
} catch (Exception ex) {
logger.info("Failed to register interest of type filter during HARegion GII. {}",
@@ -3858,7 +3833,7 @@ public class InitialImageOperation {
// Register interest of type expression, for which updates are sent as invalidates.
try {
- registerInterestKeys(this.interestMaps[mapIndex].filtersOfInterestInv, false, region, ccn,
+ registerInterestKeys(interestMaps[mapIndex].filtersOfInterestInv, false, region, ccn,
proxy, durable, true, InterestType.FILTER_CLASS, regionsWithInterest);
} catch (Exception ex) {
logger.info(
@@ -3877,7 +3852,7 @@ public class InitialImageOperation {
/**
* Helper method to register the filters.
*/
- private void registerInterestKeys(Map regionKeys, boolean allKey, LocalRegion region,
+ private void registerInterestKeys(Map<String, ?> regionKeys, boolean allKey, LocalRegion region,
CacheClientNotifier ccn, CacheClientProxy proxy, boolean isDurable,
boolean updatesAsInvalidates, int interestType, Set<String> regionsWithInterest)
throws IOException {
@@ -3885,30 +3860,28 @@ public class InitialImageOperation {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (regionKeys != null) {
- Iterator iter = regionKeys.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry e = (Map.Entry) iter.next();
- String regionName = (String) e.getKey();
+ for (final Map.Entry<String, ?> e : regionKeys.entrySet()) {
+ String regionName = e.getKey();
if (region.getCache().getRegion(regionName) == null) {
if (isDebugEnabled) {
logger.debug("Unable to register interests. Region not found :{}" + regionName);
}
} else {
boolean manageEmptyRegions = false;
- if (this.emptyRegionMap != null) {
- manageEmptyRegions = this.emptyRegionMap.containsKey(regionName);
+ if (emptyRegionMap != null) {
+ manageEmptyRegions = emptyRegionMap.containsKey(regionName);
}
regionsWithInterest.add(regionName);
if (allKey) {
ccn.registerClientInterest(regionName, e.getValue(), proxy.getProxyID(), interestType,
isDurable, updatesAsInvalidates, manageEmptyRegions, 0, false);
} else if (InterestType.REGULAR_EXPRESSION == interestType) {
- for (Iterator i = ((Set) e.getValue()).iterator(); i.hasNext();) {
- ccn.registerClientInterest(regionName, (String) i.next(), proxy.getProxyID(),
+ for (final Object o : asSet(e.getValue())) {
+ ccn.registerClientInterest(regionName, o, proxy.getProxyID(),
interestType, isDurable, updatesAsInvalidates, manageEmptyRegions, 0, false);
}
} else {
- ccn.registerClientInterest(regionName, new ArrayList((Set) e.getValue()),
+ ccn.registerClientInterest(regionName, new ArrayList<>(asSet(e.getValue())),
proxy.getProxyID(), isDurable, updatesAsInvalidates, manageEmptyRegions,
interestType, false);
}
@@ -3917,6 +3890,10 @@ public class InitialImageOperation {
}
}
+ private static Set<Object> asSet(final Object o) {
+ return uncheckedCast(o);
+ }
+
public static void send(DistributionManager dm, InternalDistributedMember dest, int processorId,
LocalRegion rgn, ReplyException ex) {
FilterInfoMessage msg = new FilterInfoMessage(dest, processorId, rgn);
@@ -3936,69 +3913,66 @@ public class InitialImageOperation {
public String toString() {
String descr = super.toString();
descr += "; NON_DURABLE allKeys="
- + (this.interestMaps[NON_DURABLE].allKeys != null
- ? this.interestMaps[NON_DURABLE].allKeys.size() : 0)
+ + (interestMaps[NON_DURABLE].allKeys != null
+ ? interestMaps[NON_DURABLE].allKeys.size() : 0)
+ "; allKeysInv="
- + (this.interestMaps[NON_DURABLE].allKeysInv != null
- ? this.interestMaps[NON_DURABLE].allKeysInv.size() : 0)
+ + (interestMaps[NON_DURABLE].allKeysInv != null
+ ? interestMaps[NON_DURABLE].allKeysInv.size() : 0)
+ "; keysOfInterest="
- + (this.interestMaps[NON_DURABLE].keysOfInterest != null
- ? this.interestMaps[NON_DURABLE].keysOfInterest.size() : 0)
+ + (interestMaps[NON_DURABLE].keysOfInterest != null
+ ? interestMaps[NON_DURABLE].keysOfInterest.size() : 0)
+ "; keysOfInterestInv="
- + (this.interestMaps[NON_DURABLE].keysOfInterestInv != null
- ? this.interestMaps[NON_DURABLE].keysOfInterestInv.size() : 0)
+ + (interestMaps[NON_DURABLE].keysOfInterestInv != null
+ ? interestMaps[NON_DURABLE].keysOfInterestInv.size() : 0)
+ "; patternsOfInterest="
- + (this.interestMaps[NON_DURABLE].patternsOfInterest != null
- ? this.interestMaps[NON_DURABLE].patternsOfInterest.size() : 0)
+ + (interestMaps[NON_DURABLE].patternsOfInterest != null
+ ? interestMaps[NON_DURABLE].patternsOfInterest.size() : 0)
+ "; patternsOfInterestInv="
- + (this.interestMaps[NON_DURABLE].patternsOfInterestInv != null
- ? this.interestMaps[NON_DURABLE].patternsOfInterestInv.size() : 0)
+ + (interestMaps[NON_DURABLE].patternsOfInterestInv != null
+ ? interestMaps[NON_DURABLE].patternsOfInterestInv.size() : 0)
+ "; filtersOfInterest="
- + (this.interestMaps[NON_DURABLE].filtersOfInterest != null
- ? this.interestMaps[NON_DURABLE].filtersOfInterest.size() : 0)
- + "; filtersOfInterestInv=" + (this.interestMaps[NON_DURABLE].filtersOfInterestInv != null
- ? this.interestMaps[NON_DURABLE].filtersOfInterestInv.size() : 0);
+ + (interestMaps[NON_DURABLE].filtersOfInterest != null
+ ? interestMaps[NON_DURABLE].filtersOfInterest.size() : 0)
+ + "; filtersOfInterestInv=" + (interestMaps[NON_DURABLE].filtersOfInterestInv != null
+ ? interestMaps[NON_DURABLE].filtersOfInterestInv.size() : 0);
descr += "; DURABLE allKeys="
- + (this.interestMaps[DURABLE].allKeys != null ? this.interestMaps[DURABLE].allKeys.size()
+ + (interestMaps[DURABLE].allKeys != null ? interestMaps[DURABLE].allKeys.size()
: 0)
+ "; allKeysInv="
- + (this.interestMaps[DURABLE].allKeysInv != null
- ? this.interestMaps[DURABLE].allKeysInv.size() : 0)
+ + (interestMaps[DURABLE].allKeysInv != null
+ ? interestMaps[DURABLE].allKeysInv.size() : 0)
+ "; keysOfInterest="
- + (this.interestMaps[DURABLE].keysOfInterest != null
- ? this.interestMaps[DURABLE].keysOfInterest.size() : 0)
+ + (interestMaps[DURABLE].keysOfInterest != null
+ ? interestMaps[DURABLE].keysOfInterest.size() : 0)
+ "; keysOfInterestInv="
- + (this.interestMaps[DURABLE].keysOfInterestInv != null
- ? this.interestMaps[DURABLE].keysOfInterestInv.size() : 0)
+ + (interestMaps[DURABLE].keysOfInterestInv != null
+ ? interestMaps[DURABLE].keysOfInterestInv.size() : 0)
+ "; patternsOfInterest="
- + (this.interestMaps[DURABLE].patternsOfInterest != null
- ? this.interestMaps[DURABLE].patternsOfInterest.size() : 0)
+ + (interestMaps[DURABLE].patternsOfInterest != null
+ ? interestMaps[DURABLE].patternsOfInterest.size() : 0)
+ "; patternsOfInterestInv="
- + (this.interestMaps[DURABLE].patternsOfInterestInv != null
- ? this.interestMaps[DURABLE].patternsOfInterestInv.size() : 0)
+ + (interestMaps[DURABLE].patternsOfInterestInv != null
+ ? interestMaps[DURABLE].patternsOfInterestInv.size() : 0)
+ "; filtersOfInterest="
- + (this.interestMaps[DURABLE].filtersOfInterest != null
- ? this.interestMaps[DURABLE].filtersOfInterest.size() : 0)
- + "; filtersOfInterestInv=" + (this.interestMaps[DURABLE].filtersOfInterestInv != null
- ? this.interestMaps[DURABLE].filtersOfInterestInv.size() : 0);
- descr += "; cqs=" + (this.cqs != null ? this.cqs.size() : 0);
+ + (interestMaps[DURABLE].filtersOfInterest != null
+ ? interestMaps[DURABLE].filtersOfInterest.size() : 0)
+ + "; filtersOfInterestInv=" + (interestMaps[DURABLE].filtersOfInterestInv != null
+ ? interestMaps[DURABLE].filtersOfInterestInv.size() : 0);
+ descr += "; cqs=" + (cqs != null ? cqs.size() : 0);
return descr;
}
public boolean isEmpty() {
- if (this.interestMaps[NON_DURABLE].keysOfInterest != null
- || this.interestMaps[NON_DURABLE].keysOfInterestInv != null
- || this.interestMaps[NON_DURABLE].patternsOfInterest != null
- || this.interestMaps[NON_DURABLE].patternsOfInterestInv != null
- || this.interestMaps[NON_DURABLE].filtersOfInterest != null
- || this.interestMaps[NON_DURABLE].filtersOfInterestInv != null
- || this.interestMaps[DURABLE].patternsOfInterest != null
- || this.interestMaps[DURABLE].patternsOfInterestInv != null
- || this.interestMaps[DURABLE].filtersOfInterest != null
- || this.interestMaps[DURABLE].filtersOfInterestInv != null || this.cqs != null) {
- return false;
- }
- return true;
+ return interestMaps[NON_DURABLE].keysOfInterest == null
+ && interestMaps[NON_DURABLE].keysOfInterestInv == null
+ && interestMaps[NON_DURABLE].patternsOfInterest == null
+ && interestMaps[NON_DURABLE].patternsOfInterestInv == null
+ && interestMaps[NON_DURABLE].filtersOfInterest == null
+ && interestMaps[NON_DURABLE].filtersOfInterestInv == null
+ && interestMaps[DURABLE].patternsOfInterest == null
+ && interestMaps[DURABLE].patternsOfInterestInv == null
+ && interestMaps[DURABLE].filtersOfInterest == null
+ && interestMaps[DURABLE].filtersOfInterestInv == null && cqs == null;
}
@Override
@@ -4006,30 +3980,30 @@ public class InitialImageOperation {
SerializationContext context) throws IOException {
super.toData(dop, context);
// DataSerializer.writeString(this.haRegion.getName(), dop);
- DataSerializer.writeHashMap((HashMap) this.emptyRegionMap, dop);
+ DataSerializer.writeHashMap(emptyRegionMap, dop);
// Write interest info.
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].allKeys, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].allKeysInv, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].keysOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].keysOfInterestInv, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].patternsOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].patternsOfInterestInv,
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].allKeys, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].allKeysInv, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].keysOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].keysOfInterestInv, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].patternsOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].patternsOfInterestInv,
dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].filtersOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[NON_DURABLE].filtersOfInterestInv,
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].filtersOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[NON_DURABLE].filtersOfInterestInv,
dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].allKeys, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].allKeysInv, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].keysOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].keysOfInterestInv, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].patternsOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].patternsOfInterestInv, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].filtersOfInterest, dop);
- DataSerializer.writeHashMap((HashMap) this.interestMaps[DURABLE].filtersOfInterestInv, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].allKeys, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].allKeysInv, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].keysOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].keysOfInterestInv, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].patternsOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].patternsOfInterestInv, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].filtersOfInterest, dop);
+ DataSerializer.writeHashMap(interestMaps[DURABLE].filtersOfInterestInv, dop);
// Write CQ info.
- DataSerializer.writeHashMap((HashMap) this.cqs, dop);
+ DataSerializer.writeHashMap(cqs, dop);
}
@Override
@@ -4037,28 +4011,28 @@ public class InitialImageOperation {
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(dip, context);
// String regionName = DataSerializer.readString(dip);
- this.emptyRegionMap = DataSerializer.readHashMap(dip);
+ emptyRegionMap = DataSerializer.readHashMap(dip);
// Read interest info.
- this.interestMaps[NON_DURABLE].allKeys = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].allKeysInv = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].keysOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].keysOfInterestInv = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].patternsOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].patternsOfInterestInv = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].filtersOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[NON_DURABLE].filtersOfInterestInv = DataSerializer.readHashMap(dip);
-
- this.interestMaps[DURABLE].allKeys = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].allKeysInv = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].keysOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].keysOfInterestInv = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].patternsOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].patternsOfInterestInv = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].filtersOfInterest = DataSerializer.readHashMap(dip);
- this.interestMaps[DURABLE].filtersOfInterestInv = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].allKeys = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].allKeysInv = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].keysOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].keysOfInterestInv = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].patternsOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].patternsOfInterestInv = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].filtersOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[NON_DURABLE].filtersOfInterestInv = DataSerializer.readHashMap(dip);
+
+ interestMaps[DURABLE].allKeys = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].allKeysInv = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].keysOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].keysOfInterestInv = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].patternsOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].patternsOfInterestInv = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].filtersOfInterest = DataSerializer.readHashMap(dip);
+ interestMaps[DURABLE].filtersOfInterestInv = DataSerializer.readHashMap(dip);
// read CQ info.
- this.cqs = DataSerializer.readHashMap(dip);
+ cqs = DataSerializer.readHashMap(dip);
}
/*
@@ -4080,15 +4054,15 @@ public class InitialImageOperation {
public GIITestHook(GIITestHookType type, String region_name) {
this.type = type;
this.region_name = region_name;
- this.isRunning = false;
+ isRunning = false;
}
public GIITestHookType getType() {
- return this.type;
+ return type;
}
public String getRegionName() {
- return this.region_name;
+ return region_name;
}
public String toString() {
@@ -4102,9 +4076,6 @@ public class InitialImageOperation {
}
- public static final boolean TRACE_GII =
- Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.TRACE_GII");
-
@MutableForTesting
public static boolean FORCE_FULL_GII =
Boolean.getBoolean(GeodeGlossary.GEMFIRE_PREFIX + "GetInitialImage.FORCE_FULL_GII");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractOplogDiskRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractOplogDiskRegionEntry.java
index 5ba295c..2122140 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractOplogDiskRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractOplogDiskRegionEntry.java
@@ -66,13 +66,13 @@ public abstract class AbstractOplogDiskRegionEntry extends AbstractDiskRegionEnt
@Override
public boolean fillInValue(InternalRegion region, Entry entry, ByteArrayDataInput in,
DistributionManager mgr, final KnownVersion version) {
- return Helper.fillInValue(this, entry, region.getDiskRegion(), mgr, in, region, version);
+ return Helper.fillInValue(this, entry, region.getDiskRegion(), region, version);
}
@Override
public boolean isOverflowedToDisk(InternalRegion region,
DistributedRegion.DiskPosition diskPosition) {
- return Helper.isOverflowedToDisk(this, region.getDiskRegion(), diskPosition, region);
+ return Helper.isOverflowedToDisk(this, region.getDiskRegion(), diskPosition);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
index 4aa3783..b67f154 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/AbstractRegionEntry.java
@@ -346,7 +346,7 @@ public abstract class AbstractRegionEntry implements HashRegionEntry<Object, Obj
}
}
- entry.setLastModified(mgr, getLastModified()); // fix for bug 31059
+ entry.setLastModified(getLastModified());
if (v == Token.INVALID) {
entry.setInvalid();
} else if (v == Token.LOCAL_INVALID) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
index c5c9078..5ede287 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/entries/DiskEntry.java
@@ -22,7 +22,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.DiskAccessException;
-import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.AbstractDiskRegion;
@@ -171,7 +170,6 @@ public interface DiskEntry extends RegionEntry {
* @param entry the entry used to identify the value to fetch
* @param dr the persistent storage from which to fetch the value
* @return either null, byte array, or CacheDeserializable
- * @since GemFire 57_hotfix
*/
public static Object getSerializedValueOnDisk(DiskEntry entry, DiskRegion dr) {
DiskId did = entry.getDiskId();
@@ -253,8 +251,7 @@ public interface DiskEntry extends RegionEntry {
}
static boolean isOverflowedToDisk(DiskEntry de, DiskRegion dr,
- DistributedRegion.DiskPosition dp, RegionEntryContext context) {
- Object v = null;
+ DistributedRegion.DiskPosition dp) {
DiskId did;
synchronized (de) {
did = de.getDiskId();
@@ -274,7 +271,7 @@ public interface DiskEntry extends RegionEntry {
did = de.getDiskId();
}
assert did != null;
- return isOverflowedToDisk(de, dr, dp, context);
+ return isOverflowedToDisk(de, dr, dp);
} else {
dp.setPosition(did.getOplogId(), did.getOffsetInOplog());
return true;
@@ -296,11 +293,11 @@ public interface DiskEntry extends RegionEntry {
* @since GemFire 3.2.1
*/
static boolean fillInValue(DiskEntry de, InitialImageOperation.Entry entry, DiskRegion dr,
- DistributionManager mgr, ByteArrayDataInput in, RegionEntryContext context,
+ RegionEntryContext context,
KnownVersion version) {
@Retained
@Released
- Object v = null;
+ Object value;
DiskId did;
synchronized (de) {
did = de.getDiskId();
@@ -314,31 +311,30 @@ public interface DiskEntry extends RegionEntry {
}
try {
synchronized (syncObj) {
- entry.setLastModified(mgr, de.getLastModified());
+ entry.setLastModified(de.getLastModified());
ReferenceCountHelper.setReferenceCountOwner(entry);
// OFFHEAP copied to heap entry;
// TODO: allow entry to refer to offheap since it will be copied to network.
- v = de.getValueRetain(context, true);
+ value = de.getValueRetain(context, true);
ReferenceCountHelper.setReferenceCountOwner(null);
- if (v == null) {
+ if (value == null) {
if (did == null) {
- // fix for bug 41449
synchronized (de) {
did = de.getDiskId();
}
assert did != null;
// do recursive call to get readLock on did
- return fillInValue(de, entry, dr, mgr, in, context, version);
+ return fillInValue(de, entry, dr, context, version);
}
if (logger.isDebugEnabled()) {
logger.debug(
"DiskEntry.Helper.fillInValue, key={}; getting value from disk, disk id={}",
entry.getKey(), did);
}
- BytesAndBits bb = null;
+ final BytesAndBits bb;
try {
bb = dr.getBytesAndBits(did, false);
} catch (DiskAccessException ignore) {
@@ -362,11 +358,10 @@ public interface DiskEntry extends RegionEntry {
dr.releaseReadLock();
}
}
- if (Token.isRemovedFromDisk(v)) {
- // fix for bug 31757
+ if (Token.isRemovedFromDisk(value)) {
return false;
- } else if (v instanceof CachedDeserializable) {
- CachedDeserializable cd = (CachedDeserializable) v;
+ } else if (value instanceof CachedDeserializable) {
+ CachedDeserializable cd = (CachedDeserializable) value;
try {
if (!cd.isSerialized()) {
entry.setSerialized(false);
@@ -377,7 +372,7 @@ public interface DiskEntry extends RegionEntry {
Object tmp = cd.getValue();
if (tmp instanceof byte[]) {
- entry.setValue((byte[]) tmp);
+ entry.setValue(tmp);
entry.setSerialized(true);
} else {
try {
@@ -397,22 +392,21 @@ public interface DiskEntry extends RegionEntry {
// If v == entry.value then v is assumed to be an OffHeapByteSource
// and release() will be called on v after the bytes have been read from
// off-heap.
- if (v != entry.getValue()) {
- OffHeapHelper.releaseWithNoTracking(v);
+ if (value != entry.getValue()) {
+ OffHeapHelper.releaseWithNoTracking(value);
}
}
- } else if (v instanceof byte[]) {
- entry.setValue(v);
+ } else if (value instanceof byte[]) {
+ entry.setValue(value);
entry.setSerialized(false);
- } else if (v == Token.INVALID) {
+ } else if (value == Token.INVALID) {
entry.setInvalid();
- } else if (v == Token.LOCAL_INVALID) {
- // fix for bug 31107
+ } else if (value == Token.LOCAL_INVALID) {
entry.setLocalInvalid();
- } else if (v == Token.TOMBSTONE) {
+ } else if (value == Token.TOMBSTONE) {
entry.setTombstone();
} else {
- Object preparedValue = v;
+ Object preparedValue = value;
if (preparedValue != null) {
preparedValue = AbstractRegionEntry.prepareValueForGII(preparedValue);
if (preparedValue == null) {
@@ -427,10 +421,7 @@ public interface DiskEntry extends RegionEntry {
entry.setValue(hdos);
entry.setSerialized(true);
} catch (IOException e) {
- RuntimeException e2 = new IllegalArgumentException(
- "An IOException was thrown while serializing.");
- e2.initCause(e);
- throw e2;
+ throw new IllegalArgumentException("An IOException was thrown while serializing.", e);
}
}
}
@@ -518,12 +509,12 @@ public interface DiskEntry extends RegionEntry {
@Override
public boolean isSerialized() {
- return this.isSerializedObject;
+ return isSerializedObject;
}
@Override
public int getLength() {
- return (this.bytes != null) ? this.bytes.length : 0;
+ return (bytes != null) ? bytes.length : 0;
}
private boolean isInvalidToken() {
@@ -549,9 +540,9 @@ public interface DiskEntry extends RegionEntry {
} else if (isLocalInvalidToken()) {
userBits = EntryBits.setLocalInvalid(userBits, true);
} else {
- if (this.bytes == null) {
+ if (bytes == null) {
throw new IllegalStateException("userBits==1 and value is null");
- } else if (this.bytes.length == 0) {
+ } else if (bytes.length == 0) {
throw new IllegalStateException("userBits==1 and value is zero length");
}
userBits = EntryBits.setSerialized(userBits, true);
@@ -571,7 +562,7 @@ public interface DiskEntry extends RegionEntry {
needsFlush = true;
bytesThisTime = bb.remaining();
}
- bb.put(this.bytes, offset, bytesThisTime);
+ bb.put(bytes, offset, bytesThisTime);
offset += bytesThisTime;
if (needsFlush) {
flushable.flush();
@@ -581,13 +572,13 @@ public interface DiskEntry extends RegionEntry {
@Override
public String getBytesAsString() {
- if (this.bytes == null) {
+ if (bytes == null) {
return "null";
}
StringBuilder sb = new StringBuilder();
int len = getLength();
for (int i = 0; i < len; i++) {
- sb.append(this.bytes[i]).append(", ");
+ sb.append(bytes[i]).append(", ");
}
return sb.toString();
}
@@ -614,7 +605,7 @@ public interface DiskEntry extends RegionEntry {
@Override
public int getLength() {
- return this.length;
+ return length;
}
@Override
@@ -636,17 +627,17 @@ public interface DiskEntry extends RegionEntry {
public OffHeapValueWrapper(StoredObject so) {
assert so.hasRefCount();
assert !so.isCompressed();
- this.offHeapData = so;
+ offHeapData = so;
}
@Override
public boolean isSerialized() {
- return this.offHeapData.isSerialized();
+ return offHeapData.isSerialized();
}
@Override
public int getLength() {
- return this.offHeapData.getDataSize();
+ return offHeapData.getDataSize();
}
@Override
@@ -665,7 +656,7 @@ public interface DiskEntry extends RegionEntry {
return;
}
if (maxOffset > bb.capacity()) {
- ByteBuffer chunkbb = this.offHeapData.createDirectByteBuffer();
+ ByteBuffer chunkbb = offHeapData.createDirectByteBuffer();
if (chunkbb != null) {
flushable.flush(bb, chunkbb);
return;
@@ -676,7 +667,7 @@ public interface DiskEntry extends RegionEntry {
int bytesRemaining = maxOffset;
int availableSpace = bb.remaining();
long addrToWrite = bbAddress + bb.position();
- long addrToRead = this.offHeapData.getAddressForReadingData(0, maxOffset);
+ long addrToRead = offHeapData.getAddressForReadingData(0, maxOffset);
if (bytesRemaining > availableSpace) {
do {
AddressableMemoryManager.copyMemory(addrToRead, addrToWrite, availableSpace);
@@ -691,7 +682,7 @@ public interface DiskEntry extends RegionEntry {
AddressableMemoryManager.copyMemory(addrToRead, addrToWrite, bytesRemaining);
bb.position(bb.position() + bytesRemaining);
} else {
- long addr = this.offHeapData.getAddressForReadingData(0, maxOffset);
+ long addr = offHeapData.getAddressForReadingData(0, maxOffset);
final long endAddr = addr + maxOffset;
while (addr != endAddr) {
bb.put(AddressableMemoryManager.readByte(addr));
@@ -705,7 +696,7 @@ public interface DiskEntry extends RegionEntry {
@Override
public String getBytesAsString() {
- return this.offHeapData.getStringForm();
+ return offHeapData.getStringForm();
}
}
@@ -716,11 +707,7 @@ public interface DiskEntry extends RegionEntry {
*/
public static boolean wrapOffHeapReference(Object o) {
if (o instanceof StoredObject) {
- StoredObject so = (StoredObject) o;
- if (so.hasRefCount()) {
- //
- return true;
- }
+ return ((StoredObject) o).hasRefCount();
}
return false;
}
@@ -787,12 +774,9 @@ public interface DiskEntry extends RegionEntry {
// Since NIO is used if the chunk of memory is large we can write it
// to the file using the off-heap memory with no extra copying.
// So we give preference to getRawNewValue over getCachedSerializedNewValue
- Object rawValue = null;
- {
- rawValue = event.getRawNewValue();
- if (wrapOffHeapReference(rawValue)) {
- return new OffHeapValueWrapper((StoredObject) rawValue);
- }
+ final Object rawValue = event.getRawNewValue();
+ if (wrapOffHeapReference(rawValue)) {
+ return new OffHeapValueWrapper((StoredObject) rawValue);
}
if (event.getCachedSerializedNewValue() != null) {
return new ByteArrayValueWrapper(true, event.getCachedSerializedNewValue());
@@ -802,7 +786,7 @@ public interface DiskEntry extends RegionEntry {
}
}
@Retained
- Object value = entry.getValueRetain(region, true);
+ final Object value = entry.getValueRetain(region, true);
try {
return createValueWrapper(value, event);
} finally {
@@ -851,7 +835,6 @@ public interface DiskEntry extends RegionEntry {
boolean basicUpdateCalled = false;
try {
- AsyncDiskEntry asyncDiskEntry = null;
DiskRegion dr = region.getDiskRegion();
DiskId did = entry.getDiskId();
Object syncObj = did;
@@ -861,6 +844,8 @@ public interface DiskEntry extends RegionEntry {
if (syncObj == did) {
dr.acquireReadLock();
}
+
+ final AsyncDiskEntry asyncDiskEntry;
try {
synchronized (syncObj) {
basicUpdateCalled = true;
@@ -897,7 +882,7 @@ public interface DiskEntry extends RegionEntry {
oldValue = entry.getValueAsToken();
if (Token.isRemovedFromDisk(newValue)) {
if (dr.isBackup()) {
- dr.testIsRecoveredAndClear(did); // fixes bug 41409
+ dr.testIsRecoveredAndClear(did);
}
boolean caughtCacheClosed = false;
try {
@@ -908,9 +893,8 @@ public interface DiskEntry extends RegionEntry {
caughtCacheClosed = true;
throw e;
} finally {
- if (caughtCacheClosed) {
- // 47616: not to set the value to be removedFromDisk since it failed to persist
- } else {
+ // do not to set the value to be removedFromDisk since it failed to persist
+ if (!caughtCacheClosed) {
// Ensure that the value is rightly set despite clear so
// that it can be distributed correctly
entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already
@@ -924,9 +908,6 @@ public interface DiskEntry extends RegionEntry {
try {
// The new value in the entry needs to be set after the disk writing
// has succeeded.
-
- // entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-
if (did != null && did.isPendingAsync()) {
// if the entry was not yet written to disk, we didn't update
// the bytes on disk.
@@ -936,7 +917,7 @@ public interface DiskEntry extends RegionEntry {
}
if (dr.isBackup()) {
- dr.testIsRecoveredAndClear(did); // fixes bug 41409
+ dr.testIsRecoveredAndClear(did);
if (doSynchronousWrite(region, dr)) {
if (AbstractRegionEntry.isCompressible(dr, newValue)) {
// In case of compression the value is being set first
@@ -985,7 +966,6 @@ public interface DiskEntry extends RegionEntry {
// The disk remove that this section used to do caused bug 30961
// @todo this seems wrong. How does leaving it on disk fix the bug?
did.markForWriting();
- // did.setValueSerializedSize(0);
} else {
newValueStoredInEntry = true;
entry.setValueWithContext(region, newValue);
@@ -1101,7 +1081,7 @@ public interface DiskEntry extends RegionEntry {
did.setPendingAsync(false);
}
}
- lruEntryFaultIn((EvictableEntry) entry, (DiskRecoveryStore) region);
+ lruEntryFaultIn((EvictableEntry) entry, region);
lruFaultedIn = true;
}
}
@@ -1111,10 +1091,10 @@ public interface DiskEntry extends RegionEntry {
v = entry.getValueRetain(region, true);
if (v == null) {
- v = readValueFromDisk(entry, (DiskRecoveryStore) region);
+ v = readValueFromDisk(entry, region);
if (entry instanceof EvictableEntry) {
if (v != null && !Token.isInvalid(v)) {
- lruEntryFaultIn((EvictableEntry) entry, (DiskRecoveryStore) region);
+ lruEntryFaultIn((EvictableEntry) entry, region);
lruFaultedIn = true;
}
@@ -1135,7 +1115,7 @@ public interface DiskEntry extends RegionEntry {
entry.setRecentlyUsed(region);
}
if (lruFaultedIn) {
- lruUpdateCallback((DiskRecoveryStore) region);
+ lruUpdateCallback(region);
}
return v; // OFFHEAP: the value ends up being returned by RegionEntry.getValue
}
@@ -1200,7 +1180,7 @@ public interface DiskEntry extends RegionEntry {
} else if (EntryBits.isSerialized(bb.getBits())) {
value = readSerializedValue(bb.getBytes(), bb.getVersion(), in, false, cache);
} else {
- value = readRawValue(bb.getBytes(), bb.getVersion(), in);
+ value = readRawValue(bb.getBytes());
}
}
}
@@ -1228,9 +1208,9 @@ public interface DiskEntry extends RegionEntry {
}
private static void lruEntryFaultIn(EvictableEntry entry, DiskRecoveryStore recoveryStore) {
- RegionMap rm = (RegionMap) recoveryStore.getRegionMap();
+ RegionMap rm = recoveryStore.getRegionMap();
try {
- rm.lruEntryFaultIn((EvictableEntry) entry);
+ rm.lruEntryFaultIn(entry);
} catch (DiskAccessException dae) {
recoveryStore.handleDiskAccessException(dae);
throw dae;
@@ -1273,8 +1253,6 @@ public interface DiskEntry extends RegionEntry {
@Unretained
private static Object setValueOnFaultIn(Object value, DiskId did, DiskEntry entry,
DiskRegionView dr, DiskRecoveryStore region) {
- // dr.getOwner().getCache().getLogger().info("DEBUG: faulting in entry with key " +
- // entry.getKey());
int bytesOnDisk = getValueLength(did);
// Retained by the prepareValueForCache call for the region entry.
// NOTE that we return this value unretained because the retain is owned by the region entry
@@ -1283,10 +1261,7 @@ public interface DiskEntry extends RegionEntry {
Object preparedValue = entry.prepareValueForCache((RegionEntryContext) region, value, false);
region.updateSizeOnFaultIn(entry.getKey(), region.calculateValueSize(preparedValue),
bytesOnDisk);
- // did.setValueSerializedSize(0);
- // I think the following assertion is true but need to run
- // a regression with it. Reenable this post 6.5
- // Assert.assertTrue(entry._getValue() == null);
+
entry.setValueWithContext((RegionEntryContext) region, preparedValue);
if (!Token.isInvalidOrRemoved(value)) {
updateStats(dr, region, 1/* InVM */, -1/* OnDisk */, -bytesOnDisk);
@@ -1300,15 +1275,14 @@ public interface DiskEntry extends RegionEntry {
// deserialize checking for product version change
return EntryEventImpl.deserialize(valueBytes, version, in);
} else {
- // TODO: upgrades: is there a case where GemFire values are internal
+ // TODO: upgrades: is there a case where Geode values are internal
// ones that need to be upgraded transparently; probably messages
// being persisted (gateway events?)
return CachedDeserializableFactory.create(valueBytes, cache);
}
}
- public static Object readRawValue(byte[] valueBytes, KnownVersion version,
- ByteArrayDataInput in) {
+ public static Object readRawValue(byte[] valueBytes) {
return valueBytes;
}
@@ -1352,11 +1326,11 @@ public interface DiskEntry extends RegionEntry {
// Get diskID . If it is null, it implies it is overflow only mode.
DiskId did = entry.getDiskId();
if (did == null) {
- ((EvictableEntry) entry).setDelayedDiskId((DiskRecoveryStore) region);
+ ((EvictableEntry) entry).setDelayedDiskId(region);
did = entry.getDiskId();
}
- int change = 0;
+ final int change;
boolean scheduledAsyncHere = false;
dr.acquireReadLock();
try {
@@ -1438,8 +1412,6 @@ public interface DiskEntry extends RegionEntry {
/**
* Flush an entry that was previously scheduled to be written to disk.
- *
- * @since GemFire prPersistSprint1
*/
public static void doAsyncFlush(DiskEntry entry, InternalRegion region, VersionTag tag) {
writeEntryToDisk(entry, region, tag, false);
@@ -1556,7 +1528,7 @@ public interface DiskEntry extends RegionEntry {
if (did == null) {
syncObj = entry;
}
- AsyncDiskEntry asyncDiskEntry = null;
+ final AsyncDiskEntry asyncDiskEntry;
if (syncObj == did) {
dr.acquireReadLock();
}
@@ -1705,7 +1677,7 @@ public interface DiskEntry extends RegionEntry {
public RecoveredEntry(long keyId, long oplogId, long offsetInOplog, byte userBits,
int valueLength, Object value, boolean valueRecovered) {
- this.recoveredKeyId = keyId;
+ recoveredKeyId = keyId;
this.value = value;
this.oplogId = oplogId;
this.offsetInOplog = offsetInOplog;
@@ -1718,7 +1690,7 @@ public interface DiskEntry extends RegionEntry {
* Returns the disk id of the entry being recovered
*/
public long getRecoveredKeyId() {
- return this.recoveredKeyId;
+ return recoveredKeyId;
}
/**
@@ -1726,7 +1698,7 @@ public interface DiskEntry extends RegionEntry {
* not been faulted in and this method will return null.
*/
public Object getValue() {
- return this.value;
+ return value;
}
/**
@@ -1735,11 +1707,11 @@ public interface DiskEntry extends RegionEntry {
* is false . In other cases the exact value is not needed
*/
public byte getUserBits() {
- return this.userBits;
+ return userBits;
}
public int getValueLength() {
- return this.valueLength;
+ return valueLength;
}
public long getOffsetInOplog() {
@@ -1747,19 +1719,19 @@ public interface DiskEntry extends RegionEntry {
}
public long getOplogId() {
- return this.oplogId;
+ return oplogId;
}
public void setOplogId(long v) {
- this.oplogId = v;
+ oplogId = v;
}
public boolean getValueRecovered() {
- return this.valueRecovered;
+ return valueRecovered;
}
public VersionTag getVersionTag() {
- return this.tag;
+ return tag;
}
public void setVersionTag(VersionTag tag) {
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
index addf464..f1f8ced 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java
@@ -2626,7 +2626,7 @@ public class CqQueryUsingPoolDUnitTest extends JUnit4CacheTestCase {
}
while (true) {
- if (InitialImageOperation.slowImageSleeps > 0) {
+ if (InitialImageOperation.slowImageSleeps.get() > 0) {
// Create events while GII for HARegion is in progress.
LocalRegion region1 = (LocalRegion) getRootRegion().getSubregion(regions[0]);
for (int i = 90; i <= 120; i++) {