You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/09 21:09:19 UTC
[31/50] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/d888c75e/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 172fabe..aa40508 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
- *
+ *
* [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
* all
*/
@@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -566,7 +566,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* Handle response of precommit reply
- *
+ *
* Go over list of region versions for this target and fill map
*/
private void populateEntryEventMap(DistributedMember target,
@@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -756,7 +756,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postPutAll(putallOp, successfulPuts, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
@@ -835,7 +835,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postRemoveAll(op, successfulOps, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
"DistTXStateProxyImplOnCoordinator.postRemoveAll "
http://git-wip-us.apache.org/repos/asf/geode/blob/d888c75e/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.util.DelayedAction;
-/**
- *
- */
public abstract class DistributedCacheOperation {
private static final Logger logger = LogService.getLogger();
public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
public static Random LOSS_SIMULATION_GENERATOR;
public static long SLOW_DISTRIBUTION_MS = 0; // test hook
// constants used in subclasses and distribution messages
// should use enum in source level 1.5+
+
/**
* Deserialization policy: do not deserialize (for byte array, null or cases where the value
* should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
}
- public final static byte DESERIALIZATION_POLICY_NUMBITS =
+ public static final byte DESERIALIZATION_POLICY_NUMBITS =
DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
public static final short DESERIALIZATION_POLICY_END =
(short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
try {
_distribute();
} catch (InvalidVersionException e) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
}
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
DistributedRegion region = getRegion();
if (viewVersion != -1) {
region.getDistributionAdvisor().endOperation(viewVersion);
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
viewVersion);
}
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
if (SLOW_DISTRIBUTION_MS > 0) { // test hook
try {
Thread.sleep(SLOW_DISTRIBUTION_MS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
}
// some members requiring old value are also in the cache op recipients set
- Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+ Set needsOldValueInCacheOp = Collections.emptySet();
// set client routing information into the event
boolean routingComputed = false;
FilterRoutingInfo filterRouting = null;
// recipients that will get a cacheop msg and also a PR message
- Set twoMessages = Collections.EMPTY_SET;
+ Set twoMessages = Collections.emptySet();
if (region.isUsedForPartitionedRegionBucket()) {
- twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+ twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
routingComputed = true;
filterRouting = getRecipientFilterRouting(recipients);
if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
// some members need PR notification of the change for client/wan
// notification
- Set adjunctRecipients = Collections.EMPTY_SET;
+ Set adjunctRecipients = Collections.emptySet();
// Partitioned region listener notification messages piggyback on this
// operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
recipients.removeAll(needsOldValueInCacheOp);
}
- Set cachelessNodes = Collections.EMPTY_SET;
- Set adviseCacheServers = Collections.EMPTY_SET;
- Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
- new HashSet<InternalDistributedMember>();
+ Set cachelessNodes = Collections.emptySet();
+ Set adviseCacheServers;
+ Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
if (!cachelessNodes.isEmpty()) {
List list = new ArrayList(cachelessNodes);
for (Object member : cachelessNodes) {
- if (!recipients.contains(member)) {
+ if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
// Don't include those originally excluded.
list.remove(member);
- } else if (adjunctRecipients.contains(member)) {
- list.remove(member);
}
}
cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
if (!reliableOp || region.isNoDistributionOk()) {
// nothing needs be done in this case
} else {
- region.handleReliableDistribution(Collections.EMPTY_SET);
+ region.handleReliableDistribution(Collections.emptySet());
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
} else {
boolean directAck = false;
boolean useMulticast = region.getMulticastEnabled()
- && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+ && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
boolean shouldAck = shouldAck();
if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
recipients);
}
waitForMembers.removeAll(recipients);
- recipients = Collections.EMPTY_SET;
+ recipients = Collections.emptySet();
}
}
if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
}
adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
- adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+ adviseCacheServers = ((Bucket) region).getPartitionedRegion()
.getCacheDistributionAdvisor().adviseCacheServers();
adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
}
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
}
}
-
/**
* Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
* key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
continue;
}
- CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+ CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
.getCacheDistributionAdvisor().getProfile(m);
if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
continue;
}
-
for (Object value : cf.filterProfile.getCqMap().values()) {
ServerCQ cq = (ServerCQ) value;
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
Long cqID = e.getKey();
// For the CQs satisfying the event with destroy CQEvent, remove
// the entry form CQ cache.
- if (cq.getFilterID() == cqID
- && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
- cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+ if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+ cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
}
}
}
}
}
-
/**
* Get the adjunct receivers for a partitioned region operation
*
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
/**
* perform any operation-specific initialization on the given reply processor
- *
- * @param p
- * @param msg
*/
protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
// nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
}
}
- /**
- * @param closedMembers
- */
private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
return null;
}
CacheDistributionAdvisor advisor;
- // if (region.isUsedForPartitionedRegionBucket()) {
- advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
- // } else {
- // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
- // }
+ advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
}
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
-
private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
return this.op;
}
-
/** sets the concurrency versioning tag for this message */
public void setVersionTag(VersionTag tag) {
this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
/**
* process a reply
*
- * @param reply
- * @param processor
* @return true if the reply-processor should continue to process this response
*/
boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
* @param event the entry event that contains the old value
*/
public void appendOldValueToMessage(EntryEventImpl event) {
- {
- @Unretained
- Object val = event.getRawOldValue();
- if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
- || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
- return;
- }
+ @Unretained
+ Object val = event.getRawOldValue();
+ if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+ || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+ return;
}
event.exportOldValue(this);
}
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
Assert.assertTrue(this.regionPath != null, "regionPath was null");
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
return gfc.getRegionByPathForProcessing(this.regionPath);
}
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
sendReply = false;
basicProcess(dm, lclRgn);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
// region
if (!rgn.isEventTrackerInitialized()
&& (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
}
return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
sendReply = operateOnRegion(event, dm) && sendReply;
} finally {
if (event instanceof EntryEventImpl) {
- ((EntryEventImpl) event).release();
+ ((Releasable) event).release();
}
}
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Region destroyed: nothing to do", this);
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
if (!lclRgn.isDestroyed()) {
logger.error("Got disk access exception, expected region to be destroyed", e);
}
- } catch (EntryNotFoundException e) {
+ } catch (EntryNotFoundException ignore) {
this.appliedOperation = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
// distributed-no-ack message. Don't respond
} else {
- ReplyException exception = rex;
- ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+ ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
isInternal());
}
}
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
* When an event is discarded because of an attempt to overwrite a more recent change we still
* need to deliver that event to clients. Clients can then perform their own concurrency checks
* on the event.
- *
- * @param rgn
- * @param ev
*/
protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
rgn.notifyBridgeClients(ev);
}
- // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
- // String path) {
- // return LocalRegion.getRegionFromPath(sys, path);
- // }
-
protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException;
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- // super.fromData(in);
short bits = in.readShort();
short extBits = in.readShort();
this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
@Override
public void toData(DataOutput out) throws IOException {
- // super.toData(out);
-
short bits = 0;
short extendedBits = 0;
bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
static class CacheOperationReplyProcessor extends DirectReplyProcessor {
public CacheOperationMessage msg;
- public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
- new CopyOnWriteHashSet<InternalDistributedMember>();
+ public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
super(system, initMembers);