You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/18 17:20:57 UTC
[1/2] ignite git commit: Performance optimizations - reviewed by Yakov.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 4c9ea5864 -> 175b7f24e
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index e8546ef..1a26028 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -273,7 +273,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
* @throws IgniteCheckedException If failed.
* @throws GridDistributedLockCancelledException If lock has been cancelled.
*/
- @SuppressWarnings({"RedundantTypeArguments"})
+ @SuppressWarnings({"RedundantTypeArguments", "ForLoopReplaceableByForEach"})
@Nullable public GridNearTxRemote startRemoteTx(UUID nodeId, GridDhtLockRequest req)
throws IgniteCheckedException, GridDistributedLockCancelledException {
List<KeyCacheObject> nearKeys = req.nearKeys();
@@ -285,6 +285,7 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
if (ldr != null) {
Collection<IgniteTxKey> evicted = null;
+ // Avoid iterator creation.
for (int i = 0; i < nearKeys.size(); i++) {
KeyCacheObject key = nearKeys.get(i);
@@ -293,8 +294,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
IgniteTxKey txKey = ctx.txKey(key);
- Collection<GridCacheMvccCandidate> cands = req.candidatesByIndex(i);
-
if (log.isDebugEnabled())
log.debug("Unmarshalled key: " + key);
@@ -356,8 +355,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V>
req.owned(entry.key())
);
- assert cands.isEmpty() : "Received non-empty candidates in dht lock request: " + cands;
-
if (!req.inTx())
ctx.evicts().touch(entry, req.topologyVersion());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 31aa8c3..9c022b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -332,6 +333,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
/**
* Initializes future.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
void finish() {
if (tx.onNeedCheckBackup()) {
assert tx.onePhaseCommit();
@@ -363,10 +365,18 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (!isSync() && !isDone()) {
boolean complete = true;
- for (IgniteInternalFuture<?> f : pending())
- // Mini-future in non-sync mode gets done when message gets sent.
- if (isMini(f) && !f.isDone())
- complete = false;
+ synchronized (futs) {
+ // Avoid collection copy and iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<IgniteInternalTx> f = futs.get(i);
+
+ if (isMini(f) && !f.isDone()) {
+ complete = false;
+
+ break;
+ }
+ }
+ }
if (complete)
onComplete();
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 434b6c7..b92be31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -461,21 +461,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
}
/**
- * @param nodeId Node ID.
- * @param dhtVer DHT version.
- * @param writeVer Write version.
- */
- void addDhtVersion(UUID nodeId, GridCacheVersion dhtVer, GridCacheVersion writeVer) {
- // This step is very important as near and DHT versions grow separately.
- cctx.versions().onReceived(nodeId, dhtVer);
-
- GridDistributedTxMapping m = mappings.get(nodeId);
-
- if (m != null)
- m.dhtVersion(dhtVer, writeVer);
- }
-
- /**
* @param nodeId Undo mapping.
*/
@Override public boolean removeMapping(UUID nodeId) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index 45477a0..cfaadc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -172,6 +172,8 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
assert res.error() == null : res;
assert F.isEmpty(res.invalidPartitions()) : res;
+ UUID nodeId = m.node().id();
+
for (Map.Entry<IgniteTxKey, CacheVersionedValue> entry : res.ownedValues().entrySet()) {
IgniteTxEntry txEntry = tx.entry(entry.getKey());
@@ -187,7 +189,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
CacheVersionedValue tup = entry.getValue();
nearEntry.resetFromPrimary(tup.value(), tx.xidVersion(),
- tup.version(), m.node().id(), tx.topologyVersion());
+ tup.version(), nodeId, tx.topologyVersion());
}
else if (txEntry.cached().detached()) {
GridDhtDetachedCacheEntry detachedEntry = (GridDhtDetachedCacheEntry)txEntry.cached();
@@ -229,11 +231,17 @@ public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture<
if (writeVer == null)
writeVer = res.dhtVersion();
- // Register DHT version.
- tx.addDhtVersion(m.node().id(), res.dhtVersion(), writeVer);
+ // This step is very important as near and DHT versions grow separately.
+ cctx.versions().onReceived(nodeId, res.dhtVersion());
+ // Register DHT version.
m.dhtVersion(res.dhtVersion(), writeVer);
+ GridDistributedTxMapping map = tx.mappings().get(nodeId);
+
+ if (map != null)
+ map.dhtVersion(res.dhtVersion(), writeVer);
+
if (m.near())
tx.readyNearLocks(m, res.pending(), res.committedVersions(), res.rolledbackVersions());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
index 798635a..9dfdb43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java
@@ -71,7 +71,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** Implicit single flag. */
private boolean implicitSingle;
- /** Explicit lock flag. Set to true if at leat one entry was explicitly locked. */
+ /** Explicit lock flag. Set to true if at least one entry was explicitly locked. */
private boolean explicitLock;
/** Subject ID. */
@@ -282,73 +282,73 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 22:
+ case 23:
if (!writer.writeBoolean("explicitLock", explicitLock))
return false;
writer.incrementState();
- case 23:
+ case 24:
if (!writer.writeBoolean("firstClientReq", firstClientReq))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeBoolean("implicitSingle", implicitSingle))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeCollection("lastBackups", lastBackups, MessageCollectionItemType.UUID))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeBoolean("near", near))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 33:
+ case 34:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -370,7 +370,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 22:
+ case 23:
explicitLock = reader.readBoolean("explicitLock");
if (!reader.isLastRead())
@@ -378,7 +378,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 23:
+ case 24:
firstClientReq = reader.readBoolean("firstClientReq");
if (!reader.isLastRead())
@@ -386,7 +386,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -394,7 +394,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
implicitSingle = reader.readBoolean("implicitSingle");
if (!reader.isLastRead())
@@ -402,7 +402,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -410,7 +410,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
lastBackups = reader.readCollection("lastBackups", MessageCollectionItemType.UUID);
if (!reader.isLastRead())
@@ -418,7 +418,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -426,7 +426,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
near = reader.readBoolean("near");
if (!reader.isLastRead())
@@ -434,7 +434,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
@@ -442,7 +442,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -450,7 +450,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -458,7 +458,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 34:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -478,7 +478,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 34;
+ return 35;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..eb0db4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteUuid;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 6a0f8ab..3ddd909 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 9eb2808..23f83be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -599,20 +599,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
*/
@SuppressWarnings("unchecked")
public CacheObject applyEntryProcessors(CacheObject cacheVal) {
- Object val = null;
- Object keyVal = null;
-
GridCacheVersion ver;
try {
ver = entry.version();
}
- catch (GridCacheEntryRemovedException e) {
+ catch (GridCacheEntryRemovedException ignore) {
assert tx == null || tx.optimistic() : tx;
ver = null;
}
+ Object val = null;
+ Object keyVal = null;
+
for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
try {
CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val,
@@ -1078,5 +1078,4 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
@Override public String toString() {
return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion());
}
-
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 5f48469..c75a8f38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -147,6 +147,13 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+ GridCacheContext ctx0 = cacheCtx;
+
+ return ctx0 != null && ctx0.config().getInterceptor() != null;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
if (cacheCtx == null)
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 758f82c..9e44b10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridLeanMap;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -74,7 +75,6 @@ import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridInClosure3;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.C2;
@@ -358,6 +358,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
}
/**
+ * @return {@code True} if transaction participates in a cache that has an interceptor configured.
+ */
+ public boolean hasInterceptor() {
+ return txState().hasInterceptor(cctx);
+ }
+
+ /**
* @param needRetVal Need return value flag.
*/
public void needReturnValue(boolean needRetVal) {
@@ -3045,7 +3052,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
try {
Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet();
- final Collection<KeyCacheObject> enlisted = new ArrayList<>();
+ final Collection<KeyCacheObject> enlisted = new ArrayList<>(keySet.size());
CacheOperationContext opCtx = cacheCtx.operationContextPerCall();
@@ -3434,7 +3441,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
*/
public boolean init() {
return !txState.init(txSize) || cctx.tm().onStarted(this);
-
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index ccccca0..67bca51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.IgnitePair;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index e7c4c96..3e5034b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -92,6 +92,11 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
}
/** {@inheritDoc} */
+ @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
assert false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 81707ba..18fce8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -101,6 +101,12 @@ public interface IgniteTxState {
/**
* @param cctx Context.
+ * @return {@code True} if transaction spans one or more caches with configured interceptor.
+ */
+ public boolean hasInterceptor(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
* @return Configured stores for active caches.
*/
public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c95fb19..213c5e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,11 +24,13 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -45,7 +47,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
*/
public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** Active cache IDs. */
- private Set<Integer> activeCacheIds = new HashSet<>();
+ private GridLongList activeCacheIds = new GridLongList();
/** Per-transaction read map. */
@GridToStringInclude
@@ -66,13 +68,13 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Nullable @Override public Integer firstCacheId() {
- return F.first(activeCacheIds);
+ return activeCacheIds.isEmpty() ? null : (int)activeCacheIds.get(0);
}
/** {@inheritDoc} */
@Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
if (activeCacheIds.size() == 1) {
- int cacheId = F.first(activeCacheIds);
+ int cacheId = (int)activeCacheIds.get(0);
return cctx.cacheContext(cacheId);
}
@@ -82,8 +84,11 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public void awaitLastFut(GridCacheSharedContext cctx) {
- for (Integer cacheId : activeCacheIds)
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
cctx.cacheContext(cacheId).cache().awaitLastFut();
+ }
}
/** {@inheritDoc} */
@@ -91,7 +96,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
GridDhtTopologyFuture topFut) {
StringBuilder invalidCaches = null;
- for (Integer cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext ctx = cctx.cacheContext(cacheId);
assert ctx != null : cacheId;
@@ -113,7 +120,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
invalidCaches.toString());
}
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) {
@@ -127,7 +136,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public boolean sync(GridCacheSharedContext cctx) {
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
return true;
}
@@ -137,7 +148,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public boolean hasNearCache(GridCacheSharedContext cctx) {
- for (Integer cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx.isNear())
@@ -163,7 +176,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
int idx = 0;
- for (Integer activeCacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int activeCacheId = (int)activeCacheIds.get(i);
+
cacheNames.append(cctx.cacheContext(activeCacheId).name());
if (idx++ < activeCacheIds.size() - 1)
@@ -192,7 +207,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
GridCacheContext<?, ?> nonLocCtx = null;
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
if (!cacheCtx.isLocal()) {
@@ -222,7 +239,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
if (!activeCacheIds.isEmpty()) {
GridCacheContext<?, ?> nonLocCtx = null;
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
if (!cacheCtx.isLocal()) {
@@ -240,7 +259,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public boolean storeUsed(GridCacheSharedContext cctx) {
if (!activeCacheIds.isEmpty()) {
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
CacheStoreManager store = cctx.cacheContext(cacheId).store();
if (store.configured())
@@ -252,13 +273,29 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
}
/** {@inheritDoc} */
+ @Override public boolean hasInterceptor(GridCacheSharedContext cctx) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
+ CacheInterceptor interceptor = cctx.cacheContext(cacheId).config().getInterceptor();
+
+ if (interceptor != null)
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
- Collection<Integer> cacheIds = activeCacheIds;
+ GridLongList cacheIds = activeCacheIds;
if (!cacheIds.isEmpty()) {
Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
- for (int cacheId : cacheIds) {
+ for (int i = 0; i < cacheIds.size(); i++) {
+ int cacheId = (int)cacheIds.get(i);
+
CacheStoreManager store = cctx.cacheContext(cacheId).store();
if (store.configured())
@@ -273,7 +310,9 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
/** {@inheritDoc} */
@Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
- for (int cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
onTxEnd(cacheCtx, tx, commit);
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
index 21d3fb6..b5c89cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockSyncProcessor.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
@@ -71,6 +72,9 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
private NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist =
new GridBoundedConcurrentOrderedMap<>(MAX_TIME_SYNC_HISTORY);
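+ /** Most recently recorded snapshot paired with its version; adjustedTime() checks it before falling back to the history map. */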
+ private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot;
+
/** Time source. */
private GridClockSource clockSrc;
@@ -99,7 +103,11 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
GridClockDeltaVersion ver = msg0.snapshotVersion();
- timeSyncHist.put(ver, new GridClockDeltaSnapshot(ver, msg0.deltas()));
+ GridClockDeltaSnapshot snap = new GridClockDeltaSnapshot(ver, msg0.deltas());
+
+ lastSnapshot = new T2<>(ver, snap);
+
+ timeSyncHist.put(ver, snap);
}
});
@@ -265,11 +273,19 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
* @return Adjusted time.
*/
public long adjustedTime(long topVer) {
- // Get last synchronized time on given topology version.
- Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
- new GridClockDeltaVersion(0, topVer + 1));
+ T2<GridClockDeltaVersion, GridClockDeltaSnapshot> fastSnap = lastSnapshot;
+
+ GridClockDeltaSnapshot snap;
- GridClockDeltaSnapshot snap = entry == null ? null : entry.getValue();
+ if (fastSnap != null && fastSnap.get1().topologyVersion() == topVer)
+ snap = fastSnap.get2();
+ else {
+ // Get last synchronized time on given topology version.
+ Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> entry = timeSyncHistory().lowerEntry(
+ new GridClockDeltaVersion(0, topVer + 1));
+
+ snap = entry == null ? null : entry.getValue();
+ }
long now = clockSrc.currentTimeMillis();
@@ -295,6 +311,8 @@ public class GridClockSyncProcessor extends GridProcessorAdapter {
return;
try {
+ lastSnapshot = new T2<>(snapshot.version(), snapshot);
+
timeSyncHist.put(snapshot.version(), snapshot);
for (ClusterNode n : top.topologyNodes()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
new file mode 100644
index 0000000..25e3376
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/UUIDCollectionMessage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message that wraps a collection of UUIDs for direct marshalling.
+ */
+public class UUIDCollectionMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped UUIDs. */
+ @GridDirectCollection(UUID.class)
+ private Collection<UUID> uuids;
+
+ /**
+ * Empty constructor required for direct marshalling.
+ */
+ public UUIDCollectionMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param uuids UUIDs to wrap.
+ */
+ public UUIDCollectionMessage(Collection<UUID> uuids) {
+ this.uuids = uuids;
+ }
+
+ /**
+ * @return The collection of UUIDs that was wrapped.
+ */
+ public Collection<UUID> uuids() {
+ return uuids;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeCollection("uuids", uuids, MessageCollectionItemType.UUID))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ uuids = reader.readCollection("uuids", MessageCollectionItemType.UUID);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(UUIDCollectionMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 115;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
index 31674f1..5f0d411 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java
@@ -53,7 +53,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls");
/** Futures. */
- private final Collection<IgniteInternalFuture<T>> futs = new ArrayList<>();
+ protected final ArrayList<IgniteInternalFuture<T>> futs = new ArrayList<>();
/** Reducer. */
@GridToStringInclude
@@ -166,8 +166,19 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> {
*
* @return {@code True} if there are pending futures.
*/
+ @SuppressWarnings("ForLoopReplaceableByForEach")
public boolean hasPending() {
- return !pending().isEmpty();
+ synchronized (futs) {
+ // Avoid iterator creation and collection copy.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<T> fut = futs.get(i);
+
+ if (!fut.isDone())
+ return true;
+ }
+ }
+
+ return false;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index c1d91a8..8d5a8e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1866,6 +1866,8 @@ public class GridFunc {
assert m != null;
+ final boolean hasPred = p != null && p.length > 0;
+
return new GridSerializableMap<K, V1>() {
/** Entry predicate. */
private IgnitePredicate<Entry<K, V>> ep = new P1<Map.Entry<K, V>>() {
@@ -1911,7 +1913,7 @@ public class GridFunc {
}
@Override public int size() {
- return F.size(m.keySet(), p);
+ return hasPred ? F.size(m.keySet(), p) : m.size();
}
@SuppressWarnings({"unchecked"})
@@ -1925,13 +1927,13 @@ public class GridFunc {
}
@Override public boolean isEmpty() {
- return !iterator().hasNext();
+ return hasPred ? !iterator().hasNext() : m.isEmpty();
}
};
}
@Override public boolean isEmpty() {
- return entrySet().isEmpty();
+ return hasPred ? entrySet().isEmpty() : m.isEmpty();
}
@SuppressWarnings({"unchecked"})
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 1824339..5bd08e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1301,6 +1301,8 @@ public class GridNioServer<T> {
@SuppressWarnings("unchecked")
private void bodyInternal() throws IgniteCheckedException {
try {
+ long lastIdleCheck = U.currentTimeMillis();
+
while (!closed && selector.isOpen()) {
NioOperationFuture req;
@@ -1374,11 +1376,18 @@ public class GridNioServer<T> {
}
// Wake up every 2 seconds to check if closed.
- if (selector.select(2000) > 0)
+ if (selector.select(2000) > 0) {
// Walk through the ready keys collection and process network events.
processSelectedKeys(selector.selectedKeys());
+ }
- checkIdle(selector.keys());
+ long now = U.currentTimeMillis();
+
+ if (now - lastIdleCheck > 2000) {
+ lastIdleCheck = now;
+
+ checkIdle(selector.keys());
+ }
}
}
// Ignore this exception as thread interruption is equal to 'close' call.
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
index 8957c5d..6424b8b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheTxStoreSessionTest.java
@@ -285,7 +285,7 @@ public class IgniteCacheTxStoreSessionTest extends IgniteCacheStoreSessionAbstra
expData.add(new ExpectedData(true, "write", new HashMap<>(), CACHE_NAME1));
expData.add(new ExpectedData(true, "write", F.<Object, Object>asMap(0, "write"), null));
- expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), null));
+ expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, "write", 1, "write"), CACHE_NAME1));
tx.commit();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index c3c3659..ea13cdd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridCachePut
import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxReentryNearSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingAsyncSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingSyncSelfTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingUnmarshallingFailedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheDaemonNodeReplicatedSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedAtomicGetAndTransformStoreSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedBasicApiTest;
@@ -138,6 +139,7 @@ public class IgniteCacheTestSuite3 extends TestSuite {
suite.addTestSuite(GridCacheOrderedPreloadingSelfTest.class);
suite.addTestSuite(GridCacheRebalancingSyncSelfTest.class);
+ suite.addTestSuite(GridCacheRebalancingUnmarshallingFailedSelfTest.class);
suite.addTestSuite(GridCacheRebalancingAsyncSelfTest.class);
// Test for byte array value special case.
[2/2] ignite git commit: Performance optimizations - reviewed by
Yakov.
Posted by ag...@apache.org.
Performance optimizations - reviewed by Yakov.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/175b7f24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/175b7f24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/175b7f24
Branch: refs/heads/ignite-1.5
Commit: 175b7f24e1d62a90e7a7159ad670036216e6d278
Parents: 4c9ea58
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Wed Nov 18 19:20:45 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Wed Nov 18 19:20:45 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 11 +-
.../communication/GridIoMessageFactory.java | 6 +
.../discovery/GridDiscoveryManager.java | 2 +-
.../cache/GridCacheDeploymentManager.java | 2 +-
.../processors/cache/GridCacheGateway.java | 1 -
.../processors/cache/GridCacheMvcc.java | 7 --
.../processors/cache/GridCacheMvccManager.java | 42 -------
.../GridCachePartitionExchangeManager.java | 55 ++++++++-
.../cache/GridCacheSharedContext.java | 7 +-
.../distributed/GridCacheTxRecoveryFuture.java | 41 +++++--
.../distributed/GridDistributedBaseMessage.java | 56 ---------
.../distributed/GridDistributedLockRequest.java | 6 -
.../GridDistributedLockResponse.java | 32 +-----
.../GridDistributedTxPrepareRequest.java | 67 +++++++++--
.../distributed/dht/GridDhtLockFuture.java | 63 ++++++----
.../distributed/dht/GridDhtLockRequest.java | 2 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 5 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 9 ++
.../distributed/dht/GridDhtTxPrepareFuture.java | 60 ++++++----
.../dht/GridDhtTxPrepareRequest.java | 54 ++++-----
.../dht/colocated/GridDhtColocatedCache.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 55 ++++++---
.../distributed/near/GridNearLockFuture.java | 56 ++++++---
.../distributed/near/GridNearLockRequest.java | 4 +-
...arOptimisticSerializableTxPrepareFuture.java | 91 +++++++++------
.../near/GridNearOptimisticTxPrepareFuture.java | 50 +++++---
.../GridNearPessimisticTxPrepareFuture.java | 39 +++++--
.../near/GridNearTransactionalCache.java | 7 +-
.../near/GridNearTxFinishFuture.java | 18 ++-
.../cache/distributed/near/GridNearTxLocal.java | 15 ---
.../near/GridNearTxPrepareFutureAdapter.java | 14 ++-
.../near/GridNearTxPrepareRequest.java | 52 ++++-----
.../cache/transactions/IgniteInternalTx.java | 1 +
.../cache/transactions/IgniteTxAdapter.java | 1 +
.../cache/transactions/IgniteTxEntry.java | 9 +-
.../IgniteTxImplicitSingleStateImpl.java | 7 ++
.../transactions/IgniteTxLocalAdapter.java | 12 +-
.../cache/transactions/IgniteTxManager.java | 1 +
.../IgniteTxRemoteStateAdapter.java | 5 +
.../cache/transactions/IgniteTxState.java | 6 +
.../cache/transactions/IgniteTxStateImpl.java | 69 ++++++++---
.../clock/GridClockSyncProcessor.java | 28 ++++-
.../internal/util/UUIDCollectionMessage.java | 114 +++++++++++++++++++
.../util/future/GridCompoundFuture.java | 15 ++-
.../ignite/internal/util/lang/GridFunc.java | 8 +-
.../ignite/internal/util/nio/GridNioServer.java | 13 ++-
.../IgniteCacheTxStoreSessionTest.java | 2 +-
.../testsuites/IgniteCacheTestSuite3.java | 2 +
48 files changed, 805 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 8d9a3f5..74c71c4 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -43,6 +43,10 @@ import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -165,7 +169,12 @@ public class MessageCodeGenerator {
MessageCodeGenerator gen = new MessageCodeGenerator(srcDir);
- gen.generateAll(true);
+ gen.generateAndWrite(GridDistributedTxPrepareRequest.class);
+ gen.generateAndWrite(GridDhtTxPrepareRequest.class);
+ gen.generateAndWrite(GridNearTxPrepareRequest.class);
+ gen.generateAndWrite(UUIDCollectionMessage.class);
+
+// gen.generateAll(true);
// gen.generateAndWrite(DataStreamerEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ae8c753..2503eda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -121,6 +121,7 @@ import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRe
import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse;
import org.apache.ignite.internal.util.GridByteArrayList;
import org.apache.ignite.internal.util.GridLongList;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -690,6 +691,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 115:
+ msg = new UUIDCollectionMessage();
+
+ break;
+
// [-3..114] - this
// [120..123] - DR
// [-4..-22] - SQL
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index cd2f49c..4880338 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2136,7 +2136,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
customEvt.node(ctx.discovery().localNode());
customEvt.eventNode(node);
customEvt.type(type);
- customEvt.topologySnapshot(topVer.topologyVersion(), null);
+ customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4());
customEvt.affinityTopologyVersion(topVer);
customEvt.customMessage(evt.get5());
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index 40c5b0f..35e8b75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -164,7 +164,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
* Callback on method enter.
*/
public void onEnter() {
- if (!locDepOwner && depEnabled && !ignoreOwnership.get()
+ if (depEnabled && !locDepOwner && !ignoreOwnership.get()
&& !cctx.kernalContext().job().internal()) {
ClassLoader ldr = Thread.currentThread().getContextClassLoader();
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 0eac5ba..1562d70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -109,7 +109,6 @@ public class GridCacheGateway<K, V> {
rwLock.readLock();
return checkState(true, false);
-
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
index 12583ad..adcbf92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java
@@ -348,9 +348,6 @@ public final class GridCacheMvcc {
reassign();
- if (cand.local())
- cctx.mvcc().removeLocal(cand);
-
return true;
}
}
@@ -596,8 +593,6 @@ public final class GridCacheMvcc {
);
if (serOrder == null) {
- cctx.mvcc().addLocal(cand);
-
boolean add = add0(cand);
assert add : cand;
@@ -605,8 +600,6 @@ public final class GridCacheMvcc {
else {
if (!add0(cand))
return null;
-
- cctx.mvcc().addLocal(cand);
}
return cand;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 2c14209..8562f37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
@@ -92,9 +91,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks =
new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q);
- /** Current local candidates. */
- private Collection<GridCacheMvccCandidate> dhtLocCands = new ConcurrentSkipListSet<>();
-
/** Locked keys. */
@GridToStringExclude
private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> locked = newMap();
@@ -707,43 +703,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
- * @param cand Local lock.
- * @return {@code True} if added.
- */
- public boolean addLocal(GridCacheMvccCandidate cand) {
- assert cand.key() != null : cand;
- assert cand.local() : cand;
-
- if (cand.dhtLocal() && dhtLocCands.add(cand)) {
- if (log.isDebugEnabled())
- log.debug("Added local candidate: " + cand);
-
- return true;
- }
-
- return false;
- }
-
- /**
- *
- * @param cand Local candidate to remove.
- * @return {@code True} if removed.
- */
- public boolean removeLocal(GridCacheMvccCandidate cand) {
- assert cand.key() != null : cand;
- assert cand.local() : cand;
-
- if (cand.dhtLocal() && dhtLocCands.remove(cand)) {
- if (log.isDebugEnabled())
- log.debug("Removed local candidate: " + cand);
-
- return true;
- }
-
- return false;
- }
-
- /**
* @param cacheCtx Cache context.
* @param cand Cache lock candidate to add.
* @return {@code True} if added as a result of this operation,
@@ -953,7 +912,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
X.println(">>> ");
X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']');
X.println(">>> rmvLocksSize: " + rmvLocks.sizex());
- X.println(">>> dhtLocCandsSize: " + dhtLocCands.size());
X.println(">>> lockedSize: " + locked.size());
X.println(">>> futsSize: " + futs.size());
X.println(">>> near2dhtSize: " + near2dht.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 81ff028..e19b310 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridListSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgnitePair;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -134,6 +137,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>();
/** */
+ private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers =
+ new ConcurrentSkipListMap<>();
+
+ /** */
private final AtomicReference<AffinityTopologyVersion> readyTopVer =
new AtomicReference<>(AffinityTopologyVersion.NONE);
@@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Gets minimum node version for the given topology version.
+ *
+ * @param topVer Topology version to get minimum node version for.
+ * @return Minimum node version.
+ */
+ public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) {
+ IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+ return vers == null ? cctx.localNode().version() : vers.get1();
+ }
+
+ /**
+ * Gets maximum node version for the given topology version.
+ *
+ * @param topVer Topology version to get maximum node version for.
+ * @return Maximum node version.
+ */
+ public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) {
+ IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer);
+
+ return vers == null ? cctx.localNode().version() : vers.get2();
+ }
+
+ /**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
@@ -832,6 +863,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (log.isDebugEnabled())
log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ IgniteProductVersion minVer = cctx.localNode().version();
+ IgniteProductVersion maxVer = cctx.localNode().version();
+
+ if (err == null) {
+ if (!F.isEmpty(exchFut.discoveryEvent().topologyNodes())) {
+ for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) {
+ IgniteProductVersion ver = node.version();
+
+ if (ver.compareTo(minVer) < 0)
+ minVer = ver;
+
+ if (ver.compareTo(maxVer) > 0)
+ maxVer = ver;
+ }
+ }
+ }
+
+ nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer));
+
+ for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet())
+ nodeVers.remove(oldVer);
+
if (err == null) {
while (true) {
AffinityTopologyVersion readyVer = readyTopVer.get();
@@ -1050,7 +1103,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int cnt = 0;
- for (GridDhtPartitionsExchangeFuture fut : exchFuts) {
+ for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
U.warn(log, ">>> " + fut);
if (++cnt == 10)
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 4293b90..608829a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -533,7 +534,7 @@ public class GridCacheSharedContext<K, V> {
* @param cacheCtx Cache context.
* @return Error message if transactions are incompatible.
*/
- @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds,
+ @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds,
GridCacheContext<K, V> cacheCtx) {
if (cacheCtx.systemTx() && !tx.system())
return "system cache can be enlisted only in system transaction";
@@ -541,7 +542,9 @@ public class GridCacheSharedContext<K, V> {
if (!cacheCtx.systemTx() && tx.system())
return "non-system cache can't be enlisted in system transaction";
- for (Integer cacheId : activeCacheIds) {
+ for (int i = 0; i < activeCacheIds.size(); i++) {
+ int cacheId = (int)activeCacheIds.get(i);
+
GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId);
if (cacheCtx.systemTx()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index b266c4d..01c4867 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -342,20 +342,45 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
*/
public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) {
if (!isDone()) {
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (f.futureId().equals(res.miniId())) {
- assert f.nodeId().equals(nodeId);
+ if (mini != null) {
+ assert mini.nodeId().equals(nodeId);
- f.onResult(res);
+ mini.onResult(res);
+ }
+ }
+ }
- break;
- }
+ /**
+ * Finds pending mini future by the given mini ID.
+ * <p>
+ * The search stops at the first mini future whose ID equals {@code miniId}.
+ *
+ * @param miniId Mini ID to find.
+ * @return Mini future with the given ID if it is not done yet, or {@code null} if no mini future
+ * has this ID or the matching mini future is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // Iterate directly over the futs collection (under its monitor) to avoid copying it.
+ synchronized (futs) {
+ // Index-based loop is used to avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
}
}
}
+
+ return null;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
index f4a16dc..ebbc9ae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java
@@ -21,13 +21,10 @@ import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -49,15 +46,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
@GridToStringInclude
protected GridCacheVersion ver;
- /**
- * Candidates for every key ordered in the order of keys. These
- * can be either local-only candidates in case of lock acquisition,
- * or pending candidates in case of transaction commit.
- */
- @GridToStringInclude
- @GridDirectTransient
- private Collection<GridCacheMvccCandidate>[] candsByIdx;
-
/** */
@GridToStringExclude
private byte[] candsByIdxBytes;
@@ -108,23 +96,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
this.ver = ver;
}
- /** {@inheritDoc}
- * @param ctx*/
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
- super.prepareMarshal(ctx);
-
- if (candsByIdx != null)
- candsByIdxBytes = ctx.marshaller().marshal(candsByIdx);
- }
-
- /** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- super.finishUnmarshal(ctx, ldr);
-
- if (candsByIdxBytes != null)
- candsByIdx = ctx.marshaller().unmarshal(candsByIdxBytes, ldr);
- }
-
/** {@inheritDoc} */
@Override public boolean addDeploymentInfo() {
return addDepInfo;
@@ -169,33 +140,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem
}
/**
- * @param idx Key index.
- * @param candsByIdx List of candidates for that key.
- */
- @SuppressWarnings({"unchecked"})
- public void candidatesByIndex(int idx, Collection<GridCacheMvccCandidate> candsByIdx) {
- assert idx < cnt;
-
- // If nothing to add.
- if (candsByIdx == null || candsByIdx.isEmpty())
- return;
-
- if (this.candsByIdx == null)
- this.candsByIdx = new Collection[cnt];
-
- this.candsByIdx[idx] = candsByIdx;
- }
-
- /**
- * @param idx Key index.
- * @return Candidates for given key.
- */
- public Collection<GridCacheMvccCandidate> candidatesByIndex(int idx) {
- return candsByIdx == null ||
- candsByIdx[idx] == null ? Collections.<GridCacheMvccCandidate>emptyList() : candsByIdx[idx];
- }
-
- /**
* @return Count of keys referenced in candidates array (needed only locally for optimization).
*/
public int keysCount() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 2899e25..b584f8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -19,14 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -261,14 +259,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
*
* @param key Key.
* @param retVal Flag indicating whether value should be returned.
- * @param cands Candidates.
* @param ctx Context.
* @throws IgniteCheckedException If failed.
*/
public void addKeyBytes(
KeyCacheObject key,
boolean retVal,
- @Nullable Collection<GridCacheMvccCandidate> cands,
GridCacheContext ctx
) throws IgniteCheckedException {
if (keys == null)
@@ -276,8 +272,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
keys.add(key);
- candidatesByIndex(idx, cands);
-
retVals[idx] = retVal;
idx++;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index cdd58b5..bb3f9ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -26,7 +26,6 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -156,34 +155,11 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
}
/**
- * @param idx Index of locked flag.
- * @return Value of locked flag at given index.
- */
- public boolean isCurrentlyLocked(int idx) {
- assert idx >= 0;
-
- Collection<GridCacheMvccCandidate> cands = candidatesByIndex(idx);
-
- for (GridCacheMvccCandidate cand : cands)
- if (cand.owner())
- return true;
-
- return false;
- }
-
- /**
- * @param idx Candidates index.
- * @param cands Collection of candidates.
* @param committedVers Committed versions relative to lock version.
* @param rolledbackVers Rolled back versions relative to lock version.
*/
- public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands,
- Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
- assert idx >= 0;
-
+ public void setCandidates(Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) {
completedVersions(committedVers, rolledbackVers);
-
- candidatesByIndex(idx, cands);
}
/**
@@ -218,9 +194,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId));
-// if (F.isEmpty(valBytes) && !F.isEmpty(vals))
-// valBytes = marshalValuesCollection(vals, ctx);
-
if (err != null)
errBytes = ctx.marshaller().marshal(err);
}
@@ -231,9 +204,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage {
finishUnmarshalCacheObjects(vals, ctx.cacheContext(cacheId), ldr);
-// if (F.isEmpty(vals) && !F.isEmpty(valBytes))
-// vals = unmarshalValueBytesCollection(valBytes, ctx, ldr);
-
if (errBytes != null)
err = ctx.marshaller().unmarshal(errBytes, ldr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
index 533c8ca..95176ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -34,9 +35,13 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -52,6 +57,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** */
private static final long serialVersionUID = 0L;
+ /** Version in which direct marshalling of tx nodes was introduced. */
+ public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0");
+
+ /** Collection to message converter. */
+ public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() {
+ @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) {
+ return new UUIDCollectionMessage(uuids);
+ }
+ };
+
+ /** Message to collection converter. */
+ public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() {
+ @Override public Collection<UUID> apply(UUIDCollectionMessage msg) {
+ return msg.uuids();
+ }
+ };
+
/** Thread ID. */
@GridToStringInclude
private long threadId;
@@ -106,6 +128,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
@GridDirectTransient
private Map<UUID, Collection<UUID>> txNodes;
+ /** Tx nodes direct marshallable message. */
+ @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class)
+ private Map<UUID, UUIDCollectionMessage> txNodesMsg;
+
/** */
private byte[] txNodesBytes;
@@ -302,8 +328,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
dhtVerVals = dhtVers.values();
}
- if (txNodes != null)
- txNodesBytes = ctx.marshaller().marshal(txNodes);
+ // Marshal txNodes to bytes only if topology has a node older than TX_NODES_DIRECT_MARSHALLABLE_SINCE; otherwise send them as a direct-marshallable map.
+ if (ctx.exchange().minimumNodeVersion(topologyVersion())
+ .compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) {
+ if (txNodes != null && txNodesBytes == null)
+ txNodesBytes = ctx.marshaller().marshal(txNodes);
+ }
+ else {
+ if (txNodesMsg == null)
+ txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG);
+ }
}
/** {@inheritDoc} */
@@ -334,7 +368,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
}
}
- if (txNodesBytes != null)
+ if (txNodesMsg != null)
+ txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL);
+
+ if (txNodesBytes != null && txNodes == null)
txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr);
}
@@ -431,18 +468,24 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
writer.incrementState();
case 19:
- if (!writer.writeInt("txSize", txSize))
+ if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 20:
- if (!writer.writeMessage("writeVer", writeVer))
+ if (!writer.writeInt("txSize", txSize))
return false;
writer.incrementState();
case 21:
+ if (!writer.writeMessage("writeVer", writeVer))
+ return false;
+
+ writer.incrementState();
+
+ case 22:
if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG))
return false;
@@ -569,7 +612,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 19:
- txSize = reader.readInt("txSize");
+ txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
@@ -577,7 +620,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 20:
- writeVer = reader.readMessage("writeVer");
+ txSize = reader.readInt("txSize");
if (!reader.isLastRead())
return false;
@@ -585,6 +628,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
reader.incrementState();
case 21:
+ writeVer = reader.readMessage("writeVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 22:
writes = reader.readCollection("writes", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -604,7 +655,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 22;
+ return 23;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 579d701..7284fd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @return Lock candidate.
* @throws GridCacheEntryRemovedException If entry was removed.
* @throws GridDistributedLockCancelledException If lock is canceled.
- * @throws IgniteCheckedException If failed.
*/
@Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry)
- throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException {
+ throws GridCacheEntryRemovedException, GridDistributedLockCancelledException {
if (log.isDebugEnabled())
log.debug("Adding entry: " + entry);
@@ -529,35 +528,57 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']');
- boolean found = false;
+ MiniFuture mini = miniFuture(res.miniId());
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture mini = (MiniFuture)fut;
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- if (mini.futureId().equals(res.miniId())) {
- assert mini.node().id().equals(nodeId);
+ if (log.isDebugEnabled())
+ log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
- if (log.isDebugEnabled())
- log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+ mini.onResult(res);
- found = true;
+ if (log.isDebugEnabled())
+ log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini +
+ ", res=" + res + ']');
- mini.onResult(res);
+ return;
+ }
- if (log.isDebugEnabled())
- log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini +
- ", res=" + res + ']');
+ U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
+ ", fut=" + this + ']');
+ }
+ }
- break;
- }
+ /**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
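+ * @return Mini future with the given ID if it is not done yet, or {@code null} if no mini future has this ID or the matching one is already done.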
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
}
}
-
- if (!found)
- U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
- ", fut=" + this + ']');
}
+
+ return null;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 91ab1ca..18281d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -236,7 +236,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
) throws IgniteCheckedException {
invalidateEntries.set(idx, invalidateEntry);
- addKeyBytes(key, false, null, ctx);
+ addKeyBytes(key, false, ctx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
index fe91e5b..35f63e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java
@@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
-import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException;
import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -187,8 +186,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
IgniteTxKey txKey = ctx.txKey(key);
- assert F.isEmpty(req.candidatesByIndex(i));
-
if (log.isDebugEnabled())
log.debug("Unmarshalled key: " + key);
@@ -671,7 +668,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+ catch (GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index 70ebf3f..55ca12d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -181,6 +181,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
/**
+ * Gets flag that indicates that originating node has a near cache that participates in this transaction.
+ *
+ * @return Has near cache flag.
+ */
+ public boolean nearOnOriginatingNode() {
+ return nearOnOriginatingNode;
+ }
+
+ /**
* @return {@code True} if explicit lock transaction.
*/
public boolean explicitLock() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index a67950d..d081c0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -450,20 +450,45 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) {
if (!isDone()) {
- for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- f.onResult(res);
+ mini.onResult(res);
+ }
+ }
+ }
- break;
- }
+ /**
+ * Finds pending mini future by the given mini ID.
+ * <p>
+ * The search stops at the first mini future whose ID equals {@code miniId}.
+ *
+ * @param miniId Mini ID to find.
+ * @return Mini future with the given ID if it is not done yet, or {@code null} if no mini future
+ * has this ID or the matching mini future is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // Iterate directly over the futs collection (under its monitor) to avoid copying it.
+ synchronized (futs) {
+ // Index-based loop is used to avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
}
}
}
+
+ return null;
}
/**
@@ -693,7 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
tx.activeCachesDeploymentEnabled());
if (prepErr == null) {
- addDhtValues(res);
+ if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor())
+ addDhtValues(res);
GridCacheVersion min = tx.minVersion();
@@ -949,7 +975,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
}
}
- catch (GridCacheEntryRemovedException e) {
+ catch (GridCacheEntryRemovedException ignore) {
assert false : "Got removed exception on entry with dht local candidate: " + entries;
}
@@ -1072,18 +1098,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
GridCacheContext<?, ?> cacheCtx = cached.context();
- if (entry.explicitVersion() == null) {
- GridCacheMvccCandidate added = cached.candidate(version());
-
- assert added != null : "Null candidate for non-group-lock entry " +
- "[added=" + added + ", entry=" + entry + ']';
- assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" +
- "[added=" + added + ", entry=" + entry + ']';
-
- if (added != null && added.ownerVersion() != null)
- req.owned(entry.txKey(), added.ownerVersion());
- }
-
// Do not invalidate near entry on originating transaction node.
req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) &&
cached.readerId(n.id()) != null);
@@ -1092,7 +1106,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(),
tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion());
- // Do not preload if local node is partition owner.
+ // Do not preload if local node is a partition owner.
if (!owners.contains(cctx.localNode()))
req.markKeyForPreload(idx);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index fcd66c2..394ff89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -345,79 +345,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 22:
+ case 23:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 23:
+ case 24:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 24:
+ case 25:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 25:
+ case 26:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 26:
+ case 27:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 27:
+ case 28:
if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 28:
+ case 29:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 29:
+ case 30:
if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 30:
+ case 31:
if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 31:
+ case 32:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 32:
+ case 33:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 34:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 34:
+ case 35:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -439,7 +439,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 22:
+ case 23:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -447,7 +447,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 23:
+ case 24:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -455,7 +455,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 24:
+ case 25:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -463,7 +463,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 25:
+ case 26:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -471,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 27:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 28:
nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 29:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -495,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 30:
ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -503,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 31:
ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -511,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 32:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -519,7 +519,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 33:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -527,7 +527,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 34:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -535,7 +535,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 35:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -555,6 +555,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 35;
+ return 36;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 83c220d..7131aa5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
if (log.isDebugEnabled())
log.debug("Got removed entry when adding lock (will retry): " + entry);
}
- catch (IgniteCheckedException | GridDistributedLockCancelledException e) {
+ catch (GridDistributedLockCancelledException e) {
if (log.isDebugEnabled())
log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 365b46b..abeb509 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -428,25 +428,21 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" +
this + ']');
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture mini = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (mini.futureId().equals(res.miniId())) {
- assert mini.node().id().equals(nodeId);
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- if (log.isDebugEnabled())
- log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
- mini.onResult(res);
+ mini.onResult(res);
- if (log.isDebugEnabled())
- log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
- ", res=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
+ ", res=" + res + ']');
- return;
- }
- }
+ return;
}
U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
@@ -458,6 +454,37 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
}
/**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
+ * @return Pending mini future, or {@code null} if none matches the ID or the matching one is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @param t Error.
*/
private void onError(Throwable t) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index c5b55bd..9c3701f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -478,25 +478,21 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
if (log.isDebugEnabled())
log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']');
- for (IgniteInternalFuture<Boolean> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture mini = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (mini.futureId().equals(res.miniId())) {
- assert mini.node().id().equals(nodeId);
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- if (log.isDebugEnabled())
- log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']');
- mini.onResult(res);
+ mini.onResult(res);
- if (log.isDebugEnabled())
- log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
- ", res=" + res + ']');
+ if (log.isDebugEnabled())
+ log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini +
+ ", res=" + res + ']');
- return;
- }
- }
+ return;
}
U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res +
@@ -508,6 +504,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
}
/**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
+ * @return Pending mini future, or {@code null} if none matches the ID or the matching one is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<Boolean> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
+ }
+ }
+ }
+
+ return null;
+ }
+
+
+ /**
* @param t Error.
*/
private void onError(Throwable t) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 165da84..805a6a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -20,13 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import java.util.Arrays;
-import java.util.Collection;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest;
@@ -300,7 +298,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
dhtVers[idx] = dhtVer;
// Delegate to super.
- addKeyBytes(key, retVal, (Collection<GridCacheMvccCandidate>)null, ctx);
+ addKeyBytes(key, retVal, ctx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 29774a5..1569b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -210,17 +211,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
- for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
-
- f.onResult(res);
- }
- }
- }
+ if (mini != null)
+ mini.onResult(res);
}
}
@@ -239,6 +233,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
}
/**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
+ * @return Pending mini future, or {@code null} if none matches the ID or the matching one is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
* @param f Future.
* @return {@code True} if mini-future.
*/
@@ -276,32 +301,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param remap Remap flag.
*/
@Override protected void prepare0(boolean remap, boolean topLocked) {
- try {
- boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
-
- if (!txStateCheck) {
- if (tx.setRollbackOnly()) {
- if (tx.timedOut())
- onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
- "was rolled back: " + this));
- else
- onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
- "[state=" + tx.state() + ", tx=" + this + ']'));
- }
- else
- onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
- "prepare [state=" + tx.state() + ", tx=" + this + ']'));
+ boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING);
- return;
+ if (!txStateCheck) {
+ if (tx.setRollbackOnly()) {
+ if (tx.timedOut())
+ onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " +
+ "was rolled back: " + this));
+ else
+ onError(null, new IgniteCheckedException("Invalid transaction state for prepare " +
+ "[state=" + tx.state() + ", tx=" + this + ']'));
}
+ else
+ onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " +
+ "prepare [state=" + tx.state() + ", tx=" + this + ']'));
- prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
-
- markInitialized();
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ return;
}
+
+ prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked);
+
+ markInitialized();
}
/**
@@ -309,7 +329,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
* @param writes Write entries.
* @param remap Remap flag.
* @param topLocked Topology locked flag.
- * @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
private void prepare(
@@ -317,7 +336,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
Iterable<IgniteTxEntry> writes,
boolean remap,
boolean topLocked
- ) throws IgniteCheckedException {
+ ) {
AffinityTopologyVersion topVer = tx.topologyVersion();
assert topVer.topologyVersion() > 0;
@@ -355,9 +374,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
for (GridDistributedTxMapping m : mappings.values()) {
assert !m.empty();
- MiniFuture fut = new MiniFuture(m);
-
- add(fut);
+ add(new MiniFuture(m));
}
Collection<IgniteInternalFuture<?>> futs = (Collection)futures();
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 791d2f3..82e3868 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -187,18 +188,45 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/** {@inheritDoc} */
@Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) {
if (!isDone()) {
- for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
- if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture mini = miniFuture(res.miniId());
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ if (mini != null) {
+ assert mini.node().id().equals(nodeId);
- f.onResult(nodeId, res);
- }
+ mini.onResult(nodeId, res);
+ }
+ }
+ }
+
+ /**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
+ * @return Pending mini future, or {@code null} if none matches the ID or the matching one is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i);
+
+ if (!isMini(fut))
+ continue;
+
+ MiniFuture mini = (MiniFuture)fut;
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
}
}
}
+
+ return null;
}
/** {@inheritDoc} */
@@ -277,10 +305,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
markInitialized();
}
catch (TransactionTimeoutException e) {
- onError( e);
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ onError(e);
}
}
@@ -327,12 +352,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
/**
* @param writes Write entries.
* @param topLocked {@code True} if thread already acquired lock preventing topology change.
- * @throws IgniteCheckedException If failed.
*/
private void prepare(
Iterable<IgniteTxEntry> writes,
boolean topLocked
- ) throws IgniteCheckedException {
+ ) {
AffinityTopologyVersion topVer = tx.topologyVersion();
assert topVer.topologyVersion() > 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 1554a62..103105e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -103,20 +103,45 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
if (!isDone()) {
assert res.clientRemapVersion() == null : res;
- for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) {
- MiniFuture f = (MiniFuture)fut;
+ MiniFuture f = miniFuture(res.miniId());
- if (f.futureId().equals(res.miniId())) {
- assert f.node().id().equals(nodeId);
+ if (f != null) {
+ assert f.node().id().equals(nodeId);
- if (log.isDebugEnabled())
- log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
+ if (log.isDebugEnabled())
+ log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f);
- f.onResult(res);
+ f.onResult(res);
+ }
+ }
+ }
+
+ /**
+ * Finds pending mini future by the given mini ID.
+ *
+ * @param miniId Mini ID to find.
+ * @return Pending mini future, or {@code null} if none matches the ID or the matching one is already done.
+ */
+ @SuppressWarnings("ForLoopReplaceableByForEach")
+ private MiniFuture miniFuture(IgniteUuid miniId) {
+ // We iterate directly over the futs collection here to avoid copy.
+ synchronized (futs) {
+ // Avoid iterator creation.
+ for (int i = 0; i < futs.size(); i++) {
+ MiniFuture mini = (MiniFuture)futs.get(i);
+
+ if (mini.futureId().equals(miniId)) {
+ if (!mini.isDone())
+ return mini;
+ else
+ return null;
}
}
}
+
+ return null;
}
+
/** {@inheritDoc} */
@Override public void prepare() {
if (!tx.state(PREPARING)) {