You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/04 18:36:39 UTC
geode git commit: Fixup DistTX classes
Repository: geode
Updated Branches:
refs/heads/feature/GEODE-2632-11 24786c82c -> 0c36d5f8e
Fixup DistTX classes
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/0c36d5f8
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/0c36d5f8
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/0c36d5f8
Branch: refs/heads/feature/GEODE-2632-11
Commit: 0c36d5f8e186cea35b162cc834239761a3909983
Parents: 24786c8
Author: Kirk Lund <kl...@apache.org>
Authored: Thu May 4 11:36:19 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Thu May 4 11:36:19 2017 -0700
----------------------------------------------------------------------
.../internal/cache/DistTXPrecommitMessage.java | 48 ++---
.../DistTXStateProxyImplOnCoordinator.java | 212 +++++++++----------
2 files changed, 130 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/0c36d5f8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
index 96885bb..0ab2cc3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXPrecommitMessage.java
@@ -65,7 +65,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
public DistTXPrecommitMessage() {}
public DistTXPrecommitMessage(TXId txUniqId, InternalDistributedMember onBehalfOfClientMember,
- ReplyProcessor21 processor) {
+ ReplyProcessor21 processor) {
super(txUniqId.getUniqId(), onBehalfOfClientMember, processor);
}
@@ -81,7 +81,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
if (logger.isDebugEnabled()) {
logger.debug("DistTXPrecommitMessage.operateOnTx: Tx {} with Secondaries List {}", txId,
- this.secondaryTransactionalOperations);
+ this.secondaryTransactionalOperations);
}
// should not be commited before
@@ -91,7 +91,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
final TXStateProxy txStateProxy = txMgr.getTXState();
boolean precommitSuccess = true;
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
- new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+ new TreeMap<String, ArrayList<DistTxThinEntryState>>();
// [DISTTX] TODO - Test valid scenarios of null txState
// if no TXState was created (e.g. due to only getEntry/size operations
// that don't start remote TX) then ignore
@@ -99,8 +99,8 @@ public final class DistTXPrecommitMessage extends TXMessage {
if (!txStateProxy.isDistTx() || !txStateProxy.isTxStateProxy()
|| txStateProxy.isCreatedOnDistTxCoordinator()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateProxyImplOnDatanode",
- txStateProxy.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateProxyImplOnDatanode",
+ txStateProxy.getClass().getSimpleName()));
}
((DistTXStateProxyImplOnDatanode) txStateProxy).setPreCommitMessage(this);
@@ -114,28 +114,28 @@ public final class DistTXPrecommitMessage extends TXMessage {
precommitSuccess = ((DistTXStateProxyImplOnDatanode) txStateProxy).getPreCommitResponse();
if (precommitSuccess) {
precommitSuccess = ((DistTXStateProxyImplOnDatanode) txStateProxy)
- .populateDistTxEntryStateList(entryStateSortedMap);
+ .populateDistTxEntryStateList(entryStateSortedMap);
if (!precommitSuccess) {
entryStateSortedMap.clear();
if (logger.isDebugEnabled()) {
logger.debug("DistTXPrecommitMessage.operateOnTx: Tx {} Failed while creating response",
- txId);
+ txId);
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXPrecommitMessage.operateOnTx: Tx {} Failed while applying changes for replicates",
- txId);
+ "DistTXPrecommitMessage.operateOnTx: Tx {} Failed while applying changes for replicates",
+ txId);
}
}
}
// Send Response : Send false if conflict
DistTxPrecommitResponse finalResponse = new DistTxPrecommitResponse(precommitSuccess,
- new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values()));
+ new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values()));
DistTXPrecommitReplyMessage.send(getSender(), getProcessorId(), finalResponse,
- getReplySender(dm));
+ getReplySender(dm));
// return false so there isn't another reply
return false;
@@ -168,7 +168,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
}
public void setSecondaryTransactionalOperations(
- ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) {
+ ArrayList<DistTxEntryEvent> secondaryTransactionalOperations) {
this.secondaryTransactionalOperations = secondaryTransactionalOperations;
}
@@ -209,7 +209,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
* @param replySender distribution manager used to send the reply
*/
public static void send(InternalDistributedMember recipient, int processorId,
- DistTxPrecommitResponse val, ReplySender replySender) throws RemoteOperationException {
+ DistTxPrecommitResponse val, ReplySender replySender) throws RemoteOperationException {
Assert.assertTrue(recipient != null, "DistTXPhaseOneCommitReplyMessage NULL reply message");
DistTXPrecommitReplyMessage m = new DistTXPrecommitReplyMessage(processorId, val);
m.setRecipient(recipient);
@@ -226,8 +226,8 @@ public final class DistTXPrecommitMessage extends TXMessage {
final long startTime = getTimestamp();
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM,
- "DistTXPhaseOneCommitReplyMessage process invoking reply processor with processorId:{}",
- this.processorId);
+ "DistTXPhaseOneCommitReplyMessage process invoking reply processor with processorId:{}",
+ this.processorId);
}
if (processor == null) {
@@ -260,7 +260,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("DistTXPhaseOneCommitReplyMessage").append("processorid=").append(this.processorId)
- .append(" reply to sender ").append(this.getSender());
+ .append(" reply to sender ").append(this.getSender());
return sb.toString();
}
@@ -283,12 +283,12 @@ public final class DistTXPrecommitMessage extends TXMessage {
private transient TXId txIdent = null;
public DistTxPrecommitReplyProcessor(TXId txUniqId, DM dm, Set initMembers,
- HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+ HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
super(dm, initMembers);
this.msgMap = msgMap;
// [DISTTX] TODO Do we need synchronised map?
this.commitResponseMap =
- Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPrecommitResponse>());
+ Collections.synchronizedMap(new HashMap<DistributedMember, DistTxPrecommitResponse>());
this.txIdent = txUniqId;
}
@@ -318,7 +318,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
this.exception = new DistTxPrecommitExceptionCollectingException(txIdent);
}
DistTxPrecommitExceptionCollectingException cce =
- (DistTxPrecommitExceptionCollectingException) this.exception;
+ (DistTxPrecommitExceptionCollectingException) this.exception;
if (ex instanceof CommitReplyException) {
CommitReplyException cre = (CommitReplyException) ex;
cce.addExceptionsFromMember(msg.getSender(), cre.getExceptions());
@@ -337,7 +337,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
public Set getCacheClosedMembers() {
if (this.exception != null) {
DistTxPrecommitExceptionCollectingException cce =
- (DistTxPrecommitExceptionCollectingException) this.exception;
+ (DistTxPrecommitExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
return Collections.EMPTY_SET;
@@ -347,7 +347,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
public Set getRegionDestroyedMembers(String regionFullPath) {
if (this.exception != null) {
DistTxPrecommitExceptionCollectingException cce =
- (DistTxPrecommitExceptionCollectingException) this.exception;
+ (DistTxPrecommitExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
return Collections.EMPTY_SET;
@@ -392,10 +392,10 @@ public final class DistTXPrecommitMessage extends TXMessage {
* @param msgMap
*/
public void handlePotentialCommitFailure(
- HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
+ HashMap<DistributedMember, DistTXCoordinatorInterface> msgMap) {
if (fatalExceptions.size() > 0) {
StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ .append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -473,7 +473,7 @@ public final class DistTXPrecommitMessage extends TXMessage {
public DistTxPrecommitResponse() {}
public DistTxPrecommitResponse(boolean precommitSuccess,
- ArrayList<ArrayList<DistTxThinEntryState>> eventList) {
+ ArrayList<ArrayList<DistTxThinEntryState>> eventList) {
this.commitState = precommitSuccess;
this.distTxEventList = eventList;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0c36d5f8/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
index 5f8d238..aa40508 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistTXStateProxyImplOnCoordinator.java
@@ -49,13 +49,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
* {@link DistTXStateOnCoordinator} (in case of TX coordinator is also a data node)
*/
protected HashMap<DistributedMember, DistTXCoordinatorInterface> target2realDeals =
- new HashMap<>();
+ new HashMap<>();
private HashMap<LocalRegion, DistributedMember> rrTargets;
private Set<DistributedMember> txRemoteParticpants = null; // other than local
protected HashMap<String, ArrayList<DistTxThinEntryState>> txEntryEventMap = null;
public DistTXStateProxyImplOnCoordinator(TXManagerImpl managerImpl, TXId id,
- InternalDistributedMember clientMember) {
+ InternalDistributedMember clientMember) {
super(managerImpl, id, clientMember);
}
@@ -79,12 +79,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create a map of secondary(for PR) / replica(for RR) to stubs to send
// commit message to those
HashMap<DistributedMember, DistTXCoordinatorInterface> otherTargets2realDeals =
- getSecondariesAndReplicasForTxOps();
+ getSecondariesAndReplicasForTxOps();
// add it to the existing map and then send commit to all copies
target2realDeals.putAll(otherTargets2realDeals);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
+ "DistTXStateProxyImplOnCoordinator.commit target2realDeals = " + target2realDeals);
}
precommitResult = doPrecommit();
@@ -95,17 +95,17 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
boolean phase2commitDone = doCommit();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.commit Commit "
- + (phase2commitDone ? "Done" : "Failed"));
+ + (phase2commitDone ? "Done" : "Failed"));
}
// [DISTTX] TODO Handle this exception well
if (!phase2commitDone) {
throw new TransactionInDoubtException(
- LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
+ LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
+ "DistTXStateProxyImplOnCoordinator.commit precommitResult = " + precommitResult);
}
}
} catch (CommitConflictException e) {
@@ -133,30 +133,30 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
private HashMap<DistributedMember, DistTXCoordinatorInterface> getSecondariesAndReplicasForTxOps() {
final GemFireCacheImpl cache =
- GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
+ GemFireCacheImpl.getExisting("getSecondariesAndReplicasForTxOps");
InternalDistributedMember currentNode =
- cache.getInternalDistributedSystem().getDistributedMember();
+ cache.getInternalDistributedSystem().getDistributedMember();
HashMap<DistributedMember, DistTXCoordinatorInterface> secondaryTarget2realDeals =
- new HashMap<>();
+ new HashMap<>();
for (Entry<DistributedMember, DistTXCoordinatorInterface> e : target2realDeals.entrySet()) {
DistributedMember originalTarget = e.getKey();
DistTXCoordinatorInterface distPeerTxStateStub = e.getValue();
ArrayList<DistTxEntryEvent> primaryTxOps =
- distPeerTxStateStub.getPrimaryTransactionalOperations();
+ distPeerTxStateStub.getPrimaryTransactionalOperations();
for (DistTxEntryEvent dtop : primaryTxOps) {
LocalRegion lr = dtop.getRegion();
// replicas or secondaries
Set<InternalDistributedMember> otherNodes = null;
if (lr instanceof PartitionedRegion) {
Set<InternalDistributedMember> allNodes = ((PartitionedRegion) dtop.getRegion())
- .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
+ .getRegionAdvisor().getBucketOwners(dtop.getKeyInfo().getBucketId());
allNodes.remove(originalTarget);
otherNodes = allNodes;
} else if (lr instanceof DistributedRegion) {
otherNodes =
- ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
+ ((DistributedRegion) lr).getCacheDistributionAdvisor().adviseInitializedReplicates();
otherNodes.remove(originalTarget);
}
@@ -204,20 +204,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and rollback message
DistTXRollbackMessage.DistTxRollbackReplyProcessor processor =
- new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
- txRemoteParticpants, target2realDeals);
+ new DistTXRollbackMessage.DistTxRollbackReplyProcessor(this.getTxId(), dm,
+ txRemoteParticpants, target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXRollbackMessage rollbackMsg =
- new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXRollbackMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send rollback message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setRollbackMessage(rollbackMsg, dm);
@@ -236,14 +236,14 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
localTXState.rollback();
boolean localResult = localTXState.getRollbackResponse();
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.rollback local = " + dm.getId()
- + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
+ + " ,result= " + localResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
@@ -280,7 +280,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
Boolean remoteResult = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug("DistTXStateProxyImplOnCoordinator.rollback target = " + target
- + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
+ + " ,result= " + remoteResult + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResult;
}
@@ -339,8 +339,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (r.scope.isDistributed()) {
if (txDistributedClientWarningIssued.compareAndSet(false, true)) {
logger.warn(LocalizedMessage.create(
- LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
- r.getFullPath()));
+ LocalizedStrings.TXStateProxyImpl_Distributed_Region_In_Client_TX,
+ r.getFullPath()));
}
}
} else {
@@ -354,20 +354,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
- this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
+ "DistTXStateProxyImplOnCoordinator::getRealDeal Built a new TXState: {} txMge:{} proxy {} target {}",
+ this.realDeal, this.txMgr.getDM().getId(), this, target, new Throwable());
}
target2realDeals.put(target, (DistTXCoordinatorInterface) realDeal);
if (logger.isDebugEnabled()) {
logger
- .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
- + target2realDeals);
+ .debug("DistTXStateProxyImplOnCoordinator.getRealDeal added TxState target2realDeals = "
+ + target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
- this.realDeal, this, target, target2realDeals);
+ "DistTXStateProxyImplOnCoordinator::getRealDeal Found TXState: {} proxy {} target {} target2realDeals {}",
+ this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
@@ -382,25 +382,25 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
this.realDeal = new DistPeerTXStateStub(this, target, onBehalfOfClientMember);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
- this.realDeal, this.txMgr.getDM().getId());
+ "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Built a new TXState: {} me:{}",
+ this.realDeal, this.txMgr.getDM().getId());
}
if (!this.realDeal.isDistTx() || this.realDeal.isCreatedOnDistTxCoordinator()
|| !this.realDeal.isTxState()) {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
- .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
+ .toLocalizedString("DistPeerTXStateStub", this.realDeal.getClass().getSimpleName()));
}
target2realDeals.put(target, (DistPeerTXStateStub) realDeal);
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
- + target2realDeals);
+ "DistTXStateProxyImplOnCoordinator.getRealDeal(t) added TxState target2realDeals = "
+ + target2realDeals);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
- this.realDeal, this, target, target2realDeals);
+ "DistTXStateProxyImplOnCoordinator::getRealDeal(t) Found TXState: {} proxy {} target {} target2realDeals {}",
+ this.realDeal, this, target, target2realDeals);
}
}
return this.realDeal;
@@ -429,8 +429,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
this.txRemoteParticpants.remove(dm.getId());
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit txParticpants = "
- + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
- + dm.getId());
+ + txParticpants + " ,txRemoteParticpants=" + this.txRemoteParticpants + " ,originator="
+ + dm.getId());
}
}
return txRemoteParticpants;
@@ -444,20 +444,20 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and precommit message
DistTXPrecommitMessage.DistTxPrecommitReplyProcessor processor =
- new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
- txRemoteParticpants, target2realDeals);
+ new DistTXPrecommitMessage.DistTxPrecommitReplyProcessor(this.getTxId(), dm,
+ txRemoteParticpants, target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXPrecommitMessage precommitMsg =
- new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXPrecommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send precommit message to remote nodes
for (DistributedMember remoteNode : txRemoteParticpants) {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
remoteTXStateStub.setPrecommitMessage(precommitMsg, dm);
@@ -467,7 +467,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
+ "DistTXStateProxyImplOnCoordinator.doPrecommit Sent Message to target = " + remoteNode);
}
}
@@ -477,29 +477,29 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
localTXState.precommit();
boolean localResult = localTXState.getPreCommitResponse();
TreeMap<String, ArrayList<DistTxThinEntryState>> entryStateSortedMap =
- new TreeMap<String, ArrayList<DistTxThinEntryState>>();
+ new TreeMap<String, ArrayList<DistTxThinEntryState>>();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = null;
if (localResult) {
localResult = ((DistTXStateOnCoordinator) localTXState)
- .populateDistTxEntryStateList(entryStateSortedMap);
+ .populateDistTxEntryStateList(entryStateSortedMap);
if (localResult) {
entryEventList =
- new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
+ new ArrayList<ArrayList<DistTxThinEntryState>>(entryStateSortedMap.values());
populateEntryEventMap(dm.getId(), entryEventList, sortedRegionName);
}
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit local = " + dm.getId()
- + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
- + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
- + " ,finalResult-old= " + finalResult);
+ + " ,entryEventList=" + printEntryEventList(entryEventList) + " ,txRegionVersionsMap="
+ + printEntryEventMap(this.txEntryEventMap) + " ,result= " + localResult
+ + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && localResult;
}
@@ -531,19 +531,19 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// dm.getStats().incCommitWaits();
Map<DistributedMember, DistTxPrecommitResponse> remoteResults =
- processor.getCommitResponseMap();
+ processor.getCommitResponseMap();
for (Entry<DistributedMember, DistTxPrecommitResponse> e : remoteResults.entrySet()) {
DistributedMember target = e.getKey();
DistTxPrecommitResponse remoteResponse = e.getValue();
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList =
- remoteResponse.getDistTxEntryEventList();
+ remoteResponse.getDistTxEntryEventList();
populateEntryEventMap(target, entryEventList, sortedRegionName);
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doPrecommit got reply from target = "
- + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
- + printEntryEventList(entryEventList) + " ,txEntryEventMap="
- + printEntryEventMap(this.txEntryEventMap) + " ,result= "
- + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
+ + target + " ,sortedRegions" + sortedRegionName + " ,entryEventList="
+ + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ + remoteResponse.getCommitState() + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResponse.getCommitState();
}
@@ -570,7 +570,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
* Go over list of region versions for this target and fill map
*/
private void populateEntryEventMap(DistributedMember target,
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionName) {
if (this.txEntryEventMap == null) {
this.txEntryEventMap = new HashMap<String, ArrayList<DistTxThinEntryState>>();
}
@@ -583,10 +583,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (sortedRegionName.size() != entryEventList.size()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
- "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
- + " for target=" + target,
- entryEventList.size() + " {" + entryEventList + "}"));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString(
+ "size of " + sortedRegionName.size() + " {" + sortedRegionName + "}"
+ + " for target=" + target,
+ entryEventList.size() + " {" + entryEventList + "}"));
}
int index = 0;
@@ -601,7 +601,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
* Populate list of regions for this target, while sending commit messages
*/
private void populateEntryEventList(DistributedMember target,
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList, TreeSet<String> sortedRegionMap) {
DistTXCoordinatorInterface distTxItem = target2realDeals.get(target);
sortedRegionMap.clear();
distTxItem.gatherAffectedRegionsName(sortedRegionMap, false, true);
@@ -612,7 +612,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
ArrayList<DistTxThinEntryState> entryStates = this.txEntryEventMap.get(rName);
if (entryStates == null) {
throw new UnsupportedOperationInTransactionException(LocalizedStrings.DISTTX_TX_EXPECTED
- .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
+ .toLocalizedString("entryStates for " + rName + " at target " + target, "null"));
}
entryEventList.add(entryStates);
}
@@ -631,12 +631,12 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// create processor and commit message
DistTXCommitMessage.DistTxCommitReplyProcessor processor =
- new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
- target2realDeals);
+ new DistTXCommitMessage.DistTxCommitReplyProcessor(this.getTxId(), dm, txRemoteParticpants,
+ target2realDeals);
// TODO [DISTTX} whats ack threshold?
processor.enableSevereAlertProcessing();
final DistTXCommitMessage commitMsg =
- new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
+ new DistTXCommitMessage(this.getTxId(), this.onBehalfOfClientMember, processor);
// send commit message to remote nodes
ArrayList<ArrayList<DistTxThinEntryState>> entryEventList = new ArrayList<>();
@@ -645,8 +645,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
DistTXCoordinatorInterface remoteTXStateStub = target2realDeals.get(remoteNode);
if (remoteTXStateStub.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
- remoteTXStateStub.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistPeerTXStateStub",
+ remoteTXStateStub.getClass().getSimpleName()));
}
try {
populateEntryEventList(remoteNode, entryEventList, sortedRegionName);
@@ -659,9 +659,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
if (logger.isDebugEnabled()) {
logger.debug("DistTXStateProxyImplOnCoordinator.doCommit Sent Message target = "
- + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
- + printEntryEventList(entryEventList) + " ,txEntryEventMap="
- + printEntryEventMap(this.txEntryEventMap));
+ + remoteNode + " ,sortedRegions=" + sortedRegionName + " ,entryEventList="
+ + printEntryEventList(entryEventList) + " ,txEntryEventMap="
+ + printEntryEventMap(this.txEntryEventMap));
}
}
@@ -670,8 +670,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (localTXState != null) {
if (!localTXState.isTxState()) {
throw new UnsupportedOperationInTransactionException(
- LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
- localTXState.getClass().getSimpleName()));
+ LocalizedStrings.DISTTX_TX_EXPECTED.toLocalizedString("DistTXStateOnCoordinator",
+ localTXState.getClass().getSimpleName()));
}
populateEntryEventList(dm.getId(), entryEventList, sortedRegionName);
((DistTXStateOnCoordinator) localTXState).setDistTxEntryStates(entryEventList);
@@ -679,10 +679,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
TXCommitMessage localResultMsg = localTXState.getCommitMessage();
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
- + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
- + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
- + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
+ "DistTXStateProxyImplOnCoordinator.doCommit local = " + dm.getId() + " ,sortedRegions="
+ + sortedRegionName + " ,entryEventList=" + printEntryEventList(entryEventList)
+ + " ,txEntryEventMap=" + printEntryEventMap(this.txEntryEventMap) + " ,result= "
+ + (localResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && (localResultMsg != null);
}
@@ -719,8 +719,8 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
TXCommitMessage remoteResultMsg = e.getValue();
if (logger.isDebugEnabled()) { // TODO - make this trace level
logger.debug(
- "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
- + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
+ "DistTXStateProxyImplOnCoordinator.doCommit got results from target = " + target
+ + " ,result= " + (remoteResultMsg != null) + " ,finalResult-old= " + finalResult);
}
finalResult = finalResult && remoteResultMsg != null;
}
@@ -748,7 +748,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
@Override
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
- LocalRegion region) {
+ LocalRegion region) {
if (putallOp.putAllData.length == 0) {
return;
}
@@ -760,18 +760,18 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postPutAll "
- + "processing putAll op for region {}, size of putAllOp " + "is {}",
- region, putallOp.putAllData.length);
+ "DistTXStateProxyImplOnCoordinator.postPutAll "
+ + "processing putAll op for region {}, size of putAllOp " + "is {}",
+ region, putallOp.putAllData.length);
}
// map of bucketId to putall op for this bucket
HashMap<Integer, DistributedPutAllOperation> bucketToPutallMap =
- new HashMap<Integer, DistributedPutAllOperation>();
+ new HashMap<Integer, DistributedPutAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
- new HashMap<Integer, DistTXCoordinatorInterface>();
+ new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the putall op per bucket
for (int i = 0; i < putallOp.putAllData.length; i++) {
@@ -783,10 +783,10 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (putAllForBucket == null) {
// TODO DISTTX: event is never released
EntryEventImpl event = EntryEventImpl.createPutAllEvent(null, region,
- Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
+ Operation.PUTALL_CREATE, key, putallOp.putAllData[i].getValue());
event.setEventId(putallOp.putAllData[i].getEventID());
putAllForBucket =
- new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
+ new DistributedPutAllOperation(event, putallOp.putAllDataSize, putallOp.isBridgeOp);
bucketToPutallMap.put(bucketId, putAllForBucket);
}
putallOp.putAllData[i].setFakeEventID();
@@ -811,9 +811,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postPutAll processing"
- + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
- bucketId, dtsi, putAllForBucket);
+ "DistTXStateProxyImplOnCoordinator.postPutAll processing"
+ + " putAll for ##bucketId = {}, ##txStateStub = {}, " + "##putAllOp = {}",
+ bucketId, dtsi, putAllForBucket);
}
dtsi.postPutAll(putAllForBucket, successfulPuts, region);
}
@@ -827,7 +827,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
*/
@Override
public void postRemoveAll(DistributedRemoveAllOperation op, VersionedObjectList successfulOps,
- LocalRegion region) {
+ LocalRegion region) {
if (op.removeAllData.length == 0) {
return;
}
@@ -838,17 +838,17 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
// #43651
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postRemoveAll "
- + "processing removeAll op for region {}, size of removeAll " + "is {}",
- region, op.removeAllDataSize);
+ "DistTXStateProxyImplOnCoordinator.postRemoveAll "
+ + "processing removeAll op for region {}, size of removeAll " + "is {}",
+ region, op.removeAllDataSize);
}
// map of bucketId to removeAll op for this bucket
HashMap<Integer, DistributedRemoveAllOperation> bucketToRemoveAllMap =
- new HashMap<Integer, DistributedRemoveAllOperation>();
+ new HashMap<Integer, DistributedRemoveAllOperation>();
// map of bucketId to TXStateStub for target that hosts this bucket
HashMap<Integer, DistTXCoordinatorInterface> bucketToTxStateStubMap =
- new HashMap<Integer, DistTXCoordinatorInterface>();
+ new HashMap<Integer, DistTXCoordinatorInterface>();
// separate the removeAll op per bucket
for (int i = 0; i < op.removeAllData.length; i++) {
@@ -862,7 +862,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
EntryEventImpl event = EntryEventImpl.createRemoveAllEvent(op, region, key);
event.setEventId(op.removeAllData[i].getEventID());
removeAllForBucket =
- new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
+ new DistributedRemoveAllOperation(event, op.removeAllDataSize, op.isBridgeOp);
bucketToRemoveAllMap.put(bucketId, removeAllForBucket);
}
op.removeAllData[i].setFakeEventID();
@@ -887,9 +887,9 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
if (logger.isDebugEnabled()) {
logger.debug(
- "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
- + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
- bucketId, dtsi, removeAllForBucket);
+ "DistTXStateProxyImplOnCoordinator.postRemoveAll processing"
+ + " removeAll for ##bucketId = {}, ##txStateStub = {}, " + "##removeAllOp = {}",
+ bucketId, dtsi, removeAllForBucket);
}
dtsi.postRemoveAll(removeAllForBucket, successfulOps, region);
}
@@ -903,13 +903,13 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
public static String printEntryEventMap(
- HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
+ HashMap<String, ArrayList<DistTxThinEntryState>> txRegionVersionsMap) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(txRegionVersionsMap.size());
str.append(")=[ ");
for (Map.Entry<String, ArrayList<DistTxThinEntryState>> entry : txRegionVersionsMap
- .entrySet()) {
+ .entrySet()) {
str.append(" {").append(entry.getKey());
str.append(":").append("size(").append(entry.getValue().size()).append(")");
str.append("=").append(entry.getValue()).append("}, ");
@@ -919,7 +919,7 @@ public class DistTXStateProxyImplOnCoordinator extends DistTXStateProxyImpl {
}
public static String printEntryEventList(
- ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
+ ArrayList<ArrayList<DistTxThinEntryState>> entryEventList) {
StringBuilder str = new StringBuilder();
str.append(" (");
str.append(entryEventList.size());