You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/04 00:07:33 UTC
[19/54] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 172fabe..5f8d238 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -49,13 +49,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
* {@link DistTXStateOnCoordinator} (in case of TX coordinator is also a data node)
*/
protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals =
- new HashMap<>();
+ new HashMap<>();
private HashMap<LocalRegion, DistributedMember> rrTargets;
private Set<DistributedMember> txRemoteParticpants = null; // other than local
protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
- InternalDistributedMember clientMember) {
+ InternalDistributedMember clientMember) {
super(managerImpl, id, clientMember);
}
@@ -65,9 +65,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.geode.internal.cache.TXStateInterface#commit()
- *
+ *
* [DISTTX] TODO Catch all exceptions in precommit and rollback and make sure these messages reach
* all
*/
@@ -79,12 +79,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create a map of secondary(for PR) / replica(for RR) to stubs to send
// commit message to those
HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals =
- getSecondariesAndReplicasForTxOps();
+ getSecondariesAndReplicasForTxOps();
// add it to the existing map and then send commit to all copies
target2realDeals.putAll(otherTargets2realDeals);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
+ "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
}
precommitResult = doPrecommit();
@@ -95,17 +95,17 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
boolean phase2commitDone = doCommit();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
- + (phase2commitDone ? "Done" : "Failed"));
+ + (phase2commitDone ? "Done" : "Failed"));
}
// [DISTTX] TODO Handle this exception well
if (!phase2commitDone) {
throw new TransactionInDoubtException(
- LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
+ LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
+ "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
}
}
} catch (CommitConflictException e) {
@@ -133,30 +133,30 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
final GemFireCacheImpl cache =
- GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
+ GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
InternalDistributedMember currentNode =
- cache.getInternalDistributedSystem().getDistributedMember();
+ cache.getInternalDistributedSystem().getDistributedMember();
HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals =
- new HashMap<>();
+ new HashMap<>();
for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals.entrySet()) {
DistributedMember originalTarget = e.getKey();
DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
ArrayList<DistTxEntryEvent> primaryTxOps =
- distPeerTxStateStub.getPrimaryTransactionalOperations();
+ distPeerTxStateStub.getPrimaryTransactionalOperations();
for (DistTxEntryEvent dtop : primaryTxOps) {
LocalRegion lr = dtop.getRegion();
// replicas or secondaries
Set<InternalDistributedMember> otherNodes = null;
if (lr instanceof PartitionedRegion) {
Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop.getRegion())
- .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
+ .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
allNodes.remove(originalTarget);
otherNodes = allNodes;
} else if (lr instanceof DistributedRegion) {
otherNodes =
- ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
+ ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
otherNodes.remove(originalTarget);
}
@@ -204,20 +204,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and rollback message
DistTXRollbackMessage.DistTxRollbackReplyProcessor processor =
- new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
- txRemoteParticpants, target2realDeals);
+ new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
+ txRemoteParticpants, target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXRollbackMessage rollbackMsg =
- new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send rollback message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
@@ -236,14 +236,14 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
localTXState.rollback();
boolean localResult = localTXState.getRollbackResponse();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = " + dm.getId()
- + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
+ + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
@@ -280,7 +280,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
Boolean remoteResult = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " + target
- + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
+ + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResult;
}
@@ -295,7 +295,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -339,8 +339,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (r.scope.isDistributed()) {
if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
logger.warn(LocalizedMessage.create(
- LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
- r.getFullPath()));
+ LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
+ r.getFullPath()));
}
}
} else {
@@ -354,20 +354,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
- this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
+ "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
+ this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
}
target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
if (logger.isDebugEnabled()) {
logger
- .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
- + target2realDeals);
+ .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+ + target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
- this.realDeal, this, target, target2realDeals);
+ "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
+ this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
@@ -382,25 +382,25 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
this.realDeal = new DistPeerTXStateStub(this, target, onBehalfOfClientMember);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
- this.realDeal, this.txMgr.getDM().getId());
+ "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
+ this.realDeal, this.txMgr.getDM().getId());
}
if (!this.realDeal.isDistTx() || this.realDeal.isCreatedOnDistTxCoordinator()
|| !this.realDeal.isTxState()) {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
- .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
+ .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
}
target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
- + target2realDeals);
+ "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+ + target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
- this.realDeal, this, target, target2realDeals);
+ "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
+ this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
@@ -429,8 +429,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
this.txRemoteParticpants.remove(dm.getId());
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = "
- + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
- + dm.getId());
+ + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
+ + dm.getId());
}
}
return txRemoteParticpants;
@@ -444,20 +444,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and precommit message
DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor =
- new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
- txRemoteParticpants, target2realDeals);
+ new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
+ txRemoteParticpants, target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXPrecommitMessage precommitMsg =
- new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send precommit message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
@@ -467,7 +467,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
+ "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
}
}
@@ -477,29 +477,29 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
localTXState.precommit();
boolean localResult = localTXState.getPreCommitResponse();
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
- new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+ new TreeMap<String, ArrayList<DistTxThinEntryState>>();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
if (localResult) {
localResult = ((DistTXStateOnCoordinator) localTXState)
- .populateDistTxEntryStateList(entryStateSortedMap);
+ .populateDistTxEntryStateList(entryStateSortedMap);
if (localResult) {
entryEventList =
- new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
+ new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
}
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = " + dm.getId()
- + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
- + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
- + " ,finalResult-old= " + finalResult);
+ + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+ + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
+ + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
@@ -531,19 +531,19 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// dm.getStats().incCommitWaits();
Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
- processor.getCommitResponseMap();
+ processor.getCommitResponseMap();
for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
DistTxPrecommitResponse remoteResponse = e.getValue();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
- remoteResponse.getDistTxEntryEventList();
+ remoteResponse.getDistTxEntryEventList();
populateEntryEventMap(target, entryEventList, sortedRegionName);
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
- + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
- + printEntryEventList(entryEventList) + " ,txEntryEventMap="
- + printEntryEventMap(this.txEntryEventMap) + " ,result= "
- + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
+ + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
+ + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResponse.getCommitState();
}
@@ -551,7 +551,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -566,11 +566,11 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* Handle response of precommit reply
- *
+ *
* Go over list of region versions for this target and fill map
*/
private void populateEntryEventMap(DistributedMember target,
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
if (this.txEntryEventMap == null) {
this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
}
@@ -583,10 +583,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (sortedRegionName.size() != entryEventList.size()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
- "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
- + " for target=" + target,
- entryEventList.size() + " {" + entryEventList + "}"));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+ "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
+ + " for target=" + target,
+ entryEventList.size() + " {" + entryEventList + "}"));
}
int index = 0;
@@ -601,7 +601,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
* Populate list of regions for this target, while sending commit messages
*/
private void populateEntryEventList(DistributedMember target,
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
sortedRegionMap.clear();
distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
@@ -612,7 +612,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap.get(rName);
if (entryStates == null) {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
- .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
+ .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
}
entryEventList.add(entryStates);
}
@@ -631,12 +631,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and commit message
DistTXCommitMessage.DistTxCommitReplyProcessor processor =
- new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
- target2realDeals);
+ new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
+ target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXCommitMessage commitMsg =
- new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send commit message to remote nodes
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
@@ -645,8 +645,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
populateEntryEventList(remoteNode, entryEventList, sortedRegionName);
@@ -659,9 +659,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
- + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
- + printEntryEventList(entryEventList) + " ,txEntryEventMap="
- + printEntryEventMap(this.txEntryEventMap));
+ + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
+ + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ + printEntryEventMap(this.txEntryEventMap));
}
}
@@ -670,8 +670,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
@@ -679,10 +679,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
TXCommitMessage localResultMsg = localTXState.getCommitMessage();
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
- + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
- + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
- + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
+ "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
+ + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
+ + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && (localResultMsg != null);
}
@@ -719,8 +719,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
TXCommitMessage remoteResultMsg = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
- + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
+ "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
+ + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResultMsg != null;
}
@@ -728,7 +728,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
/*
* [DISTTX] TODO Write similar method to take out exception
- *
+ *
* [DISTTX] TODO Handle Reliable regions
*/
// if (this.hasReliableRegions) {
@@ -748,7 +748,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
@Override
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
- LocalRegion region) {
+ LocalRegion region) {
if (putallOp.putAllData.length == 0) {
return;
}
@@ -756,22 +756,22 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postPutAll(putallOp, successfulPuts, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postPutAll "
- + "processing putAll op for region {}, size of putAllOp " + "is {}",
- region, putallOp.putAllData.length);
+ "DistTXStateProxyImplOnCoordinator.postPutAll "
+ + "processing putAll op for region {}, size of putAllOp " + "is {}",
+ region, putallOp.putAllData.length);
}
// map of bucketId to putall op for this bucket
HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
- new HashMap<Integer, DistributedPutAllOperation>();
+ new HashMap<Integer, DistributedPutAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
- new HashMap<Integer, DistTXCoordinatorInterface>();
+ new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the putall op per bucket
for (int i = 0; i < putallOp.putAllData.length; i++) {
@@ -783,10 +783,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (putAllForBucket == null) {
// TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
- Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
+ Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
event.setEventId(putallOp.putAllData[i].getEventID());
putAllForBucket =
- new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
+ new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
bucketToPutallMap.put(bucketId, putAllForBucket);
}
putallOp.putAllData[i].setFakeEventID();
@@ -811,9 +811,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postPutAll processing"
- + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
- bucketId, dtsi, putAllForBucket);
+ "DistTXStateProxyImplOnCoordinator.postPutAll processing"
+ + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
+ bucketId, dtsi, putAllForBucket);
}
dtsi.postPutAll(putAllForBucket, successfulPuts, region);
}
@@ -827,7 +827,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
- LocalRegion region) {
+ LocalRegion region) {
if (op.removeAllData.length == 0) {
return;
}
@@ -835,20 +835,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
super.postRemoveAll(op, successfulOps, region);
} else {
region.getCancelCriterion().checkCancelInProgress(null); // fix for bug
- // #43651
+ // #43651
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postRemoveAll "
- + "processing removeAll op for region {}, size of removeAll " + "is {}",
- region, op.removeAllDataSize);
+ "DistTXStateProxyImplOnCoordinator.postRemoveAll "
+ + "processing removeAll op for region {}, size of removeAll " + "is {}",
+ region, op.removeAllDataSize);
}
// map of bucketId to removeAll op for this bucket
HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
- new HashMap<Integer, DistributedRemoveAllOperation>();
+ new HashMap<Integer, DistributedRemoveAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
- new HashMap<Integer, DistTXCoordinatorInterface>();
+ new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the removeAll op per bucket
for (int i = 0; i < op.removeAllData.length; i++) {
@@ -862,7 +862,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, region, key);
event.setEventId(op.removeAllData[i].getEventID());
removeAllForBucket =
- new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
+ new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
}
op.removeAllData[i].setFakeEventID();
@@ -887,9 +887,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
- + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
- bucketId, dtsi, removeAllForBucket);
+ "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+ + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
+ bucketId, dtsi, removeAllForBucket);
}
dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
}
@@ -903,13 +903,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
public static String printEntryEventMap(
- HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
+ HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(txRegionVersionsMap.size());
str.append(")=[ ");
for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
- .entrySet()) {
+ .entrySet()) {
str.append(" {").append(entry.getKey());
str.append(":").append("size(").append(entry.getValue().size()).append(")");
str.append("=").append(entry.getValue()).append("}, ");
@@ -919,7 +919,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
public static String printEntryEventList(
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(entryEventList.size());
http://git-wip-us.apache.org/repos/asf/geode/blob/0d0bf253/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 0a9ccd8..7ba7d0c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -39,6 +38,7 @@ import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.query.internal.cq.CqService;
@@ -58,12 +58,11 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.InternalDataSerializer;
-import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
-import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllMessage;
import org.apache.geode.internal.cache.EntryEventImpl.OldValueImporter;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.UpdateOperation.UpdateMessage;
+import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -75,26 +74,26 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.StoredObject;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.util.DelayedAction;
-/**
- *
- */
public abstract class DistributedCacheOperation {
private static final Logger logger = LogService.getLogger();
public static double LOSS_SIMULATION_RATIO = 0; // test hook
+
public static Random LOSS_SIMULATION_GENERATOR;
public static long SLOW_DISTRIBUTION_MS = 0; // test hook
// constants used in subclasses and distribution messages
// should use enum in source level 1.5+
+
/**
* Deserialization policy: do not deserialize (for byte array, null or cases where the value
* should stay serialized)
@@ -145,11 +144,12 @@ public abstract class DistributedCacheOperation {
}
- public final static byte DESERIALIZATION_POLICY_NUMBITS =
+ public static final byte DESERIALIZATION_POLICY_NUMBITS =
DistributionMessage.getNumBits(DESERIALIZATION_POLICY_LAZY);
public static final short DESERIALIZATION_POLICY_END =
(short) (1 << DESERIALIZATION_POLICY_NUMBITS);
+
public static final short DESERIALIZATION_POLICY_MASK = (short) (DESERIALIZATION_POLICY_END - 1);
public static boolean testSendingOldValues;
@@ -263,7 +263,7 @@ public abstract class DistributedCacheOperation {
try {
_distribute();
} catch (InvalidVersionException e) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM, "PutAll failed since versions were missing; retrying again", e);
}
@@ -283,7 +283,7 @@ public abstract class DistributedCacheOperation {
DistributedRegion region = getRegion();
if (viewVersion != -1) {
region.getDistributionAdvisor().endOperation(viewVersion);
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.STATE_FLUSH_OP, "done dispatching operation in view version {}",
viewVersion);
}
@@ -317,7 +317,7 @@ public abstract class DistributedCacheOperation {
if (SLOW_DISTRIBUTION_MS > 0) { // test hook
try {
Thread.sleep(SLOW_DISTRIBUTION_MS);
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
SLOW_DISTRIBUTION_MS = 0;
@@ -335,15 +335,15 @@ public abstract class DistributedCacheOperation {
}
// some members requiring old value are also in the cache op recipients set
- Set needsOldValueInCacheOp = Collections.EMPTY_SET;
+ Set needsOldValueInCacheOp = Collections.emptySet();
// set client routing information into the event
boolean routingComputed = false;
FilterRoutingInfo filterRouting = null;
// recipients that will get a cacheop msg and also a PR message
- Set twoMessages = Collections.EMPTY_SET;
+ Set twoMessages = Collections.emptySet();
if (region.isUsedForPartitionedRegionBucket()) {
- twoMessages = ((BucketRegion) region).getBucketAdvisor().adviseRequiresTwoMessages();
+ twoMessages = ((Bucket) region).getBucketAdvisor().adviseRequiresTwoMessages();
routingComputed = true;
filterRouting = getRecipientFilterRouting(recipients);
if (filterRouting != null) {
@@ -355,7 +355,7 @@ public abstract class DistributedCacheOperation {
// some members need PR notification of the change for client/wan
// notification
- Set adjunctRecipients = Collections.EMPTY_SET;
+ Set adjunctRecipients = Collections.emptySet();
// Partitioned region listener notification messages piggyback on this
// operation's replyprocessor and need to be sent at the same time as
@@ -377,20 +377,17 @@ public abstract class DistributedCacheOperation {
recipients.removeAll(needsOldValueInCacheOp);
}
- Set cachelessNodes = Collections.EMPTY_SET;
- Set adviseCacheServers = Collections.EMPTY_SET;
- Set<InternalDistributedMember> cachelessNodesWithNoCacheServer =
- new HashSet<InternalDistributedMember>();
+ Set cachelessNodes = Collections.emptySet();
+ Set adviseCacheServers;
+ Set<InternalDistributedMember> cachelessNodesWithNoCacheServer = new HashSet<>();
if (region.getDistributionConfig().getDeltaPropagation() && this.supportsDeltaPropagation()) {
cachelessNodes = region.getCacheDistributionAdvisor().adviseEmptys();
if (!cachelessNodes.isEmpty()) {
List list = new ArrayList(cachelessNodes);
for (Object member : cachelessNodes) {
- if (!recipients.contains(member)) {
+ if (!recipients.contains(member) || adjunctRecipients.contains(member)) {
// Don't include those originally excluded.
list.remove(member);
- } else if (adjunctRecipients.contains(member)) {
- list.remove(member);
}
}
cachelessNodes.clear();
@@ -421,10 +418,10 @@ public abstract class DistributedCacheOperation {
if (!reliableOp || region.isNoDistributionOk()) {
// nothing needs be done in this case
} else {
- region.handleReliableDistribution(Collections.EMPTY_SET);
+ region.handleReliableDistribution(Collections.emptySet());
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
this.event.setLocalFilterInfo(filterInfo);
@@ -433,7 +430,7 @@ public abstract class DistributedCacheOperation {
} else {
boolean directAck = false;
boolean useMulticast = region.getMulticastEnabled()
- && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();;
+ && region.getSystem().getConfig().getMcastPort() != 0 && this.supportsMulticast();
boolean shouldAck = shouldAck();
if (shouldAck) {
@@ -491,7 +488,7 @@ public abstract class DistributedCacheOperation {
recipients);
}
waitForMembers.removeAll(recipients);
- recipients = Collections.EMPTY_SET;
+ recipients = Collections.emptySet();
}
}
if (reliableOp) {
@@ -625,7 +622,7 @@ public abstract class DistributedCacheOperation {
}
adjunctRecipientsWithNoCacheServer.addAll(adjunctRecipients);
- adviseCacheServers = ((BucketRegion) region).getPartitionedRegion()
+ adviseCacheServers = ((Bucket) region).getPartitionedRegion()
.getCacheDistributionAdvisor().adviseCacheServers();
adjunctRecipientsWithNoCacheServer.removeAll(adviseCacheServers);
@@ -652,7 +649,7 @@ public abstract class DistributedCacheOperation {
}
}
- /** compute local client routing before waiting for an ack only for a bucket */
+ // compute local client routing before waiting for an ack only for a bucket
if (region.isUsedForPartitionedRegionBucket()) {
FilterInfo filterInfo = getLocalFilterRouting(filterRouting);
event.setLocalFilterInfo(filterInfo);
@@ -693,7 +690,6 @@ public abstract class DistributedCacheOperation {
}
}
-
/**
* Cleanup destroyed events in CQ result cache for remote CQs. While maintaining the CQ results
* key caching. the destroy event keys are marked as destroyed instead of removing them, this is
@@ -710,7 +706,7 @@ public abstract class DistributedCacheOperation {
continue;
}
- CacheProfile cf = (CacheProfile) ((BucketRegion) getRegion()).getPartitionedRegion()
+ CacheProfile cf = (CacheProfile) ((Bucket) getRegion()).getPartitionedRegion()
.getCacheDistributionAdvisor().getProfile(m);
if (cf == null || cf.filterProfile == null || cf.filterProfile.isLocalProfile()
@@ -718,7 +714,6 @@ public abstract class DistributedCacheOperation {
continue;
}
-
for (Object value : cf.filterProfile.getCqMap().values()) {
ServerCQ cq = (ServerCQ) value;
@@ -726,16 +721,14 @@ public abstract class DistributedCacheOperation {
Long cqID = e.getKey();
// For the CQs satisfying the event with destroy CQEvent, remove
// the entry form CQ cache.
- if (cq.getFilterID() == cqID
- && (e.getValue().equals(Integer.valueOf(MessageType.LOCAL_DESTROY)))) {
- cq.removeFromCqResultKeys(((EntryEventImpl) event).getKey(), true);
+ if (cq.getFilterID() == cqID && (e.getValue().equals(MessageType.LOCAL_DESTROY))) {
+ cq.removeFromCqResultKeys(((EntryOperation) event).getKey(), true);
}
}
}
}
}
-
/**
* Get the adjunct receivers for a partitioned region operation
*
@@ -752,9 +745,6 @@ public abstract class DistributedCacheOperation {
/**
* perform any operation-specific initialization on the given reply processor
- *
- * @param p
- * @param msg
*/
protected void initProcessor(CacheOperationReplyProcessor p, CacheOperationMessage msg) {
// nothing to do here - see UpdateMessage
@@ -783,9 +773,6 @@ public abstract class DistributedCacheOperation {
}
}
- /**
- * @param closedMembers
- */
private void handleClosedMembers(Set<InternalDistributedMember> closedMembers,
Map<InternalDistributedMember, PersistentMemberID> persistentIds) {
if (persistentIds == null) {
@@ -837,11 +824,7 @@ public abstract class DistributedCacheOperation {
return null;
}
CacheDistributionAdvisor advisor;
- // if (region.isUsedForPartitionedRegionBucket()) {
- advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
- // } else {
- // advisor = ((DistributedRegion)region).getCacheDistributionAdvisor();
- // }
+ advisor = region.getPartitionedRegion().getCacheDistributionAdvisor();
return advisor.adviseFilterRouting(this.event, cacheOpRecipients);
}
@@ -915,7 +898,6 @@ public abstract class DistributedCacheOperation {
protected final static short PERSISTENT_TAG_MASK = (VERSION_TAG_MASK << 1);
protected final static short UNRESERVED_FLAGS_START = (PERSISTENT_TAG_MASK << 1);
-
private final static int INHIBIT_NOTIFICATIONS_MASK = 0x400;
public boolean needsRouting;
@@ -959,7 +941,6 @@ public abstract class DistributedCacheOperation {
return this.op;
}
-
/** sets the concurrency versioning tag for this message */
public void setVersionTag(VersionTag tag) {
this.versionTag = tag;
@@ -1001,8 +982,6 @@ public abstract class DistributedCacheOperation {
/**
* process a reply
*
- * @param reply
- * @param processor
* @return true if the reply-processor should continue to process this response
*/
boolean processReply(ReplyMessage reply, CacheOperationReplyProcessor processor) {
@@ -1019,13 +998,11 @@ public abstract class DistributedCacheOperation {
* @param event the entry event that contains the old value
*/
public void appendOldValueToMessage(EntryEventImpl event) {
- {
- @Unretained
- Object val = event.getRawOldValue();
- if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
- || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
- return;
- }
+ @Unretained
+ Object val = event.getRawOldValue();
+ if (val == null || val == Token.NOT_AVAILABLE || val == Token.REMOVED_PHASE1
+ || val == Token.REMOVED_PHASE2 || val == Token.DESTROYED || val == Token.TOMBSTONE) {
+ return;
}
event.exportOldValue(this);
}
@@ -1086,7 +1063,7 @@ public abstract class DistributedCacheOperation {
protected LocalRegion getLocalRegionForProcessing(DistributionManager dm) {
Assert.assertTrue(this.regionPath != null, "regionPath was null");
- GemFireCacheImpl gfc = (GemFireCacheImpl) CacheFactory.getInstance(dm.getSystem());
+ InternalCache gfc = (InternalCache) CacheFactory.getInstance(dm.getSystem());
return gfc.getRegionByPathForProcessing(this.regionPath);
}
@@ -1112,7 +1089,7 @@ public abstract class DistributedCacheOperation {
final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
sendReply = false;
basicProcess(dm, lclRgn);
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1203,7 +1180,7 @@ public abstract class DistributedCacheOperation {
// region
if (!rgn.isEventTrackerInitialized()
&& (rgn.getDataPolicy().withReplication() || rgn.getDataPolicy().withPreloaded())) {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.DM_BRIDGE_SERVER, "Ignoring possible duplicate event");
}
return;
@@ -1213,15 +1190,15 @@ public abstract class DistributedCacheOperation {
sendReply = operateOnRegion(event, dm) && sendReply;
} finally {
if (event instanceof EntryEventImpl) {
- ((EntryEventImpl) event).release();
+ ((Releasable) event).release();
}
}
- } catch (RegionDestroyedException e) {
+ } catch (RegionDestroyedException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Region destroyed: nothing to do", this);
}
- } catch (CancelException e) {
+ } catch (CancelException ignore) {
this.closed = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Cancelled: nothing to do", this);
@@ -1231,7 +1208,7 @@ public abstract class DistributedCacheOperation {
if (!lclRgn.isDestroyed()) {
logger.error("Got disk access exception, expected region to be destroyed", e);
}
- } catch (EntryNotFoundException e) {
+ } catch (EntryNotFoundException ignore) {
this.appliedOperation = true;
if (logger.isDebugEnabled()) {
logger.debug("{} Entry not found, nothing to do", this);
@@ -1275,8 +1252,7 @@ public abstract class DistributedCacheOperation {
if (pId == 0 && (dm instanceof DM) && !this.directAck) {// Fix for #41871
// distributed-no-ack message. Don't respond
} else {
- ReplyException exception = rex;
- ReplyMessage.send(recipient, pId, exception, dm, !this.appliedOperation, this.closed, false,
+ ReplyMessage.send(recipient, pId, rex, dm, !this.appliedOperation, this.closed, false,
isInternal());
}
}
@@ -1312,9 +1288,6 @@ public abstract class DistributedCacheOperation {
* When an event is discarded because of an attempt to overwrite a more recent change we still
* need to deliver that event to clients. Clients can then perform their own concurrency checks
* on the event.
- *
- * @param rgn
- * @param ev
*/
protected void dispatchElidedEvent(LocalRegion rgn, EntryEventImpl ev) {
if (logger.isDebugEnabled()) {
@@ -1325,11 +1298,6 @@ public abstract class DistributedCacheOperation {
rgn.notifyBridgeClients(ev);
}
- // protected LocalRegion getRegionFromPath(InternalDistributedSystem sys,
- // String path) {
- // return LocalRegion.getRegionFromPath(sys, path);
- // }
-
protected abstract InternalCacheEvent createEvent(DistributedRegion rgn)
throws EntryNotFoundException;
@@ -1380,7 +1348,6 @@ public abstract class DistributedCacheOperation {
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- // super.fromData(in);
short bits = in.readShort();
short extBits = in.readShort();
this.flags = bits;
@@ -1424,8 +1391,6 @@ public abstract class DistributedCacheOperation {
@Override
public void toData(DataOutput out) throws IOException {
- // super.toData(out);
-
short bits = 0;
short extendedBits = 0;
bits = computeCompressedShort(bits);
@@ -1611,8 +1576,7 @@ public abstract class DistributedCacheOperation {
static class CacheOperationReplyProcessor extends DirectReplyProcessor {
public CacheOperationMessage msg;
- public CopyOnWriteHashSet<InternalDistributedMember> closedMembers =
- new CopyOnWriteHashSet<InternalDistributedMember>();
+ public CopyOnWriteHashSet<InternalDistributedMember> closedMembers = new CopyOnWriteHashSet<>();
public CacheOperationReplyProcessor(InternalDistributedSystem system, Collection initMembers) {
super(system, initMembers);