You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/11 21:07:02 UTC
[23/52] [abbrv] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java
index 06c4679..9a403da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionHelper.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.IOException;
@@ -47,7 +46,6 @@ import org.apache.geode.cache.Scope;
import org.apache.geode.cache.partition.PartitionNotAvailableException;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.CacheWriterAdapter;
-import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DM;
@@ -63,21 +61,12 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-/**
- */
public class PartitionedRegionHelper {
private static final Logger logger = LogService.getLogger();
- // ///////////// All the final variable //////////////////
/** 1 MB */
static final long BYTES_PER_MB = 1024 * 1024;
- /** Name of allPartitionedRegions Region * */
- // static final String PARTITIONED_REGION_CONFIG_NAME = "__Config";
-
- /** Prefix for the bucket2Node Region name defined in the global space. */
- // static final String BUCKET_2_NODE_TABLE_PREFIX = "_B2N_";
-
/**
* The administrative region used for storing Partitioned Region metadata sub regions
*/
@@ -121,8 +110,6 @@ public class PartitionedRegionHelper {
ALLOWED_DATA_POLICIES = Collections.unmodifiableSet(policies);
}
-
-
/**
* This function is used for cleaning the config meta data for the failed or closed
* PartitionedRegion node.
@@ -132,7 +119,7 @@ public class PartitionedRegionHelper {
* @param cache GemFire cache.
*/
static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
- GemFireCacheImpl cache) {
+ InternalCache cache) {
removeGlobalMetadataForFailedNode(failedNode, regionIdentifier, cache, true);
}
@@ -146,13 +133,11 @@ public class PartitionedRegionHelper {
* @param lock True if this removal should acquire and release the RegionLock
*/
static void removeGlobalMetadataForFailedNode(Node failedNode, String regionIdentifier,
- GemFireCacheImpl cache, final boolean lock) {
+ InternalCache cache, final boolean lock) {
Region root = PartitionedRegionHelper.getPRRoot(cache, false);
if (root == null) {
return; // no partitioned region info to clean up
}
- // Region allPartitionedRegions = PartitionedRegionHelper.getPRConfigRegion(
- // root, cache);
PartitionRegionConfig prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
if (null == prConfig || !prConfig.containsNode(failedNode)) {
return;
@@ -163,9 +148,6 @@ public class PartitionedRegionHelper {
try {
if (lock) {
rl.lock();
- // if (!rl.lock()) {
- // return;
- // }
}
prConfig = (PartitionRegionConfig) root.get(regionIdentifier);
if (prConfig != null && prConfig.containsNode(failedNode)) {
@@ -204,7 +186,7 @@ public class PartitionedRegionHelper {
/**
* Return a region that is the root for all Partitioned Region metadata on this node
*/
- public static LocalRegion getPRRoot(final Cache cache) {
+ public static LocalRegion getPRRoot(final InternalCache cache) {
return getPRRoot(cache, true);
}
@@ -215,9 +197,8 @@ public class PartitionedRegionHelper {
*
* @return a GLOBAL scoped root region used for PartitionedRegion administration
*/
- public static LocalRegion getPRRoot(final Cache cache, boolean createIfAbsent) {
- GemFireCacheImpl gemCache = (GemFireCacheImpl) cache;
- DistributedRegion root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true);
+ public static LocalRegion getPRRoot(final InternalCache cache, boolean createIfAbsent) {
+ DistributedRegion root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
if (root == null) {
if (!createIfAbsent) {
return null;
@@ -287,13 +268,13 @@ public class PartitionedRegionHelper {
};
try {
- root = (DistributedRegion) gemCache.createVMRegion(PR_ROOT_REGION_NAME, ra,
+ root = (DistributedRegion) cache.createVMRegion(PR_ROOT_REGION_NAME, ra,
new InternalRegionArguments().setIsUsedForPartitionedRegionAdmin(true)
.setInternalRegion(true).setCachePerfStatsHolder(prMetaStatsHolder));
root.getDistributionAdvisor().addMembershipListener(new MemberFailureListener());
- } catch (RegionExistsException silly) {
+ } catch (RegionExistsException ignore) {
// we avoid this beforehand, yet we still have to catch it
- root = (DistributedRegion) gemCache.getRegion(PR_ROOT_REGION_NAME, true);
+ root = (DistributedRegion) cache.getRegion(PR_ROOT_REGION_NAME, true);
} catch (IOException ieo) {
Assert.assertTrue(false, "IOException creating Partitioned Region root: " + ieo);
} catch (ClassNotFoundException cne) {
@@ -326,7 +307,7 @@ public class PartitionedRegionHelper {
*/
public static void cleanUpMetaDataOnNodeFailure(DistributedMember failedMemId) {
try {
- final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ final InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null || cache.getCancelCriterion().isCancelInProgress()) {
return;
}
@@ -343,13 +324,13 @@ public class PartitionedRegionHelper {
final ArrayList<String> ks = new ArrayList<String>(rootReg.keySet());
if (ks.size() > 1) {
- Collections.shuffle(ks, PartitionedRegion.rand);
+ Collections.shuffle(ks, PartitionedRegion.RANDOM);
}
for (String prName : ks) {
try {
cleanUpMetaDataForRegion(cache, prName, failedMemId, null);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// okay to ignore this - metadata will be cleaned up by cache close operation
} catch (Exception e) {
if (logger.isDebugEnabled()) {
@@ -357,12 +338,12 @@ public class PartitionedRegionHelper {
}
}
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
// ignore
}
}
- public static void cleanUpMetaDataForRegion(final GemFireCacheImpl cache, final String prName,
+ public static void cleanUpMetaDataForRegion(final InternalCache cache, final String prName,
final DistributedMember failedMemId, final Runnable postCleanupTask) {
boolean runPostCleanUp = true;
try {
@@ -373,7 +354,7 @@ public class PartitionedRegionHelper {
}
try {
prConf = (PartitionRegionConfig) rootReg.get(prName);
- } catch (EntryDestroyedException ede) {
+ } catch (EntryDestroyedException ignore) {
return;
}
if (prConf == null) {
@@ -419,7 +400,7 @@ public class PartitionedRegionHelper {
* This is a function for cleaning the config meta data (both the configuration data and the
* buckets) for a Node that hosted a PartitionedRegion
*/
- private static void cleanPartitionedRegionMetaDataForNode(GemFireCacheImpl cache, Node node,
+ private static void cleanPartitionedRegionMetaDataForNode(InternalCache cache, Node node,
PartitionRegionConfig prConf, String regionIdentifier) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -692,7 +673,6 @@ public class PartitionedRegionHelper {
/**
* Find a ProxyBucketRegion by parsing the region fullPath
*
- * @param cache
* @param fullPath full region path to parse
* @param postInit true if caller should wait for bucket initialization to complete
* @return ProxyBucketRegion as Bucket or null if not found
@@ -781,15 +761,15 @@ public class PartitionedRegionHelper {
public static String escapePRPath(String prFullPath) {
String escaped = prFullPath.replace("_", "__");
- escaped = escaped.replace(LocalRegion.SEPARATOR_CHAR, '_');
+ escaped = escaped.replace(Region.SEPARATOR_CHAR, '_');
return escaped;
}
- public static String TWO_SEPARATORS = LocalRegion.SEPARATOR + LocalRegion.SEPARATOR;
+ public static String TWO_SEPARATORS = Region.SEPARATOR + Region.SEPARATOR;
public static String unescapePRPath(String escapedPath) {
- String path = escapedPath.replace('_', LocalRegion.SEPARATOR_CHAR);
+ String path = escapedPath.replace('_', Region.SEPARATOR_CHAR);
path = path.replace(TWO_SEPARATORS, "_");
return path;
}
@@ -842,33 +822,9 @@ public class PartitionedRegionHelper {
}
/**
- * This method returns true if the member is found in the membership list of this member, else
- * false.
- *
- * @param mem
- * @param cache
- * @return true if mem is found in membership list of this member.
- */
- public static boolean isMemberAlive(DistributedMember mem, GemFireCacheImpl cache) {
- return getMembershipSet(cache).contains(mem);
- }
-
- /**
- * Returns the current membership Set for this member.
- *
- * @param cache
- * @return membership Set.
- */
- public static Set getMembershipSet(GemFireCacheImpl cache) {
- return cache.getInternalDistributedSystem().getDistributionManager()
- .getDistributionManagerIds();
- }
-
- /**
* Utility method to print a warning when the nodeList in the b2n region is found empty. This
* signifies a potential data loss scenario.
*
- * @param partitionedRegion
* @param bucketId Id of Bucket whose nodeList in b2n is empty.
* @param callingMethod methodName of the calling method.
*/
@@ -888,7 +844,7 @@ public class PartitionedRegionHelper {
Set members = partitionedRegion.getDistributionManager().getDistributionManagerIds();
logger.warn(LocalizedMessage.create(
LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____SIZE_OF_NODELIST_AFTER_VERIFYBUCKETNODES_FOR_BUKID___1__IS_0,
- new Object[] {callingMethod, Integer.valueOf(bucketId)}));
+ new Object[] {callingMethod, bucketId}));
logger.warn(LocalizedMessage.create(
LocalizedStrings.PartitionedRegionHelper_DATALOSS___0____NODELIST_FROM_PRCONFIG___1,
new Object[] {callingMethod, printCollection(prConfig.getNodes())}));
@@ -900,12 +856,11 @@ public class PartitionedRegionHelper {
/**
* Utility method to print a collection.
*
- * @param c
* @return String
*/
public static String printCollection(Collection c) {
if (c != null) {
- StringBuffer sb = new StringBuffer("[");
+ StringBuilder sb = new StringBuilder("[");
Iterator itr = c.iterator();
while (itr.hasNext()) {
sb.append(itr.next());
@@ -920,42 +875,6 @@ public class PartitionedRegionHelper {
}
}
- /**
- * Destroys and removes the distributed lock service. This is called from cache closure operation.
- *
- * @see PartitionedRegion#afterRegionsClosedByCacheClose(GemFireCacheImpl)
- */
- static void destroyLockService() {
- DistributedLockService dls = null;
- synchronized (dlockMonitor) {
- dls = DistributedLockService.getServiceNamed(PARTITION_LOCK_SERVICE_NAME);
- }
- if (dls != null) {
- try {
- DistributedLockService.destroy(PARTITION_LOCK_SERVICE_NAME);
- } catch (IllegalArgumentException ex) {
- // Our dlockService is already destroyed,
- // probably by another thread - ignore
- }
- }
- }
-
- public static boolean isBucketPrimary(Bucket buk) {
- return buk.getBucketAdvisor().isPrimary();
- }
-
- public static boolean isRemotePrimaryAvailable(PartitionedRegion region,
- FixedPartitionAttributesImpl fpa) {
- List<FixedPartitionAttributesImpl> fpaList = region.getRegionAdvisor().adviseSameFPAs(fpa);
-
- for (FixedPartitionAttributes remotefpa : fpaList) {
- if (remotefpa.isPrimary()) {
- return true;
- }
- }
- return false;
- }
-
public static FixedPartitionAttributesImpl getFixedPartitionAttributesForBucket(
PartitionedRegion pr, int bucketId) {
List<FixedPartitionAttributesImpl> localFPAs = pr.getFixedPartitionAttributesImpl();
@@ -975,7 +894,7 @@ public class PartitionedRegionHelper {
return fpa;
}
}
- Object[] prms = new Object[] {pr.getName(), Integer.valueOf(bucketId)};
+ Object[] prms = new Object[] {pr.getName(), bucketId};
throw new PartitionNotAvailableException(
LocalizedStrings.PartitionedRegionHelper_FOR_FIXED_PARTITIONED_REGION_0_FIXED_PARTITION_IS_NOT_AVAILABLE_FOR_BUCKET_1_ON_ANY_DATASTORE
.toLocalizedString(prms));
@@ -1028,42 +947,41 @@ public class PartitionedRegionHelper {
List<InternalDistributedMember> remaining) {}
}
-}
-
-class FixedPartitionAttributesListener extends CacheListenerAdapter {
- private static final Logger logger = LogService.getLogger();
+ static class FixedPartitionAttributesListener extends CacheListenerAdapter {
+ private static final Logger logger = LogService.getLogger();
- public void afterCreate(EntryEvent event) {
- PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
- if (!prConfig.getElderFPAs().isEmpty()) {
- updatePartitionMap(prConfig);
+ public void afterCreate(EntryEvent event) {
+ PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
+ if (!prConfig.getElderFPAs().isEmpty()) {
+ updatePartitionMap(prConfig);
+ }
}
- }
- public void afterUpdate(EntryEvent event) {
- PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
- if (!prConfig.getElderFPAs().isEmpty()) {
- updatePartitionMap(prConfig);
+ public void afterUpdate(EntryEvent event) {
+ PartitionRegionConfig prConfig = (PartitionRegionConfig) event.getNewValue();
+ if (!prConfig.getElderFPAs().isEmpty()) {
+ updatePartitionMap(prConfig);
+ }
}
- }
- private void updatePartitionMap(PartitionRegionConfig prConfig) {
- int prId = prConfig.getPRId();
- PartitionedRegion pr = null;
+ private void updatePartitionMap(PartitionRegionConfig prConfig) {
+ int prId = prConfig.getPRId();
+ PartitionedRegion pr = null;
- try {
- pr = PartitionedRegion.getPRFromId(prId);
- if (pr != null) {
- Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
- for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) {
- partitionMap.put(fxPrAttr.getPartitionName(),
- new Integer[] {fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets()});
+ try {
+ pr = PartitionedRegion.getPRFromId(prId);
+ if (pr != null) {
+ Map<String, Integer[]> partitionMap = pr.getPartitionsMap();
+ for (FixedPartitionAttributesImpl fxPrAttr : prConfig.getElderFPAs()) {
+ partitionMap.put(fxPrAttr.getPartitionName(),
+ new Integer[] {fxPrAttr.getStartingBucketID(), fxPrAttr.getNumBuckets()});
+ }
}
+ } catch (PRLocallyDestroyedException e) {
+ logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node",
+ prConfig.getPRId(), e);
}
- } catch (PRLocallyDestroyedException e) {
- logger.debug("PRLocallyDestroyedException : Region ={} is locally destroyed on this node",
- prConfig.getPRId(), e);
}
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java
index ef7cf03..00f50d2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PoolFactoryImpl.java
@@ -14,6 +14,19 @@
*/
package org.apache.geode.internal.cache;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.DataSerializable;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
@@ -28,18 +41,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.pdx.internal.TypeRegistry;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
/**
* Implementation of PoolFactory.
@@ -217,7 +218,7 @@ public class PoolFactoryImpl implements PoolFactory {
InetAddress hostAddr = InetAddress.getByName(host);
InetSocketAddress sockAddr = new InetSocketAddress(hostAddr, port);
l.add(sockAddr);
- } catch (UnknownHostException cause) {
+ } catch (UnknownHostException ignore) {
// IllegalArgumentException ex = new IllegalArgumentException("Unknown host " + host);
// ex.initCause(cause);
// throw ex;
@@ -310,7 +311,7 @@ public class PoolFactoryImpl implements PoolFactory {
* @since GemFire 5.7
*/
public Pool create(String name) throws CacheException {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
TypeRegistry registry = cache.getPdxRegistry();
if (registry != null && !attributes.isGateway()) {
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
index 06378f2..9d4b5e2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyBucketRegion.java
@@ -206,7 +206,7 @@ public final class ProxyBucketRegion implements Bucket {
+ getPartitionedRegion().getBucketName(this.bid);
}
- public GemFireCacheImpl getCache() {
+ public InternalCache getCache() {
return this.partitionedRegion.getCache();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
index c4bf506..efa580b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ProxyRegionMap.java
@@ -50,10 +50,8 @@ import org.apache.geode.internal.offheap.annotations.Released;
* always empty.
*
* @since GemFire 5.0
- *
- *
*/
-final class ProxyRegionMap implements RegionMap {
+class ProxyRegionMap implements RegionMap {
protected ProxyRegionMap(LocalRegion owner, Attributes attr,
InternalRegionArguments internalRegionArgs) {
@@ -280,8 +278,7 @@ final class ProxyRegionMap implements RegionMap {
List<EntryEventImpl> pendingCallbacks, FilterRoutingInfo filterRoutingInfo,
ClientProxyMembershipID bridgeContext, TXEntryState txEntryState, VersionTag versionTag,
long tailKey) {
- this.owner.txApplyInvalidatePart2(markerEntry, key, didDestroy, true,
- false /* Clear conflic occurred */);
+ this.owner.txApplyInvalidatePart2(markerEntry, key, didDestroy, true);
if (this.owner.isInitialized()) {
if (txEvent != null) {
txEvent.addInvalidate(this.owner, markerEntry, key, newValue, aCallbackArgument);
@@ -318,7 +315,7 @@ final class ProxyRegionMap implements RegionMap {
long tailKey) {
Operation putOp = p_putOp.getCorrespondingCreateOp();
long lastMod = owner.cacheTimeMillis();
- this.owner.txApplyPutPart2(markerEntry, key, newValue, lastMod, true, didDestroy,
+ this.owner.txApplyPutPart2(markerEntry, key, lastMod, true, didDestroy,
false /* Clear conflict occurred */);
if (this.owner.isInitialized()) {
if (txEvent != null) {
@@ -582,12 +579,6 @@ final class ProxyRegionMap implements RegionMap {
.toLocalizedString(DataPolicy.EMPTY));
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.RegionEntry#getSerializedValueOnDisk(org.apache.geode.
- * internal.cache.LocalRegion)
- */
public Object getSerializedValueOnDisk(LocalRegion localRegion) {
throw new UnsupportedOperationException(
LocalizedStrings.ProxyRegionMap_NO_ENTRY_SUPPORT_ON_REGIONS_WITH_DATAPOLICY_0
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
index a467726..96d871d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEvictorTask.java
@@ -14,25 +14,24 @@
*/
package org.apache.geode.internal.cache;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.lru.HeapEvictor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.Callable;
/**
- *
* Takes the delta to be evicted and tries to evict the fewest LRU entries that would make
* evictedBytes greater than or equal to the delta
*
* @since GemFire 6.0
- *
*/
public class RegionEvictorTask implements Runnable {
@@ -77,7 +76,7 @@ public class RegionEvictorTask implements Runnable {
}
}
- private GemFireCacheImpl getGemFireCache() {
+ private InternalCache getInternalCache() {
return getHeapEvictor().getGemFireCache();
}
@@ -87,12 +86,12 @@ public class RegionEvictorTask implements Runnable {
@Override
public void run() {
- getGemFireCache().getCachePerfStats().incEvictorJobsStarted();
+ getInternalCache().getCachePerfStats().incEvictorJobsStarted();
long bytesEvicted = 0;
long totalBytesEvicted = 0;
try {
while (true) {
- getGemFireCache().getCachePerfStats();
+ getInternalCache().getCachePerfStats();
final long start = CachePerfStats.getStatTime();
synchronized (this.regionSet) {
if (this.regionSet.isEmpty()) {
@@ -121,15 +120,15 @@ public class RegionEvictorTask implements Runnable {
logger.warn(LocalizedMessage.create(LocalizedStrings.Eviction_EVICTOR_TASK_EXCEPTION,
new Object[] {e.getMessage()}), e);
} finally {
- getGemFireCache().getCachePerfStats();
+ getInternalCache().getCachePerfStats();
long end = CachePerfStats.getStatTime();
- getGemFireCache().getCachePerfStats().incEvictWorkTime(end - start);
+ getInternalCache().getCachePerfStats().incEvictWorkTime(end - start);
}
}
}
}
} finally {
- getGemFireCache().getCachePerfStats().incEvictorJobsCompleted();
+ getInternalCache().getCachePerfStats().incEvictorJobsCompleted();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java
index b6989f9..813f3c6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionFactoryImpl.java
@@ -14,33 +14,29 @@
*/
package org.apache.geode.internal.cache;
-import java.io.File;
-import java.util.Properties;
-
-import org.apache.geode.CancelException;
-import org.apache.geode.cache.*;
-import org.apache.geode.cache.client.ClientNotReadyException;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
/**
- * <code>RegionFactoryImpl</code> extends RegionFactory adding {@link RegionShortcut} support.
+ * {@code RegionFactoryImpl} extends RegionFactory adding {@link RegionShortcut} support.
*
* @since GemFire 6.5
*/
-
public class RegionFactoryImpl<K, V> extends RegionFactory<K, V> {
- public RegionFactoryImpl(GemFireCacheImpl cache) {
+ public RegionFactoryImpl(InternalCache cache) {
super(cache);
}
- public RegionFactoryImpl(GemFireCacheImpl cache, RegionShortcut pra) {
+ public RegionFactoryImpl(InternalCache cache, RegionShortcut pra) {
super(cache, pra);
}
- public RegionFactoryImpl(GemFireCacheImpl cache, RegionAttributes ra) {
+ public RegionFactoryImpl(InternalCache cache, RegionAttributes ra) {
super(cache, ra);
}
- public RegionFactoryImpl(GemFireCacheImpl cache, String regionAttributesId) {
+ public RegionFactoryImpl(InternalCache cache, String regionAttributesId) {
super(cache, regionAttributesId);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java
index 2c3fc95..765f707 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoteOperationMessage.java
@@ -43,8 +43,6 @@ import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
-import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.partitioned.PutMessage;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -77,6 +75,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
* The unique transaction Id on the sending member, used to construct a TXId on the receiving side
*/
private int txUniqId = TXManagerImpl.NOTX;
+
private InternalDistributedMember txMemberId = null;
protected transient short flags;
@@ -84,8 +83,9 @@ public abstract class RemoteOperationMessage extends DistributionMessage
/* TODO [DISTTX] Convert into flag */
protected boolean isTransactionDistributed = false;
- public RemoteOperationMessage() {}
-
+ public RemoteOperationMessage() {
+ // do nothing
+ }
public RemoteOperationMessage(InternalDistributedMember recipient, String regionPath,
ReplyProcessor21 processor) {
@@ -93,7 +93,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
setRecipient(recipient);
this.regionPath = regionPath;
this.processorId = processor == null ? 0 : processor.getProcessorId();
- if (processor != null && this.isSevereAlertCompatible()) {
+ if (processor != null && isSevereAlertCompatible()) {
processor.enableSevereAlertProcessing();
}
this.txUniqId = TXManagerImpl.getCurrentTXUniqueId();
@@ -108,7 +108,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
setRecipients(recipients);
this.regionPath = regionPath;
this.processorId = processor == null ? 0 : processor.getProcessorId();
- if (processor != null && this.isSevereAlertCompatible()) {
+ if (processor != null && isSevereAlertCompatible()) {
processor.enableSevereAlertProcessing();
}
this.txUniqId = TXManagerImpl.getCurrentTXUniqueId();
@@ -121,8 +121,6 @@ public abstract class RemoteOperationMessage extends DistributionMessage
/**
* Copy constructor that initializes the fields declared in this class
- *
- * @param other
*/
public RemoteOperationMessage(RemoteOperationMessage other) {
this.regionPath = other.regionPath;
@@ -152,7 +150,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
/**
* @return the full path of the region
*/
- public final String getRegionPath() {
+ public String getRegionPath() {
return regionPath;
}
@@ -161,30 +159,15 @@ public abstract class RemoteOperationMessage extends DistributionMessage
* is required.
*/
@Override
- public final int getProcessorId() {
+ public int getProcessorId() {
return this.processorId;
}
/**
- * @param processorId1 the {@link org.apache.geode.distributed.internal.ReplyProcessor21} id
- * associated with the message, null if no acknowlegement is required.
- */
- public final void registerProcessor(int processorId1) {
- this.processorId = processorId1;
- }
-
- public void setCacheOpRecipients(Collection cacheOpRecipients) {
- // TODO need to implement this for other remote ops
- assert this instanceof RemotePutMessage;
- }
-
-
- /**
* check to see if the cache is closing
*/
public boolean checkCacheClosing(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
- // return (cache != null && cache.isClosed());
+ InternalCache cache = GemFireCacheImpl.getInstance();
return cache == null || cache.isClosed();
}
@@ -218,14 +201,14 @@ public abstract class RemoteOperationMessage extends DistributionMessage
.toLocalizedString(dm.getId()));
return;
}
- GemFireCacheImpl gfc = getCache(dm);
- r = getRegionByPath(gfc);
+ InternalCache cache = getCache(dm);
+ r = getRegionByPath(cache);
if (r == null && failIfRegionMissing()) {
// if the distributed system is disconnecting, don't send a reply saying
// the partitioned region can't be found (bug 36585)
thr = new RegionDestroyedException(
- LocalizedStrings.RemoteOperationMessage_0_COULD_NOT_FIND_REGION_1.toLocalizedString(
- new Object[] {dm.getDistributionManagerId(), regionPath}),
+ LocalizedStrings.RemoteOperationMessage_0_COULD_NOT_FIND_REGION_1
+ .toLocalizedString(dm.getDistributionManagerId(), regionPath),
regionPath);
return; // reply sent in finally block below
}
@@ -233,7 +216,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
thr = UNHANDLED_EXCEPTION;
// [bruce] r might be null here, so we have to go to the cache instance to get the txmgr
- TXManagerImpl txMgr = getTXManager(gfc);
+ TXManagerImpl txMgr = getTXManager(cache);
TXStateProxy tx = txMgr.masqueradeAs(this);
if (tx == null) {
sendReply = operateOnRegion(dm, r, startTime);
@@ -315,16 +298,16 @@ public abstract class RemoteOperationMessage extends DistributionMessage
}
}
- TXManagerImpl getTXManager(GemFireCacheImpl cache) {
+ TXManagerImpl getTXManager(InternalCache cache) {
return cache.getTxManager();
}
- LocalRegion getRegionByPath(GemFireCacheImpl gfc) {
- return gfc.getRegionByPathForProcessing(this.regionPath);
+ LocalRegion getRegionByPath(InternalCache internalCache) {
+ return internalCache.getRegionByPathForProcessing(this.regionPath);
}
- GemFireCacheImpl getCache(final DistributionManager dm) {
- return (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache getCache(final DistributionManager dm) {
+ return (InternalCache) CacheFactory.getInstance(dm.getSystem());
}
/**
@@ -441,7 +424,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
}
}
- protected final InternalDistributedMember getTXMemberId() {
+ protected InternalDistributedMember getTXMemberId() {
return txMemberId;
}
@@ -502,12 +485,11 @@ public abstract class RemoteOperationMessage extends DistributionMessage
/**
* @return the txUniqId
*/
- public final int getTXUniqId() {
+ public int getTXUniqId() {
return txUniqId;
}
-
- public final InternalDistributedMember getMemberToMasqueradeAs() {
+ public InternalDistributedMember getMemberToMasqueradeAs() {
if (txMemberId == null) {
return getSender();
}
@@ -583,15 +565,15 @@ public abstract class RemoteOperationMessage extends DistributionMessage
if (removeMember(id, true)) {
this.prce = new ForceReattemptException(
LocalizedStrings.PartitionMessage_PARTITIONRESPONSE_GOT_MEMBERDEPARTED_EVENT_FOR_0_CRASHED_1
- .toLocalizedString(new Object[] {id, Boolean.valueOf(crashed)}));
+ .toLocalizedString(id, crashed));
}
checkIfDone();
} else {
Exception e = new Exception(
LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID.toLocalizedString());
logger.info(LocalizedMessage.create(
- LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID_CRASHED_0,
- Boolean.valueOf(crashed)), e);
+ LocalizedStrings.PartitionMessage_MEMBERDEPARTED_GOT_NULL_MEMBERID_CRASHED_0, crashed),
+ e);
}
}
@@ -599,9 +581,8 @@ public abstract class RemoteOperationMessage extends DistributionMessage
* Waits for the response from the {@link RemoteOperationMessage}'s recipient
*
* @throws CacheException if the recipient threw a cache exception during message processing
- * @throws PrimaryBucketException
*/
- final public void waitForCacheException()
+ public void waitForCacheException()
throws CacheException, RemoteOperationException, PrimaryBucketException {
try {
waitForRepliesUninterruptibly();
@@ -630,8 +611,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
throw new PrimaryBucketException(
LocalizedStrings.PartitionMessage_PEER_FAILED_PRIMARY_TEST.toLocalizedString(), t);
} else if (t instanceof RegionDestroyedException) {
- RegionDestroyedException rde = (RegionDestroyedException) t;
- throw rde;
+ throw (RegionDestroyedException) t;
} else if (t instanceof CancelException) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -677,7 +657,7 @@ public abstract class RemoteOperationMessage extends DistributionMessage
* For Distributed Tx
*/
private void setIfTransactionDistributed() {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
if (cache.getTxManager() != null) {
this.isTransactionDistributed = cache.getTxManager().isDistributed();
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java
index 889c019..acf77ba 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RemotePutMessage.java
@@ -52,19 +52,13 @@ import org.apache.geode.internal.cache.versions.DiskVersionTag;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
-import org.apache.geode.internal.util.BlobHelper;
-import org.apache.geode.internal.util.Breadcrumbs;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_OLD_VALUE;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
-import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_BYTES;
-import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_SERIALIZED_OBJECT;
-import static org.apache.geode.internal.cache.DistributedCacheOperation.VALUE_IS_OBJECT;
/**
* A Replicate Region update message. Meant to be sent only to the peer who hosts transactional
@@ -479,11 +473,6 @@ public final class RemotePutMessage extends RemoteOperationMessageWithDirectRepl
this.op = operation;
}
- @Override
- public void setCacheOpRecipients(Collection cacheOpRecipients) {
- this.cacheOpRecipients = cacheOpRecipients;
- }
-
/**
* sets the instance variable hasOldValue to the giving boolean value.
*/
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
index 65cda5d..34f6b73 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/SearchLoadAndWriteProcessor.java
@@ -12,16 +12,56 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
-/* enumerate each imported class because conflict with dl.u.c.TimeoutException */
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.NotSerializableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.logging.log4j.Logger;
-import org.apache.geode.*;
-import org.apache.geode.cache.*;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.CancelException;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.GemFireException;
+import org.apache.geode.InternalGemFireException;
+import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.CacheLoader;
+import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import org.apache.geode.distributed.internal.MembershipListener;
+import org.apache.geode.distributed.internal.PooledDistributionMessage;
+import org.apache.geode.distributed.internal.ProcessorKeeper21;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
@@ -31,19 +71,9 @@ import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.NotSerializableException;
-import java.util.*;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-
/**
* Implementation for distributed search, load and write operations in the GemFire system. Provides
@@ -54,18 +84,15 @@ import java.util.concurrent.locks.Lock;
* times.netLoad happens as a one phase operation in all cases except where the scope is GLOBAL At
* the receiving end, the request is converted into an appropriate message whose process method
* responds to the request.
- *
*/
-
public class SearchLoadAndWriteProcessor implements MembershipListener {
private static final Logger logger = LogService.getLogger();
public static final int SMALL_BLOB_SIZE =
- Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000).intValue();
+ Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000);
static final long RETRY_TIME =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000).longValue();
-
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000);
private volatile InternalDistributedMember selectedNode;
private boolean selectedNodeDead = false;
@@ -200,7 +227,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
} finally {
if (event != listenerEvent) {
if (listenerEvent instanceof EntryEventImpl) {
- ((EntryEventImpl) listenerEvent).release();
+ ((Releasable) listenerEvent).release();
}
}
}
@@ -334,7 +361,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
if (this.advisor != null) {
this.advisor.removeMembershipListener(this);
}
- } catch (IllegalArgumentException e) {
+ } catch (IllegalArgumentException ignore) {
} finally {
getProcessorKeeper().remove(this.processorId);
}
@@ -343,13 +370,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
void remove() {
getProcessorKeeper().remove(this.processorId);
-
}
-
-
void initialize(LocalRegion theRegion, Object theKey, Object theCallbackArg) {
-
this.region = theRegion;
this.regionName = theRegion.getFullPath();
this.key = theKey;
@@ -358,10 +381,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
Scope scope = attrs.getScope();
if (scope.isDistributed()) {
this.advisor = ((CacheDistributionAdvisee) this.region).getCacheDistributionAdvisor();
- this.distributionManager = ((CacheDistributionAdvisee) theRegion).getDistributionManager();
+ this.distributionManager = theRegion.getDistributionManager();
this.timeout = getSearchTimeout();
this.advisor.addMembershipListener(this);
-
}
}
@@ -369,7 +391,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
this.key = key;
}
- /************** Protected Methods ********************/
protected void setSelectedNode(InternalDistributedMember selectedNode) {
this.selectedNode = selectedNode;
this.selectedNodeDead = false;
@@ -383,18 +404,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
return this.key;
}
- /************** Package Methods **********************/
-
InternalDistributedMember getSelectedNode() {
return this.selectedNode;
}
- /************** Private Methods **********************/
/**
* Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region,
* most of the services it provides are relevant to distribution only. The 3 services it provides
* are netSearch, netLoad, netWrite
- *
*/
private SearchLoadAndWriteProcessor() {
resetResults();
@@ -410,7 +427,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
this.responseQueue = null;
}
-
/**
* If we have a local cache loader and the region is not global, then invoke the loader If the
* region is local, or the result is non-null, then return whatever the loader returned do a
@@ -614,7 +630,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
.toLocalizedString(key));
}
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
region.getCancelCriterion().checkCancelInProgress(null);
// continue;
@@ -871,7 +887,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
} finally {
if (event != pevent) {
if (event instanceof EntryEventImpl) {
- ((EntryEventImpl) event).release();
+ ((Releasable) event).release();
}
}
}
@@ -1005,7 +1021,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
this.remoteGetInProgress = true;
setSelectedNode(sender);
return; // sendValueRequest does the rest of the work
- } catch (RejectedExecutionException ex) {
+ } catch (RejectedExecutionException ignore) {
// just fall through since we must be shutting down.
}
}
@@ -1195,7 +1211,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
if (waitTimeMs <= 0) {
throw new TimeoutException(
LocalizedStrings.SearchLoadAndWriteProcessor_TIMED_OUT_WHILE_DOING_NETSEARCHNETLOADNETWRITE_PROCESSORID_0_KEY_IS_1
- .toLocalizedString(new Object[] {Integer.valueOf(this.processorId), this.key}));
+ .toLocalizedString(new Object[] {this.processorId, this.key}));
}
boolean interrupted = Thread.interrupted();
@@ -1229,14 +1245,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
sb.append(" msRemaining=").append(waitTimeMs);
}
if (lastNS != 0) {
- sb.append(" lastNotifySpot=" + lastNS);
+ sb.append(" lastNotifySpot=").append(lastNS);
}
throw new TimeoutException(
LocalizedStrings.SearchLoadAndWriteProcessor_TIMEOUT_DURING_NETSEARCHNETLOADNETWRITE_DETAILS_0
.toLocalizedString(sb));
}
return;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
region.getCancelCriterion().checkCancelInProgress(null);
// keep waiting until we are done
@@ -1305,14 +1321,14 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
DiskRegion dr = rgn.getDiskRegion();
if (dr != null) {
dr.setClearCountReference();
- } ;
+ }
}
protected static void removeClearCountReference(LocalRegion rgn) {
DiskRegion dr = rgn.getDiskRegion();
if (dr != null) {
dr.removeClearCountReference();
- } ;
+ }
}
/**
@@ -1326,12 +1342,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
nMsg.doGet((DistributionManager) this.distributionManager);
}
- /*****************************************************************************
- * INNER CLASSES
- *****************************************************************************/
-
-
-
/**
* A QueryMessage is broadcast to every node that has the region defined, to find out who has a
* valid copy of the requested object.
@@ -1368,7 +1378,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
private static final short HAS_IDLE_TIME = (HAS_TTL << 1);
private static final short ALWAYS_SEND_RESULT = (HAS_IDLE_TIME << 1);
- public QueryMessage() {};
+ public QueryMessage() {
+ // do nothing
+ }
/**
* Using a new or pooled message instance, create and send the query to all nodes.
@@ -1492,8 +1504,8 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
try {
// check to see if we would have to wait on initialization latch (if global)
// if so abort and reply with null
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
- if (gfc.isGlobalRegionInitializing(this.regionName)) {
+ InternalCache cache = (InternalCache) CacheFactory.getInstance(dm.getSystem());
+ if (cache.isGlobalRegionInitializing(this.regionName)) {
replyWithNull(dm);
if (logger.isDebugEnabled()) {
logger.debug("Global Region not initialized yet");
@@ -1512,31 +1524,28 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
if (entry != null) {
synchronized (entry) {
assert region.isInitialized();
- {
- if (dm.cacheTimeMillis() - startTime < timeoutMs) {
- o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes,
- // decrc
- if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o)
- && !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) {
- isPresent = true;
- VersionStamp stamp = entry.getVersionStamp();
- if (stamp != null && stamp.hasValidVersion()) {
- tag = stamp.asVersionTag();
- }
- long lastModified = entry.getLastModified();
- lastModifiedCacheTime = lastModified;
- isSer = o instanceof CachedDeserializable;
- if (isSer) {
- o = ((CachedDeserializable) o).getSerializedValue();
- }
- if (isPresent && (this.alwaysSendResult
- || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) {
- sendResult = true;
- }
+ if (dm.cacheTimeMillis() - startTime < timeoutMs) {
+ o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes,
+ // decrc
+ if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o)
+ && !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) {
+ isPresent = true;
+ VersionStamp stamp = entry.getVersionStamp();
+ if (stamp != null && stamp.hasValidVersion()) {
+ tag = stamp.asVersionTag();
+ }
+ lastModifiedCacheTime = entry.getLastModified();
+ isSer = o instanceof CachedDeserializable;
+ if (isSer) {
+ o = ((CachedDeserializable) o).getSerializedValue();
+ }
+ if (isPresent && (this.alwaysSendResult
+ || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) {
+ sendResult = true;
}
- } else {
- requestorTimedOut = true;
}
+ } else {
+ requestorTimedOut = true;
}
}
} else if (logger.isDebugEnabled()) {
@@ -1549,10 +1558,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
ResponseMessage.sendMessage(this.key, this.getSender(), processorId,
(sendResult ? o : null), lastModifiedCacheTime, isPresent, isSer, requestorTimedOut, dm,
tag);
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
logger.debug("Region Destroyed Exception in QueryMessage doGet, null");
replyWithNull(dm);
- } catch (CancelException cce) {
+ } catch (CancelException ignore) {
logger.debug("CacheClosedException in QueryMessage doGet, null");
replyWithNull(dm);
} catch (VirtualMachineError err) {
@@ -1577,14 +1586,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
private void replyWithNull(DistributionManager dm) {
ResponseMessage.sendMessage(this.key, this.getSender(), processorId, null, 0, false, false,
false, dm, null);
-
}
-
}
- /********************* ResponseMessage ***************************************/
-
-
/**
* The ResponseMessage is a reply to a QueryMessage, and contains the object's value, if it is
* below the byte limit, otherwise an indication of whether the sender has the value.
@@ -1605,7 +1609,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/** is the value present */
private boolean isPresent;
-
/** Is blob serialized? */
private boolean isSerialized;
@@ -1865,11 +1868,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
// bytes, decrc
if (eov != null) {
if (eov == Token.INVALID || eov == Token.LOCAL_INVALID) {
- // ebv = null; (redundant assignment)
+ // nothing?
} else if (dm.cacheTimeMillis() - startTime < timeoutMs) {
if (!region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) {
- long lastModified = entry.getLastModified();
- lastModifiedCacheTime = lastModified;
+ lastModifiedCacheTime = entry.getLastModified();
if (eov instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) eov;
if (!cd.isSerialized()) {
@@ -1911,10 +1913,10 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId,
this.key, ebv, ebvObj, ebvLen, lastModifiedCacheTime, isSer, requestorTimedOut,
authoritative, dm, versionTag);
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
replyWithNull(dm);
- } catch (CancelException cce) {
+ } catch (CancelException ignore) {
replyWithNull(dm);
} catch (VirtualMachineError err) {
@@ -1940,13 +1942,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
private void replyWithNull(DistributionManager dm) {
NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId,
this.key, null, null, 0, 0, false, false, false, dm, null);
-
}
-
}
- /********************* NetSearchReplyMessage ***************************************/
-
/**
* The NetSearchReplyMessage is a reply to a NetSearchRequestMessage, and contains the object's
* value.
@@ -1961,8 +1959,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/** The gemfire id of the SearchLoadAndWrite object waiting for response */
private int processorId;
-
-
/** The object value being transferred */
private byte[] value;
@@ -2150,7 +2146,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
try {
processor.distributionManager.putOutgoingUserData(msg);
- } catch (NotSerializableException e) {
+ } catch (NotSerializableException ignore) {
throw new IllegalArgumentException(
LocalizedStrings.SearchLoadAndWriteProcessor_MESSAGE_NOT_SERIALIZABLE
.toLocalizedString());
@@ -2210,13 +2206,11 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
+ "\" in region \"" + this.regionName + "\", processorId " + processorId;
}
-
-
private void doLoad(DistributionManager dm) {
long startTime = dm.cacheTimeMillis();
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
try {
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName);
if (region != null && region.isInitialized()
&& (dm.cacheTimeMillis() - startTime < timeoutMs)) {
@@ -2282,16 +2276,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
void replyWithException(Exception e, DistributionManager dm) {
NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId, null, dm,
this.aCallbackArgument, e, false, false);
-
}
-
-
}
-
-
- /********************* NetLoadReplyMessage ***************************************/
-
/**
* The NetLoadReplyMessage is a reply to a RequestMessage, and contains the object's value.
*/
@@ -2303,7 +2290,6 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
/** The object value being transferred */
private Object result;
-
/** Loader parameter returned to sender */
private Object aCallbackArgument;
@@ -2481,7 +2467,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
long startTime = dm.cacheTimeMillis();
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
try {
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName);
if (region != null && region.isInitialized()
&& (dm.cacheTimeMillis() - startTime < timeoutMs)) {
@@ -2560,7 +2546,7 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
true);
}
- } catch (RegionDestroyedException rde) {
+ } catch (RegionDestroyedException ignore) {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm,
false, null, false);
@@ -2594,16 +2580,9 @@ public class SearchLoadAndWriteProcessor implements MembershipListener {
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
-
-
-
}
-
-
}
- /********************* NetWriteReplyMessage *********************************/
-
/**
* The NetWriteReplyMessage is a reply to a NetWriteRequestMessage, and contains the success code
* or exception that is propagated back to the requestor
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java
index aa37880..7f28d5a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ServerPingMessage.java
@@ -56,7 +56,7 @@ public class ServerPingMessage extends PooledDistributionMessage {
*
* @return true if all the recipients are pingable
*/
- public static boolean send(GemFireCacheImpl cache, Set<InternalDistributedMember> recipients) {
+ public static boolean send(InternalCache cache, Set<InternalDistributedMember> recipients) {
InternalDistributedSystem ids = cache.getInternalDistributedSystem();
DM dm = ids.getDistributionManager();
http://git-wip-us.apache.org/repos/asf/geode/blob/21f405b8/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
index eb93b76..c745754 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/StateFlushOperation.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -100,18 +99,16 @@ public class StateFlushOperation {
private DM dm;
-
/** flush current ops to the given members for the given region */
public static void flushTo(Set<InternalDistributedMember> targets, DistributedRegion region) {
DM dm = region.getDistributionManager();
- DistributedRegion r = region;
- boolean initialized = r.isInitialized();
+ boolean initialized = region.isInitialized();
if (initialized) {
- r.getDistributionAdvisor().forceNewMembershipVersion(); // force a new "view" so we can track
- // current ops
+ // force a new "view" so we can track current ops
+ region.getDistributionAdvisor().forceNewMembershipVersion();
try {
- r.getDistributionAdvisor().waitForCurrentOperations();
- } catch (RegionDestroyedException e) {
+ region.getDistributionAdvisor().waitForCurrentOperations();
+ } catch (RegionDestroyedException ignore) {
return;
}
}
@@ -137,14 +134,14 @@ public class StateFlushOperation {
processors.add(processor);
}
- if (r.getRegionMap().getARMLockTestHook() != null) {
- r.getRegionMap().getARMLockTestHook().beforeStateFlushWait();
+ if (region.getRegionMap().getARMLockTestHook() != null) {
+ region.getRegionMap().getARMLockTestHook().beforeStateFlushWait();
}
for (ReplyProcessor21 processor : processors) {
try {
processor.waitForReplies();
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
return;
}
@@ -319,7 +316,7 @@ public class StateFlushOperation {
// 36175)
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
try {
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
Region r = gfc.getRegionByPathForProcessing(this.regionPath);
if (r instanceof DistributedRegion) {
region = (DistributedRegion) r;
@@ -336,9 +333,9 @@ public class StateFlushOperation {
// 36175)
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
try {
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache cache = (InternalCache) CacheFactory.getInstance(dm.getSystem());
Set<DistributedRegion> result = new HashSet();
- for (LocalRegion r : gfc.getAllRegions()) {
+ for (LocalRegion r : cache.getAllRegions()) {
// it's important not to check if the cache is closing, so access
// the isDestroyed boolean directly
if (r instanceof DistributedRegion && !r.isDestroyed) {
@@ -400,7 +397,7 @@ public class StateFlushOperation {
}
try {
r.getDistributionAdvisor().waitForCurrentOperations();
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
// continue with the next region
}
}
@@ -422,7 +419,7 @@ public class StateFlushOperation {
}
}
}
- } catch (CancelException cce) {
+ } catch (CancelException ignore) {
// cache is closed - no distribution advisor available for the region so nothing to do but
// send the stabilization message
} catch (Exception e) {
@@ -530,7 +527,7 @@ public class StateFlushOperation {
return "unknown channelState content";
} else {
Map csmap = (Map) state;
- StringBuffer result = new StringBuffer(200);
+ StringBuilder result = new StringBuilder(200);
for (Iterator it = csmap.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
result.append(entry.getKey()).append('=').append(entry.getValue());
@@ -565,7 +562,7 @@ public class StateFlushOperation {
try {
dm.getMembershipManager().waitForMessageState(getSender(), channelState);
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -697,7 +694,7 @@ public class StateFlushOperation {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("StateStabilizedMessage ");
sb.append(this.processorId);
if (super.getSender() != null) {