You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/10/16 11:50:48 UTC
[4/5] ignite git commit: ignite-3478 Support for optimistic
transactions
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 2e33889..4b1d846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -18,24 +18,45 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
+import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
/**
*
*/
public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearTxPrepareFutureAdapter {
+ /** Atomic updater for the {@code lockCnt} field of {@link MvccVersionFuture}. */
+ private static final AtomicIntegerFieldUpdater<MvccVersionFuture> LOCK_CNT_UPD =
+ AtomicIntegerFieldUpdater.newUpdater(MvccVersionFuture.class, "lockCnt");
+
+ /** Keys lock future; if non-null when the mvcc version future is first created, that future listens to it. */
+ @GridToStringExclude
+ protected KeyLockFuture keyLockFut;
+
+ /** Mvcc version future, created by {@code initMvccVersionFuture} when it is called without remap. */
+ @GridToStringExclude
+ protected MvccVersionFuture mvccVerFut;
+
/**
* @param cctx Context.
* @param tx Transaction.
@@ -169,6 +190,29 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
protected abstract void prepare0(boolean remap, boolean topLocked);
/**
+ * Creates the mvcc version future for this prepare or, on remap, re-initializes the existing one.
+ *
+ * @param mvccCrd Mvcc coordinator handed to {@code MvccVersionFuture.init} (asserted non-null there).
+ * @param lockCnt Expected number of lock responses (asserted positive by {@code MvccVersionFuture.init}).
+ * @param remap {@code False} to create a new version future, register it as a listener of the key lock
+ * future (if one is set) and pass it to {@code add}; {@code true} to only re-initialize the already
+ * created future with the new coordinator and lock count.
+ */
+ final void initMvccVersionFuture(MvccCoordinator mvccCrd, int lockCnt, boolean remap) {
+ if (!remap) {
+ mvccVerFut = new MvccVersionFuture();
+
+ mvccVerFut.init(mvccCrd, lockCnt);
+
+ if (keyLockFut != null)
+ keyLockFut.listen(mvccVerFut);
+
+ add(mvccVerFut);
+ }
+ else {
+ assert mvccVerFut != null;
+
+ mvccVerFut.init(mvccCrd, lockCnt);
+ }
+ }
+
+ /**
* Keys lock future.
*/
protected static class KeyLockFuture extends GridFutureAdapter<Void> {
@@ -231,4 +275,86 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
return S.toString(KeyLockFuture.class, this, super.toString());
}
}
+
+ /**
+ * Future that requests a tx counter from the mvcc coordinator once the expected number of lock
+ * notifications has been received, and completes when the coordinator responds or reports an error.
+ * <p>
+ * As a listener of the key lock future ({@code apply}), a successful completion of that future is
+ * counted as one lock notification; a failed key lock future is only logged at debug level here.
+ */
+ class MvccVersionFuture extends GridFutureAdapter implements MvccResponseListener,
+ IgniteInClosure<IgniteInternalFuture<Void>> {
+ /** Mvcc coordinator, set by {@code init}. */
+ MvccCoordinator crd;
+
+ /** Number of lock notifications still expected; decremented through {@code LOCK_CNT_UPD}. */
+ volatile int lockCnt;
+
+ @Override public void apply(IgniteInternalFuture<Void> keyLockFut) {
+ try {
+ keyLockFut.get();
+
+ onLockReceived();
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("MvccVersionFuture ignores key lock future failure: " + e);
+ }
+ }
+
+ /**
+ * Sets the coordinator and the number of lock responses to wait for.
+ * Asserts that this future is not completed yet.
+ *
+ * @param crd Mvcc coordinator.
+ * @param lockCnt Expected number of lock responses.
+ */
+ void init(MvccCoordinator crd, int lockCnt) {
+ assert crd != null;
+ assert lockCnt > 0;
+
+ this.crd = crd;
+ this.lockCnt = lockCnt;
+
+ assert !isDone();
+ }
+
+ /**
+ * Counts down one expected lock notification. When the counter reaches zero the tx counter is
+ * requested: through {@code requestTxCounterOnCoordinator} if the local node is the coordinator,
+ * otherwise through {@code requestTxCounter}, passing this future as the response listener.
+ */
+ void onLockReceived() {
+ int remaining = LOCK_CNT_UPD.decrementAndGet(this);
+
+ assert remaining >= 0 : remaining;
+
+ if (remaining == 0) {
+ // TODO IGNITE-3478: add method to do not create one more future in requestTxCounter.
+ if (cctx.localNodeId().equals(crd.nodeId()))
+ onMvccResponse(crd.nodeId(), cctx.coordinators().requestTxCounterOnCoordinator(tx));
+ else
+ cctx.coordinators().requestTxCounter(crd, this, tx.nearXidVersion());
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stores the received version in the transaction's mvcc info and completes this future.
+ */
+ @Override public void onMvccResponse(UUID crdId, MvccCoordinatorVersion res) {
+ tx.mvccInfo(new TxMvccInfo(crdId, res));
+
+ onDone();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * For a topology error, a retry-ready future for the next affinity version is attached to it first.
+ * The error is then set on the enclosing prepare future only if no error was set before, and this
+ * future is completed via no-arg {@code onDone()}.
+ */
+ @Override public void onMvccError(IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+ ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+ }
+
+ ERR_UPD.compareAndSet(GridNearOptimisticTxPrepareFutureAdapter.this, null, e);
+
+ onDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "MvccVersionFuture [crd=" + crd.nodeId() +
+ ", lockCnt=" + lockCnt +
+ ", done=" + isDone() + ']';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a2aeb8..ef2c359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping;
import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture;
import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.processors.cache.mvcc.MvccResponseListener;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
@@ -301,7 +302,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
mvccCrd = cacheCtx.affinity().mvccCoordinator(topVer);
if (mvccCrd == null) {
- onDone(new IgniteCheckedException("Mvcc coordinator is not assigned: " + topVer));
+ onDone(CacheCoordinatorsProcessor.noCoordinatorError(topVer));
return;
}
@@ -456,6 +457,12 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
/** {@inheritDoc} */
@Override public void onMvccError(IgniteCheckedException e) {
+ if (e instanceof ClusterTopologyCheckedException) {
+ IgniteInternalFuture<?> fut = cctx.nextAffinityReadyFuture(tx.topologyVersion());
+
+ ((ClusterTopologyCheckedException)e).retryReadyFuture(fut);
+ }
+
ERR_UPD.compareAndSet(GridNearPessimisticTxPrepareFuture.this, null, e);
}
@@ -492,12 +499,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
", loc=" + ((MiniFuture)f).primary().isLocal() +
", done=" + f.isDone() + "]";
}
- else if (f instanceof CacheCoordinatorsProcessor.MvccVersionFuture) {
- CacheCoordinatorsProcessor.MvccVersionFuture crdFut =
- (CacheCoordinatorsProcessor.MvccVersionFuture)f;
+ else if (f instanceof MvccCoordinatorFuture) {
+ MvccCoordinatorFuture crdFut = (MvccCoordinatorFuture)f;
- return "[mvccCrdNode=" + crdFut.crdId +
- ", loc=" + crdFut.crdId.equals(cctx.localNodeId()) +
+ return "[mvccCrdNode=" + crdFut.coordinatorNodeId() +
+ ", loc=" + crdFut.coordinatorNodeId().equals(cctx.localNodeId()) +
", done=" + f.isDone() + "]";
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index 00ff4bb..104a31a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -26,11 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
*
@@ -81,6 +83,9 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
/** TTL for read operation. */
private long accessTtl;
+ /** */
+ private MvccCoordinatorVersion mvccVer;
+
/**
* Empty constructor required for {@link Message}.
*/
@@ -103,6 +108,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
* @param addReader Add reader flag.
* @param needVer {@code True} if entry version is needed.
* @param addDepInfo Deployment info.
+ * @param mvccVer Mvcc version.
*/
public GridNearSingleGetRequest(
int cacheId,
@@ -118,7 +124,8 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
boolean addReader,
boolean needVer,
boolean addDepInfo,
- boolean recovery
+ boolean recovery,
+ MvccCoordinatorVersion mvccVer
) {
assert key != null;
@@ -131,6 +138,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
this.createTtl = createTtl;
this.accessTtl = accessTtl;
this.addDepInfo = addDepInfo;
+ this.mvccVer = mvccVer;
if (readThrough)
flags |= READ_THROUGH_FLAG_MASK;
@@ -149,6 +157,13 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
}
/**
+ * @return Mvcc version carried by this request, may be {@code null}.
+ */
+ @Nullable public MvccCoordinatorVersion mvccVersion() {
+ return mvccVer;
+ }
+
+ /**
* @return Key.
*/
public KeyCacheObject key() {
@@ -322,7 +337,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 8:
- subjId = reader.readUuid("subjId");
+ mvccVer = reader.readMessage("mvccVer");
if (!reader.isLastRead())
return false;
@@ -330,7 +345,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 9:
- taskNameHash = reader.readInt("taskNameHash");
+ subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
return false;
@@ -338,6 +353,14 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
reader.incrementState();
case 10:
+ taskNameHash = reader.readInt("taskNameHash");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -396,18 +419,24 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
writer.incrementState();
case 8:
- if (!writer.writeUuid("subjId", subjId))
+ if (!writer.writeMessage("mvccVer", mvccVer))
return false;
writer.incrementState();
case 9:
- if (!writer.writeInt("taskNameHash", taskNameHash))
+ if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
case 10:
+ if (!writer.writeInt("taskNameHash", taskNameHash))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -430,7 +459,7 @@ public class GridNearSingleGetRequest extends GridCacheIdMessage implements Grid
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 12;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
index c24551b..36efe2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishAndAckFuture.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,12 +54,21 @@ public class GridNearTxFinishAndAckFuture extends GridFutureAdapter<IgniteIntern
@Override public void apply(final GridNearTxFinishFuture fut) {
GridNearTxLocal tx = fut.tx();
+ IgniteInternalFuture<Void> ackFut = null;
+
+ MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
TxMvccInfo mvccInfo = tx.mvccInfo();
- if (mvccInfo != null) {
- IgniteInternalFuture<Void> ackFut = fut.context().coordinators().ackTxCommit(
- mvccInfo.coordinator(), mvccInfo.version());
+ if (qryTracker != null)
+ ackFut = qryTracker.onTxDone(mvccInfo, fut.context(), true);
+ else if (mvccInfo != null) {
+ ackFut = fut.context().coordinators().ackTxCommit(mvccInfo.coordinatorNodeId(),
+ mvccInfo.version(),
+ null);
+ }
+ if (ackFut != null) {
ackFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> ackFut) {
Exception err = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9b60d7..14536e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishResponse;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.mvcc.TxMvccInfo;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
@@ -403,6 +405,20 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
fut.getClass() == CheckRemoteTxMiniFuture.class;
}
+ /**
+ * Reports transaction rollback to mvcc: through the transaction's query tracker if it has one,
+ * otherwise by sending a rollback ack to the coordinator recorded in the transaction's mvcc info.
+ * Does nothing when the transaction has neither a query tracker nor mvcc info.
+ */
+ private void ackMvccCoordinatorOnRollback() {
+ TxMvccInfo mvccInfo = tx.mvccInfo();
+
+ MvccQueryTracker qryTracker = tx.mvccQueryTracker();
+
+ if (qryTracker != null)
+ qryTracker.onTxDone(mvccInfo, cctx, false);
+ else if (mvccInfo != null)
+ cctx.coordinators().ackTxRollback(mvccInfo.coordinatorNodeId(), mvccInfo.version(), null);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("ForLoopReplaceableByForEach")
public void finish(boolean commit, boolean clearThreadMap) {
@@ -421,11 +437,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
return;
}
- if (!commit && tx.mvccInfo() != null) {
- TxMvccInfo mvccInfo = tx.mvccInfo();
-
- cctx.coordinators().ackTxRollback(mvccInfo.coordinator(), mvccInfo.version());
- }
+ if (!commit)
+ ackMvccCoordinatorOnRollback();
try {
if (tx.localFinish(commit, clearThreadMap) || (!commit && tx.state() == UNKNOWN)) {
@@ -436,7 +449,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
assert mvccInfo != null;
- IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinator(), waitTxs);
+ IgniteInternalFuture fut = cctx.coordinators().waitTxsFuture(mvccInfo.coordinatorNodeId(),
+ waitTxs);
add(fut);
}
@@ -445,7 +459,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
GridDistributedTxMapping mapping = mappings.singleMapping();
if (mapping != null) {
- assert !hasFutures() : futures();
+ assert !hasFutures() || waitTxs != null : futures();
finish(1, mapping, commit);
}
@@ -846,6 +860,11 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit
return "CheckRemoteTxMiniFuture[nodes=" + fut.nodes() + ", done=" + f.isDone() + "]";
}
+ else if (f instanceof MvccCoordinatorFuture) {
+ MvccCoordinatorFuture fut = (MvccCoordinatorFuture)f;
+
+ return "WaitPreviousTxsFut[mvccCrd=" + fut.coordinatorNodeId() + ", done=" + f.isDone() + "]";
+ }
else
return "[loc=true, done=" + f.isDone() + "]";
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index a1e37a1..6a59112 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -47,10 +47,10 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheVersionedFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
@@ -61,6 +61,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLoca
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorChangeAware;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -89,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiClosure;
+import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
@@ -119,7 +124,8 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
* Replicated user transaction.
*/
@SuppressWarnings("unchecked")
-public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject, AutoCloseable {
+public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeoutObject,
+ AutoCloseable, MvccCoordinatorChangeAware {
/** */
private static final long serialVersionUID = 0L;
@@ -169,6 +175,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
@GridToStringExclude
private TransactionProxyImpl proxy;
+ /** */
+ private MvccQueryTracker mvccTracker;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -230,6 +239,21 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
trackTimeout = cctx.time().addTimeoutObject(this);
}
+ /**
+ * @return Mvcc query version tracker, or {@code null} if none has been created for this transaction.
+ */
+ MvccQueryTracker mvccQueryTracker() {
+ return mvccTracker;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates to the mvcc query tracker when one exists; returns {@code null} otherwise.
+ */
+ @Nullable @Override public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) {
+ if (mvccTracker != null)
+ return mvccTracker.onMvccCoordinatorChange(newCrd);
+
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public boolean near() {
return true;
@@ -1653,6 +1677,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
}
/**
+ * @param cctx Cache context.
+ * @return Mvcc version for read inside tx (initialized once for OPTIMISTIC SERIALIZABLE and REPEATABLE_READ txs),
+ * or {@code null} if mvcc is not enabled for the cache or no query tracker has been created.
+ */
+ private MvccCoordinatorVersion mvccReadVersion(GridCacheContext cctx) {
+ if (!cctx.mvccEnabled() || mvccTracker == null)
+ return null;
+
+ return mvccTracker.mvccVersion();
+ }
+
+ /**
* @param cacheCtx Cache context.
* @param keys Keys to get.
* @param deserializeBinary Deserialize binary flag.
@@ -1665,7 +1700,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
public <K, V> IgniteInternalFuture<Map<K, V>> getAllAsync(
final GridCacheContext cacheCtx,
@Nullable final AffinityTopologyVersion entryTopVer,
- Collection<KeyCacheObject> keys,
+ final Collection<KeyCacheObject> keys,
final boolean deserializeBinary,
final boolean skipVals,
final boolean keepCacheObjects,
@@ -1677,6 +1712,46 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
init();
+ if (cacheCtx.mvccEnabled() && (optimistic() && !readCommitted()) && mvccTracker == null) {
+ // TODO IGNITE-3478: support async tx rollback (e.g. on timeout).
+ final GridFutureAdapter fut = new GridFutureAdapter();
+
+ boolean canRemap = cctx.lockedTopologyVersion(null) == null;
+
+ mvccTracker = new MvccQueryTracker(cacheCtx, canRemap,
+ new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() {
+ @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) {
+ if (e == null) {
+ getAllAsync(cacheCtx,
+ entryTopVer,
+ keys,
+ deserializeBinary,
+ skipVals,
+ keepCacheObjects,
+ skipStore,
+ recovery,
+ needVer).listen(new IgniteInClosure<IgniteInternalFuture<Map<Object, Object>>>() {
+ @Override
+ public void apply(IgniteInternalFuture<Map<Object, Object>> fut0) {
+ try {
+ fut.onDone(fut0.get());
+ } catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
+ }
+ });
+ }
+ else
+ fut.onDone(e);
+ }
+ }
+ );
+
+ mvccTracker.requestVersion(topologyVersion());
+
+ return fut;
+ }
+
int keysCnt = keys.size();
boolean single = keysCnt == 1;
@@ -1781,8 +1856,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
null,
txEntry.keepBinary(),
- null,
- null); // TODO IGNITE-3478
+ null, // TODO IGNITE-3478
+ null);
if (getRes != null) {
val = getRes.value();
@@ -2165,8 +2240,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null,
- null) : null; // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx), // TODO IGNITE-3478
+ null) : null;
if (getRes != null) {
val = getRes.value();
@@ -2185,7 +2260,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
resolveTaskName(),
accessPlc,
!deserializeBinary,
- null); // TODO IGNITE-3478
+ mvccReadVersion(cacheCtx)); // TODO IGNITE-3478
}
if (val != null) {
@@ -2464,7 +2539,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
* @param expiryPlc Expiry policy.
* @return Future with {@code True} value if loading took place.
*/
- public IgniteInternalFuture<Void> loadMissing(
+ private IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
AffinityTopologyVersion topVer,
boolean readThrough,
@@ -2523,7 +2598,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
skipVals,
needVer,
/*keepCacheObject*/true,
- recovery
+ recovery,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Object>, Void>() {
@Override public Void apply(IgniteInternalFuture<Object> f) {
try {
@@ -2554,7 +2630,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
expiryPlc0,
skipVals,
needVer,
- /*keepCacheObject*/true
+ /*keepCacheObject*/true,
+ mvccReadVersion(cacheCtx)
).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Void>() {
@Override public Void apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
@@ -3311,8 +3388,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
if (!FINISH_FUT_UPD.compareAndSet(this, null, fut0 = new GridNearTxFinishFuture<>(cctx, this, false)))
return chainFinishFuture(finishFut, false);
- cctx.mvcc().addFuture(fut0, fut0.futureId());
-
IgniteInternalFuture<?> prepFut = this.prepFut;
if (prepFut == null || prepFut.isDone()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index e1c6636..80cd4c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -58,7 +58,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
private static final int ALLOW_WAIT_TOP_FUT_FLAG_MASK = 0x10;
/** */
- private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x02;
+ private static final int REQUEST_MVCC_CNTR_FLAG_MASK = 0x20;
/** Future ID. */
private IgniteUuid futId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
index 39baec9..d532d8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsDiscoveryData.java
@@ -24,6 +24,9 @@ import java.io.Serializable;
*/
public class CacheCoordinatorsDiscoveryData implements Serializable {
/** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
private MvccCoordinator crd;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 85dde15..fd3c2af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -39,6 +39,7 @@ import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -170,13 +171,22 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
return crdVer & CRD_VER_MASK;
}
+ /**
+ * Creates the error reported when no mvcc coordinator is assigned for a topology version.
+ *
+ * @param topVer Topology version for cache operation.
+ * @return New {@code ClusterTopologyServerNotFoundException} whose message includes the topology version.
+ */
+ public static IgniteCheckedException noCoordinatorError(AffinityTopologyVersion topVer) {
+ return new ClusterTopologyServerNotFoundException("Mvcc coordinator is not assigned for " +
+ "topology version: " + topVer);
+ }
+
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
statCntrs = new StatCounter[7];
statCntrs[0] = new CounterWithAvg("CoordinatorTxCounterRequest", "avgTxs");
statCntrs[1] = new CounterWithAvg("MvccCoordinatorVersionResponse", "avgFutTime");
- statCntrs[2] = new StatCounter("CoordinatorTxAckRequest");
+ statCntrs[2] = new StatCounter("CoordinatorAckRequestTx");
statCntrs[3] = new CounterWithAvg("CoordinatorTxAckResponse", "avgFutTime");
statCntrs[4] = new StatCounter("TotalRequests");
statCntrs[5] = new StatCounter("CoordinatorWaitTxsRequest");
@@ -314,9 +324,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
GridCacheVersion txVer) {
assert !ctx.localNodeId().equals(crd.nodeId());
- MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(),
- crd.nodeId(),
- lsnr);
+ MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, lsnr);
verFuts.put(fut.id, fut);
@@ -341,20 +349,9 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
public void ackQueryDone(MvccCoordinator crd, MvccCoordinatorVersion mvccVer) {
assert crd != null;
- long trackCntr = mvccVer.counter();
-
- MvccLongList txs = mvccVer.activeTransactions();
-
- if (txs != null) {
- for (int i = 0; i < txs.size(); i++) {
- long txId = txs.get(i);
-
- if (txId < trackCntr)
- trackCntr = txId;
- }
- }
+ long trackCntr = queryTrackCounter(mvccVer);
- Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorQueryAckRequest(trackCntr) :
+ Message msg = crd.coordinatorVersion() == mvccVer.coordinatorVersion() ? new CoordinatorAckRequestQuery(trackCntr) :
new NewCoordinatorQueryAckRequest(mvccVer.coordinatorVersion(), trackCntr);
try {
@@ -373,6 +370,27 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param mvccVer Read version.
+ * @return Minimum of the read version counter and the counters of its active transactions.
+ */
+ private long queryTrackCounter(MvccCoordinatorVersion mvccVer) {
+ long trackCntr = mvccVer.counter();
+
+ MvccLongList txs = mvccVer.activeTransactions();
+
+ int size = txs.size();
+
+ for (int i = 0; i < size; i++) {
+ long txId = txs.get(i);
+
+ if (txId < trackCntr)
+ trackCntr = txId;
+ }
+
+ return trackCntr;
+ }
+
+ /**
* @param crd Coordinator.
* @return Counter request future.
*/
@@ -380,7 +398,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
assert crd != null;
// TODO IGNITE-3478: special case for local?
- MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd.nodeId(), null);
+ MvccVersionFuture fut = new MvccVersionFuture(futIdCntr.incrementAndGet(), crd, null);
verFuts.put(fut.id, fut);
@@ -432,22 +450,24 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/**
* @param crd Coordinator.
- * @param mvccVer Transaction version.
+ * @param updateVer Transaction update version.
+ * @param readVer Optional transaction read version.
* @return Acknowledge future.
*/
- public IgniteInternalFuture<Void> ackTxCommit(UUID crd, MvccCoordinatorVersion mvccVer) {
+ public IgniteInternalFuture<Void> ackTxCommit(UUID crd,
+ MvccCoordinatorVersion updateVer,
+ @Nullable MvccCoordinatorVersion readVer) {
assert crd != null;
- assert mvccVer != null;
+ assert updateVer != null;
WaitAckFuture fut = new WaitAckFuture(futIdCntr.incrementAndGet(), crd, true);
ackFuts.put(fut.id, fut);
+ CoordinatorAckRequestTx msg = createTxAckMessage(fut.id, updateVer, readVer);
+
try {
- ctx.io().sendToGridTopic(crd,
- MSG_TOPIC,
- new CoordinatorTxAckRequest(fut.id, mvccVer.counter()),
- MSG_POLICY);
+ ctx.io().sendToGridTopic(crd, MSG_TOPIC, msg, MSG_POLICY);
}
catch (IgniteCheckedException e) {
if (ackFuts.remove(fut.id) != null) {
@@ -462,11 +482,45 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param futId Future ID.
+ * @param updateVer Update version.
+ * @param readVer Optional read version.
+ * @return Message.
+ */
+ private CoordinatorAckRequestTx createTxAckMessage(long futId,
+ MvccCoordinatorVersion updateVer,
+ @Nullable MvccCoordinatorVersion readVer)
+ {
+ CoordinatorAckRequestTx msg;
+
+ if (readVer != null) {
+ long trackCntr = queryTrackCounter(readVer);
+
+ if (readVer.coordinatorVersion() == updateVer.coordinatorVersion()) {
+ msg = new CoordinatorAckRequestTxAndQuery(futId,
+ updateVer.counter(),
+ trackCntr);
+ }
+ else {
+ msg = new CoordinatorAckRequestTxAndQueryEx(futId,
+ updateVer.counter(),
+ readVer.coordinatorVersion(),
+ trackCntr);
+ }
+ }
+ else
+ msg = new CoordinatorAckRequestTx(futId, updateVer.counter());
+
+ return msg;
+ }
+
+ /**
* @param crdId Coordinator node ID.
- * @param mvccVer Transaction version.
+ * @param updateVer Transaction update version.
+ * @param readVer Optional transaction read version.
*/
- public void ackTxRollback(UUID crdId, MvccCoordinatorVersion mvccVer) {
- CoordinatorTxAckRequest msg = new CoordinatorTxAckRequest(0, mvccVer.counter());
+ public void ackTxRollback(UUID crdId, MvccCoordinatorVersion updateVer, @Nullable MvccCoordinatorVersion readVer) {
+ CoordinatorAckRequestTx msg = createTxAckMessage(0, updateVer, readVer);
msg.skipResponse(true);
@@ -578,7 +632,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param nodeId Node ID.
* @param msg Message.
*/
- private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorQueryAckRequest msg) {
+ private void processCoordinatorQueryAckRequest(UUID nodeId, CoordinatorAckRequestQuery msg) {
onQueryDone(nodeId, msg.counter());
}
@@ -587,16 +641,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param msg Message.
*/
private void processNewCoordinatorQueryAckRequest(UUID nodeId, NewCoordinatorQueryAckRequest msg) {
- prevCrdQueries.onQueryDone(nodeId, msg);
+ prevCrdQueries.onQueryDone(nodeId, msg.coordinatorVersion(), msg.counter());
}
/**
* @param nodeId Sender node ID.
* @param msg Message.
*/
- private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorTxAckRequest msg) {
+ private void processCoordinatorTxAckRequest(UUID nodeId, CoordinatorAckRequestTx msg) {
onTxDone(msg.txCounter());
+ if (msg.queryCounter() != COUNTER_NA) {
+ if (msg.queryCoordinatorVersion() == 0)
+ onQueryDone(nodeId, msg.queryCounter());
+ else
+ prevCrdQueries.onQueryDone(nodeId, msg.queryCoordinatorVersion(), msg.queryCounter());
+ }
+
if (STAT_CNTRS)
statCntrs[2].update();
@@ -907,6 +968,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
+ * @param nodeId Node ID.
* @param msg Message.
*/
private void processCoordinatorWaitTxsRequest(final UUID nodeId, final CoordinatorWaitTxsRequest msg) {
@@ -954,8 +1016,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
- * @param nodeId
- * @param msg
+ * @param nodeId Node ID.
+ * @param msg Message.
*/
private void sendFutureResponse(UUID nodeId, CoordinatorWaitTxsRequest msg) {
try {
@@ -974,18 +1036,21 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
}
/**
- * @return
+ * @return Coordinator.
*/
public MvccCoordinator currentCoordinator() {
return curCrd;
}
+ /**
+ * @param curCrd Coordinator.
+ */
public void currentCoordinator(MvccCoordinator curCrd) {
this.curCrd = curCrd;
}
/**
- * @return
+ * @return Current coordinator node ID.
*/
public UUID currentCoordinatorId() {
MvccCoordinator curCrd = this.curCrd;
@@ -1013,7 +1078,33 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
*/
public void processClientActiveQueries(UUID nodeId,
@Nullable Map<MvccCounter, Integer> activeQueries) {
- prevCrdQueries.processClientActiveQueries(nodeId, activeQueries);
+ prevCrdQueries.addNodeActiveQueries(nodeId, activeQueries);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param msg Message.
+ */
+ private void processCoordinatorActiveQueriesMessage(UUID nodeId, CoordinatorActiveQueriesMessage msg) {
+ prevCrdQueries.addNodeActiveQueries(nodeId, msg.activeQueries());
+ }
+
+ /**
+ * @param nodeId Coordinator node ID.
+ * @param activeQueries Active queries.
+ */
+ public void sendActiveQueries(UUID nodeId, @Nullable Map<MvccCounter, Integer> activeQueries) {
+ CoordinatorActiveQueriesMessage msg = new CoordinatorActiveQueriesMessage(activeQueries);
+
+ try {
+ ctx.io().sendToGridTopic(nodeId,
+ MSG_TOPIC,
+ msg,
+ MSG_POLICY);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send active queries to mvcc coordinator: " + e);
+ }
}
/**
@@ -1070,7 +1161,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/**
*
*/
- public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> {
+ private class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> implements MvccCoordinatorFuture {
/** */
private final Long id;
@@ -1078,24 +1169,30 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
private MvccResponseListener lsnr;
/** */
- public final UUID crdId;
+ public final MvccCoordinator crd;
/** */
long startTime;
/**
* @param id Future ID.
- * @param crdId Coordinator node ID.
+ * @param crd Mvcc coordinator.
+ * @param lsnr Listener.
*/
- MvccVersionFuture(Long id, UUID crdId, @Nullable MvccResponseListener lsnr) {
+ MvccVersionFuture(Long id, MvccCoordinator crd, @Nullable MvccResponseListener lsnr) {
this.id = id;
- this.crdId = crdId;
+ this.crd = crd;
this.lsnr = lsnr;
if (STAT_CNTRS)
startTime = System.nanoTime();
}
+ /** {@inheritDoc} */
+ @Override public UUID coordinatorNodeId() {
+ return crd.nodeId();
+ }
+
/**
* @param res Response.
*/
@@ -1103,7 +1200,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
assert res.counter() != COUNTER_NA;
if (lsnr != null)
- lsnr.onMvccResponse(crdId, res);
+ lsnr.onMvccResponse(crd.nodeId(), res);
onDone(res);
}
@@ -1122,7 +1219,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
* @param nodeId Failed node ID.
*/
void onNodeLeft(UUID nodeId ) {
- if (crdId.equals(nodeId) && verFuts.remove(id) != null) {
+ if (crd.nodeId().equals(nodeId) && verFuts.remove(id) != null) {
ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " +
"version, coordinator failed: " + nodeId);
@@ -1132,14 +1229,14 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override public String toString() {
- return "MvccVersionFuture [crd=" + crdId + ", id=" + id + ']';
+ return "MvccVersionFuture [crd=" + crd.nodeId() + ", id=" + id + ']';
}
}
/**
*
*/
- private class WaitAckFuture extends GridFutureAdapter<Void> {
+ private class WaitAckFuture extends GridFutureAdapter<Void> implements MvccCoordinatorFuture {
/** */
private final long id;
@@ -1155,6 +1252,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
/**
* @param id Future ID.
* @param crdId Coordinator node ID.
+ * @param ackTx {@code True} if ack tx commit, {@code false} if waits for previous txs.
*/
WaitAckFuture(long id, UUID crdId, boolean ackTx) {
assert crdId != null;
@@ -1167,6 +1265,11 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
startTime = System.nanoTime();
}
+ /** {@inheritDoc} */
+ @Override public UUID coordinatorNodeId() {
+ return crdId;
+ }
+
/**
*
*/
@@ -1247,12 +1350,12 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
if (msg instanceof CoordinatorTxCounterRequest)
processCoordinatorTxCounterRequest(nodeId, (CoordinatorTxCounterRequest)msg);
- else if (msg instanceof CoordinatorTxAckRequest)
- processCoordinatorTxAckRequest(nodeId, (CoordinatorTxAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestTx)
+ processCoordinatorTxAckRequest(nodeId, (CoordinatorAckRequestTx)msg);
else if (msg instanceof CoordinatorFutureResponse)
processCoordinatorAckResponse(nodeId, (CoordinatorFutureResponse)msg);
- else if (msg instanceof CoordinatorQueryAckRequest)
- processCoordinatorQueryAckRequest(nodeId, (CoordinatorQueryAckRequest)msg);
+ else if (msg instanceof CoordinatorAckRequestQuery)
+ processCoordinatorQueryAckRequest(nodeId, (CoordinatorAckRequestQuery)msg);
else if (msg instanceof CoordinatorQueryVersionRequest)
processCoordinatorQueryVersionRequest(nodeId, (CoordinatorQueryVersionRequest)msg);
else if (msg instanceof MvccCoordinatorVersionResponse)
@@ -1261,6 +1364,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter {
processCoordinatorWaitTxsRequest(nodeId, (CoordinatorWaitTxsRequest)msg);
else if (msg instanceof NewCoordinatorQueryAckRequest)
processNewCoordinatorQueryAckRequest(nodeId, (NewCoordinatorQueryAckRequest)msg);
+ else if (msg instanceof CoordinatorActiveQueriesMessage)
+ processCoordinatorActiveQueriesMessage(nodeId, (CoordinatorActiveQueriesMessage)msg);
else
U.warn(log, "Unexpected message received [node=" + nodeId + ", msg=" + msg + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
new file mode 100644
index 0000000..e51ec90
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestQuery.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request notifying the mvcc coordinator that a query with the given counter is done.
+ */
+public class CoordinatorAckRequestQuery implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long cntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param cntr Query counter.
+ */
+ CoordinatorAckRequestQuery(long cntr) {
+ this.cntr = cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Counter.
+ */
+ public long counter() {
+ return cntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeLong("cntr", cntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cntr = reader.readLong("cntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 134;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
new file mode 100644
index 0000000..c0512f0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTx.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Request acknowledging a finished transaction (commit or rollback) to the mvcc coordinator.
+ */
+public class CoordinatorAckRequestTx implements MvccCoordinatorMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
+
+ /** */
+ private long futId;
+
+ /** */
+ private long txCntr;
+
+ /** */
+ private byte flags;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction.
+ */
+ CoordinatorAckRequestTx(long futId, long txCntr) {
+ this.futId = futId;
+ this.txCntr = txCntr;
+ }
+
+ /** @return Query counter to acknowledge, {@code COUNTER_NA} if the message has no query part. */
+ long queryCounter() {
+ return CacheCoordinatorsProcessor.COUNTER_NA;
+ }
+
+ /** @return Coordinator version of the query counter, {@code 0} unless assigned by another coordinator version. */
+ long queryCoordinatorVersion() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ long futureId() {
+ return futId;
+ }
+
+ /**
+ * @return {@code True} if response message is not needed.
+ */
+ boolean skipResponse() {
+ return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
+ }
+
+ /**
+ * @param val {@code True} if response message is not needed.
+ */
+ void skipResponse(boolean val) {
+ if (val)
+ flags |= SKIP_RESPONSE_FLAG_MASK;
+ else
+ flags &= ~SKIP_RESPONSE_FLAG_MASK;
+ }
+
+ /**
+ * @return Counter assigned to transaction.
+ */
+ public long txCounter() {
+ return txCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("futId", futId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("txCntr", txCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ futId = reader.readLong("futId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ txCntr = reader.readLong("txCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 131;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
new file mode 100644
index 0000000..86c3223
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQuery.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
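+ * Tx ack request that also acknowledges the tx read counter assigned by the same coordinator version.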
+ */
+public class CoordinatorAckRequestTxAndQuery extends CoordinatorAckRequestTx {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQuery() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQuery(long futId, long txCntr, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQuery.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 141;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQuery.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
new file mode 100644
index 0000000..6f6f712
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorAckRequestTxAndQueryEx.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Tx ack request that also acknowledges the tx read counter assigned by another coordinator version.
+ */
+public class CoordinatorAckRequestTxAndQueryEx extends CoordinatorAckRequestTx {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long qryCrdVer;
+
+ /** */
+ private long qryCntr;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorAckRequestTxAndQueryEx() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param txCntr Counter assigned to transaction update.
+ * @param qryCrdVer Version of coordinator assigned read counter.
+ * @param qryCntr Counter assigned for transaction reads.
+ */
+ CoordinatorAckRequestTxAndQueryEx(long futId, long txCntr, long qryCrdVer, long qryCntr) {
+ super(futId, txCntr);
+
+ this.qryCrdVer = qryCrdVer;
+ this.qryCntr = qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCoordinatorVersion() {
+ return qryCrdVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override long queryCounter() {
+ return qryCntr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeLong("qryCntr", qryCntr))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeLong("qryCrdVer", qryCrdVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ qryCntr = reader.readLong("qryCntr");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ qryCrdVer = reader.readLong("qryCrdVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorAckRequestTxAndQueryEx.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 142;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CoordinatorAckRequestTxAndQueryEx.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
new file mode 100644
index 0000000..49b1adb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorActiveQueriesMessage.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Coordinator message whose single marshalled field is a map of {@link MvccCounter} to {@code Integer} ("active queries").
+ */
+public class CoordinatorActiveQueriesMessage implements MvccCoordinatorMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Active queries map (message-typed keys, integer values). NOTE(review): presumably a per-counter query count - confirm. */
+ @GridDirectMap(keyType = Message.class, valueType = Integer.class)
+ private Map<MvccCounter, Integer> activeQrys;
+
+ /**
+ * Required by {@link GridIoMessageFactory}.
+ */
+ public CoordinatorActiveQueriesMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param activeQrys Active queries map; stored by reference, not copied.
+ */
+ CoordinatorActiveQueriesMessage(Map<MvccCounter, Integer> activeQrys) {
+ this.activeQrys = activeQrys;
+ }
+
+ /**
+ * @return Active queries map as stored (no copy); may be {@code null}.
+ */
+ @Nullable Map<MvccCounter, Integer> activeQueries() {
+ return activeQrys;
+ }
+
+ /** {@inheritDoc} Always {@code false} for this message. */
+ @Override public boolean waitForCoordinatorInit() {
+ return false;
+ }
+
+ /** {@inheritDoc} Always {@code true} for this message. */
+ @Override public boolean processedFromNioThread() {
+ return true;
+ }
+
+ /** {@inheritDoc} Writes the header if not yet written, then the {@code activeQrys} map as the only field (state 0). */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) { // Header is written only while the writer reports it as not yet written.
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeMap("activeQrys", activeQrys, MessageCollectionItemType.MSG, MessageCollectionItemType.INT))
+ return false; // Write did not complete; writer state is left unchanged.
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads the {@code activeQrys} map as the only field (state 0). */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ activeQrys = reader.readMap("activeQrys", MessageCollectionItemType.MSG, MessageCollectionItemType.INT, false);
+
+ if (!reader.isLastRead())
+ return false; // Read did not complete; reader state is left unchanged.
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(CoordinatorActiveQueriesMessage.class);
+ }
+
+ /** {@inheritDoc} This message reports direct type {@code 144}. */
+ @Override public short directType() {
+ return 144;
+ }
+
+ /** {@inheritDoc} Returns 1, matching the single {@code activeQrys} field handled in {@code writeTo}/{@code readFrom}. */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} Delegates to {@code S.toString} for this class. */
+ @Override public String toString() {
+ return S.toString(CoordinatorActiveQueriesMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
index e7eff42..777927c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorFutureResponse.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
*/
public class CoordinatorFutureResponse implements MvccCoordinatorMessage {
/** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
private long futId;
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
deleted file mode 100644
index 602d3b4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorQueryAckRequest.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Coordinator message carrying a single {@code long} query counter.
- */
-public class CoordinatorQueryAckRequest implements MvccCoordinatorMessage {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Query counter; the only marshalled field. */
- private long cntr;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorQueryAckRequest() {
- // No-op.
- }
-
- /**
- * @param cntr Query counter.
- */
- CoordinatorQueryAckRequest(long cntr) {
- this.cntr = cntr;
- }
-
- /** {@inheritDoc} Always {@code false} for this message. */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} Always {@code true} for this message. */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Query counter passed to the constructor or read from the wire.
- */
- public long counter() {
- return cntr;
- }
-
- /** {@inheritDoc} Writes the header if not yet written, then {@code cntr} as the only field (state 0). */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeLong("cntr", cntr))
- return false; // Write did not complete; writer state is left unchanged.
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} Reads {@code cntr} as the only field (state 0). */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cntr = reader.readLong("cntr");
-
- if (!reader.isLastRead())
- return false; // Read did not complete; reader state is left unchanged.
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorQueryAckRequest.class);
- }
-
- /** {@inheritDoc} This message reports direct type {@code 134}. */
- @Override public short directType() {
- return 134;
- }
-
- /** {@inheritDoc} Returns 1, matching the single {@code cntr} field. */
- @Override public byte fieldsCount() {
- return 1;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} Delegates to {@code S.toString} for this class. */
- @Override public String toString() {
- return S.toString(CoordinatorQueryAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
deleted file mode 100644
index 14cd6a9..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorTxAckRequest.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.mvcc;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-
-/**
- * Coordinator message carrying a future ID, the counter assigned to a transaction and a flags byte.
- */
-public class CoordinatorTxAckRequest implements MvccCoordinatorMessage {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Bit in {@code flags} that marks the response message as not needed. */
- private static final int SKIP_RESPONSE_FLAG_MASK = 0x01;
-
- /** Future ID. */
- private long futId;
-
- /** Counter assigned to transaction. */
- private long txCntr;
-
- /** Bit flags; only {@code SKIP_RESPONSE_FLAG_MASK} is used in this class. */
- private byte flags;
-
- /**
- * Required by {@link GridIoMessageFactory}.
- */
- public CoordinatorTxAckRequest() {
- // No-op.
- }
-
- /**
- * @param futId Future ID.
- * @param txCntr Counter assigned to transaction.
- */
- CoordinatorTxAckRequest(long futId, long txCntr) {
- this.futId = futId;
- this.txCntr = txCntr;
- }
-
- /** {@inheritDoc} Always {@code false} for this message. */
- @Override public boolean waitForCoordinatorInit() {
- return false;
- }
-
- /** {@inheritDoc} Always {@code true} for this message. */
- @Override public boolean processedFromNioThread() {
- return true;
- }
-
- /**
- * @return Future ID.
- */
- long futureId() {
- return futId;
- }
-
- /**
- * @return {@code True} if response message is not needed.
- */
- boolean skipResponse() {
- return (flags & SKIP_RESPONSE_FLAG_MASK) != 0;
- }
-
- /**
- * @param val {@code True} if response message is not needed.
- */
- void skipResponse(boolean val) {
- if (val)
- flags |= SKIP_RESPONSE_FLAG_MASK;
- else
- flags &= ~SKIP_RESPONSE_FLAG_MASK;
- }
-
- /**
- * @return Counter assigned to transaction.
- */
- public long txCounter() {
- return txCntr;
- }
-
- /** {@inheritDoc} Writes the header if not yet written, then {@code flags} (0), {@code futId} (1) and {@code txCntr} (2). */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) { // Dispatch on the writer's current state; cases fall through (no break).
- case 0:
- if (!writer.writeByte("flags", flags))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong("futId", futId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong("txCntr", txCntr))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} Reads {@code flags} (0), {@code futId} (1) and {@code txCntr} (2), mirroring {@code writeTo}. */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) { // Dispatch on the reader's current state; cases fall through (no break).
- case 0:
- flags = reader.readByte("flags");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- futId = reader.readLong("futId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- txCntr = reader.readLong("txCntr");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return reader.afterMessageRead(CoordinatorTxAckRequest.class);
- }
-
- /** {@inheritDoc} This message reports direct type {@code 131}. */
- @Override public short directType() {
- return 131;
- }
-
- /** {@inheritDoc} Returns 3, matching the {@code flags}, {@code futId} and {@code txCntr} fields. */
- @Override public byte fieldsCount() {
- return 3;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} Delegates to {@code S.toString} for this class. */
- @Override public String toString() {
- return S.toString(CoordinatorTxAckRequest.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f33d6a5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
index f40df72..0d75f0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CoordinatorWaitTxsRequest.java
@@ -28,6 +28,9 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
*/
public class CoordinatorWaitTxsRequest implements MvccCoordinatorMessage {
/** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
private long futId;
/** */