You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:33:06 UTC
[17/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
index c033273..73f86fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java
@@ -104,8 +104,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
* @param tx Transaction.
* @param dhtWrites DHT writes.
* @param nearWrites Near writes.
- * @param grpLockKey Group lock key if preparing group-lock transaction.
- * @param partLock {@code True} if group-lock transaction locks partition.
* @param txNodes Transaction nodes mapping.
* @param nearXidVer Near transaction ID.
* @param last {@code True} if this is last prepare request for node.
@@ -118,15 +116,13 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
GridDhtTxLocalAdapter tx,
Collection<IgniteTxEntry> dhtWrites,
Collection<IgniteTxEntry> nearWrites,
- IgniteTxKey grpLockKey,
- boolean partLock,
Map<UUID, Collection<UUID>> txNodes,
GridCacheVersion nearXidVer,
boolean last,
boolean onePhaseCommit,
UUID subjId,
int taskNameHash) {
- super(tx, null, dhtWrites, grpLockKey, partLock, txNodes, onePhaseCommit);
+ super(tx, null, dhtWrites, txNodes, onePhaseCommit);
assert futId != null;
assert miniId != null;
@@ -337,79 +333,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
}
switch (writer.state()) {
- case 25:
+ case 23:
if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
- case 26:
+ case 24:
if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries))
return false;
writer.incrementState();
- case 27:
+ case 25:
if (!writer.writeBoolean("last", last))
return false;
writer.incrementState();
- case 28:
+ case 26:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 29:
+ case 27:
if (!writer.writeUuid("nearNodeId", nearNodeId))
return false;
writer.incrementState();
- case 30:
+ case 28:
if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 31:
+ case 29:
if (!writer.writeMessage("nearXidVer", nearXidVer))
return false;
writer.incrementState();
- case 32:
+ case 30:
if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 33:
+ case 31:
if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 34:
+ case 32:
if (!writer.writeBitSet("preloadKeys", preloadKeys))
return false;
writer.incrementState();
- case 35:
+ case 33:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 36:
+ case 34:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 37:
+ case 35:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -431,7 +427,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
return false;
switch (reader.state()) {
- case 25:
+ case 23:
futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
@@ -439,7 +435,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 26:
+ case 24:
invalidateNearEntries = reader.readBitSet("invalidateNearEntries");
if (!reader.isLastRead())
@@ -447,7 +443,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 27:
+ case 25:
last = reader.readBoolean("last");
if (!reader.isLastRead())
@@ -455,7 +451,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 28:
+ case 26:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -463,7 +459,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 29:
+ case 27:
nearNodeId = reader.readUuid("nearNodeId");
if (!reader.isLastRead())
@@ -471,7 +467,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 30:
+ case 28:
nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -479,7 +475,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 31:
+ case 29:
nearXidVer = reader.readMessage("nearXidVer");
if (!reader.isLastRead())
@@ -487,7 +483,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 32:
+ case 30:
ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -495,7 +491,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 33:
+ case 31:
ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -503,7 +499,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 34:
+ case 32:
preloadKeys = reader.readBitSet("preloadKeys");
if (!reader.isLastRead())
@@ -511,7 +507,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 35:
+ case 33:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -519,7 +515,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 36:
+ case 34:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -527,7 +523,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
reader.incrementState();
- case 37:
+ case 35:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -547,6 +543,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 38;
+ return 36;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 30464a5..0a69910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -77,7 +77,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
* @param timeout Timeout.
* @param ctx Cache context.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
* @param nearXidVer Near transaction ID.
* @param txNodes Transaction nodes mapping.
*/
@@ -97,14 +96,13 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
boolean invalidate,
long timeout,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
GridCacheVersion nearXidVer,
Map<UUID, Collection<UUID>> txNodes,
@Nullable UUID subjId,
int taskNameHash
) {
super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
+ txSize, subjId, taskNameHash);
assert nearNodeId != null;
assert rmtFutId != null;
@@ -139,7 +137,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
* @param timeout Timeout.
* @param ctx Cache context.
* @param txSize Expected transaction size.
- * @param grpLockKey Group lock key if transaction is group-lock.
*/
public GridDhtTxRemote(
GridCacheSharedContext ctx,
@@ -158,12 +155,11 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
boolean invalidate,
long timeout,
int txSize,
- @Nullable IgniteTxKey grpLockKey,
@Nullable UUID subjId,
int taskNameHash
) {
super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout,
- txSize, grpLockKey, subjId, taskNameHash);
+ txSize, subjId, taskNameHash);
assert nearNodeId != null;
assert rmtFutId != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
index 8da4da5..098ec97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
@@ -86,7 +86,9 @@ public class GridNoStorageCacheMap extends GridCacheConcurrentMap {
boolean create)
{
if (create) {
- GridCacheMapEntry entry = new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0);
+ GridCacheMapEntry entry = ctx.useOffheapEntry() ?
+ new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0) :
+ new GridDhtCacheEntry(ctx, topVer, key, hash(key.hashCode()), val, null, 0);
return new GridTriple<>(entry, null, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 35ba2a9..2f41e63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -270,7 +270,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M
AffinityTopologyVersion topVer
) {
if (CU.affinityNodes(cctx, topVer).isEmpty()) {
- onDone(new ClusterTopologyCheckedException("Failed to map keys for cache " +
+ onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 905f7bf..8bbfe96 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -124,6 +124,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheMapEntry next,
int hdrId)
{
+ if (ctx.useOffheapEntry())
+ return new GridDhtAtomicOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
+
return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
}
});
@@ -1041,7 +1044,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
// Do not check topology version for CLOCK versioning since
// partition exchange will wait for near update future.
- if (topology().topologyVersion().equals(req.topologyVersion()) ||
+ // Also do not check topology version if topology was locked on near node by
+ // external transaction or explicit lock.
+ if (topology().topologyVersion().equals(req.topologyVersion()) || req.topologyLocked() ||
ctx.config().getAtomicWriteOrderMode() == CLOCK) {
ClusterNode node = ctx.discovery().node(nodeId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
new file mode 100644
index 0000000..91a8e65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicOffHeapCacheEntry.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+
+/**
+ * DHT atomic cache entry for off-heap tiered or off-heap values modes; holds the off-heap value pointer in a field.
+ */
+public class GridDhtAtomicOffHeapCacheEntry extends GridDhtAtomicCacheEntry {
+ /** Off-heap value pointer; {@code 0} is treated as "no pointer" by {@link #hasOffHeapPointer()}. */
+ private long valPtr;
+
+ /**
+ * @param ctx Cache context.
+ * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
+ * @param key Cache key.
+ * @param hash Key hash value.
+ * @param val Entry value.
+ * @param next Next entry in the linked list.
+ * @param hdrId Header id.
+ */
+ public GridDhtAtomicOffHeapCacheEntry(GridCacheContext ctx,
+ AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ int hash,
+ CacheObject val,
+ GridCacheMapEntry next,
+ int hdrId) {
+ super(ctx, topVer, key, hash, val, next, hdrId);
+ }
+
+ /** {@inheritDoc} Here: {@code true} when the stored pointer is non-zero. */
+ @Override protected boolean hasOffHeapPointer() {
+ return valPtr != 0;
+ }
+
+ /** {@inheritDoc} Here: returns the stored pointer as-is. */
+ @Override protected long offHeapPointer() {
+ return valPtr;
+ }
+
+ /** {@inheritDoc} Here: stores the given pointer without validation. */
+ @Override protected void offHeapPointer(long valPtr) {
+ this.valPtr = valPtr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index d0a7620..c5b5a37 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -88,6 +88,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri
}
/**
+ * Sets update error.
+ * @param err Update error.
+ */
+ public void onError(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
* @return Gets update error.
*/
public IgniteCheckedException error() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 072ab52..76e05e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.dr.*;
+import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
@@ -136,6 +137,9 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
/** Task name hash. */
private final int taskNameHash;
+ /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
+ private boolean topLocked;
+
/** Skip store flag. */
private final boolean skipStore;
@@ -289,7 +293,23 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
* @param waitTopFut Whether to wait for topology future.
*/
public void map(boolean waitTopFut) {
- mapOnTopology(keys, false, null, waitTopFut);
+ AffinityTopologyVersion topVer = null;
+
+ IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
+
+ if (tx != null && tx.topologyVersionSnapshot() != null)
+ topVer = tx.topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
+
+ if (topVer == null)
+ mapOnTopology(keys, false, null, waitTopFut);
+ else {
+ topLocked = true;
+
+ map0(topVer, keys, false, null);
+ }
}
/** {@inheritDoc} */
@@ -341,7 +361,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
updateNear(singleReq, res);
if (res.error() != null)
- onDone(addFailedKeys(res.failedKeys(), res.error()));
+ onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error());
else {
if (op == TRANSFORM) {
if (ret != null)
@@ -430,15 +450,12 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
topVer = fut.topologyVersion();
-
- if (futVer == null)
- // Assign future version in topology read lock before first exception may be thrown.
- futVer = cctx.versions().next(topVer);
}
else {
if (waitTopFut) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
+ @Override
+ public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
mapOnTopology(keys, remap, oldNodeId, waitTopFut);
}
});
@@ -448,9 +465,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
-
- if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC))
- cctx.mvcc().addAtomicFuture(version(), this);
}
finally {
cache.topology().readUnlock();
@@ -474,6 +488,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
}
/**
+ * @param topVer Topology version.
* @param keys Keys to map.
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
@@ -494,6 +509,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
return;
}
+ if (futVer == null)
+ // Assign future version before first exception may be thrown (previously done under the topology read lock).
+ futVer = cctx.versions().next(topVer);
+
+ if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC))
+ cctx.mvcc().addAtomicFuture(version(), this);
+
CacheConfiguration ccfg = cctx.config();
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
@@ -579,6 +601,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
fastMap,
updVer,
topVer,
+ topLocked,
syncMode,
op,
retval,
@@ -716,6 +739,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
fastMap,
updVer,
topVer,
+ topLocked,
syncMode,
op,
retval,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
index e0e3e26..a96a666 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java
@@ -64,6 +64,9 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** Topology version. */
private AffinityTopologyVersion topVer;
+ /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */
+ private boolean topLocked;
+
/** Write synchronization mode. */
private CacheWriteSynchronizationMode syncMode;
@@ -162,6 +165,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
boolean fastMap,
@Nullable GridCacheVersion updateVer,
@NotNull AffinityTopologyVersion topVer,
+ boolean topLocked,
CacheWriteSynchronizationMode syncMode,
GridCacheOperation op,
boolean retval,
@@ -179,6 +183,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
this.updateVer = updateVer;
this.topVer = topVer;
+ this.topLocked = topLocked;
this.syncMode = syncMode;
this.op = op;
this.retval = retval;
@@ -254,6 +259,13 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
}
/**
+ * @return Topology locked flag.
+ */
+ public boolean topologyLocked() {
+ return topLocked;
+ }
+
+ /**
* @return Cache write synchronization mode.
*/
public CacheWriteSynchronizationMode writeSynchronizationMode() {
@@ -664,18 +676,24 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
writer.incrementState();
case 20:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeBoolean("topLocked", topLocked))
return false;
writer.incrementState();
case 21:
- if (!writer.writeMessage("updateVer", updateVer))
+ if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
case 22:
+ if (!writer.writeMessage("updateVer", updateVer))
+ return false;
+
+ writer.incrementState();
+
+ case 23:
if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
return false;
@@ -842,7 +860,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 20:
- topVer = reader.readMessage("topVer");
+ topLocked = reader.readBoolean("topLocked");
if (!reader.isLastRead())
return false;
@@ -850,7 +868,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 21:
- updateVer = reader.readMessage("updateVer");
+ topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
return false;
@@ -858,6 +876,14 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
reader.incrementState();
case 22:
+ updateVer = reader.readMessage("updateVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 23:
vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -877,7 +903,7 @@ public class GridNearAtomicUpdateRequest extends GridCacheMessage implements Gri
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 23;
+ return 24;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
index 01d5722..330e43c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java
@@ -137,9 +137,17 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
}
/**
+ * Sets update error.
+ * @param err Update error.
+ */
+ public void error(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
* @return Update error, if any.
*/
- public Throwable error() {
+ public IgniteCheckedException error() {
return err;
}
@@ -335,10 +343,12 @@ public class GridNearAtomicUpdateResponse extends GridCacheMessage implements Gr
* @param e Error cause.
*/
public synchronized void addFailedKeys(Collection<KeyCacheObject> keys, Throwable e) {
- if (failedKeys == null)
- failedKeys = new ArrayList<>(keys.size());
+ if (keys != null) {
+ if (failedKeys == null)
+ failedKeys = new ArrayList<>(keys.size());
- failedKeys.addAll(keys);
+ failedKeys.addAll(keys);
+ }
if (err == null)
err = new IgniteCheckedException("Failed to update keys on primary node.");
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index c92d9ce..05b3c7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -86,6 +86,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
GridCacheMapEntry next,
int hdrId)
{
+ if (ctx.useOffheapEntry())
+ return new GridDhtColocatedOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
+
return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val, next, hdrId);
}
});
@@ -126,7 +129,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
boolean allowDetached
) {
return allowDetached && !ctx.affinity().primary(ctx.localNode(), key, topVer) ?
- new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0) : entryExx(key, topVer);
+ createEntry(key) : entryExx(key, topVer);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 5b74b31..372c517 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -292,7 +292,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
false,
false);
- cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion()));
+ cand.topologyVersion(topVer.get());
}
}
else {
@@ -311,7 +311,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
false,
false);
- cand.topologyVersion(new AffinityTopologyVersion(topVer.get().topologyVersion()));
+ cand.topologyVersion(topVer.get());
}
else
cand = cand.reenter();
@@ -754,8 +754,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
mappedKeys.size(),
inTx() ? tx.size() : mappedKeys.size(),
inTx() && tx.syncCommit(),
- inTx() ? tx.groupLockKey() : null,
- inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
@@ -786,6 +784,8 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Got removed entry in lockAsync(..) method (will retry): " + entry);
+
+ entry = null;
}
}
@@ -1088,10 +1088,6 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
// If primary node left the grid before lock acquisition, fail the whole future.
throw newTopologyException(null, primary.id());
- if (inTx() && tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
- " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
-
if (mapping == null || !primary.id().equals(mapping.node().id()))
mapping = new GridNearLockMapping(primary, key);
else
@@ -1279,25 +1275,18 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached();
- try {
- if (res.dhtVersion(i) == null) {
- onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
- "(will fail the lock): " + res));
-
- return;
- }
-
- // Set value to detached entry.
- entry.resetFromPrimary(newVal, dhtVer);
-
- if (log.isDebugEnabled())
- log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
- }
- catch (IgniteCheckedException e) {
- onDone(e);
+ if (res.dhtVersion(i) == null) {
+ onDone(new IgniteCheckedException("Failed to receive DHT version from remote node " +
+ "(will fail the lock): " + res));
return;
}
+
+ // Set value to detached entry.
+ entry.resetFromPrimary(newVal, dhtVer);
+
+ if (log.isDebugEnabled())
+ log.debug("Processed response for entry [res=" + res + ", entry=" + entry + ']');
}
else
cctx.mvcc().markExplicitOwner(k, threadId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
new file mode 100644
index 0000000..ed842ad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedOffHeapCacheEntry.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.colocated;
+
+import org.apache.ignite.internal.processors.affinity.*;
+import org.apache.ignite.internal.processors.cache.*;
+
+/**
+ * Cache entry for colocated cache for off-heap tiered or off-heap values modes; keeps the off-heap value pointer in a field.
+ */
+public class GridDhtColocatedOffHeapCacheEntry extends GridDhtColocatedCacheEntry {
+ /** Off-heap value pointer; {@code 0} is treated as "no pointer" by {@link #hasOffHeapPointer()}. */
+ private long valPtr;
+
+ /**
+ * @param ctx Cache context.
+ * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed).
+ * @param key Cache key.
+ * @param hash Key hash value.
+ * @param val Entry value.
+ * @param next Next entry in the linked list.
+ * @param hdrId Header id.
+ */
+ public GridDhtColocatedOffHeapCacheEntry(GridCacheContext ctx,
+ AffinityTopologyVersion topVer,
+ KeyCacheObject key,
+ int hash,
+ CacheObject val,
+ GridCacheMapEntry next,
+ int hdrId) {
+ super(ctx, topVer, key, hash, val, next, hdrId);
+ }
+
+ /** {@inheritDoc} Here: {@code true} when the stored pointer is non-zero. */
+ @Override protected boolean hasOffHeapPointer() {
+ return valPtr != 0;
+ }
+
+ /** {@inheritDoc} Here: returns the stored pointer field unchanged. */
+ @Override protected long offHeapPointer() {
+ return valPtr;
+ }
+
+ /** {@inheritDoc} Here: overwrites the stored pointer field with the given value. */
+ @Override protected void offHeapPointer(long valPtr) {
+ this.valPtr = valPtr;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index 5c4dd13..2c84bd4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -46,10 +46,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
*
* @param val Value.
* @param ver Version.
- * @throws IgniteCheckedException If value unmarshalling failed.
*/
- public void resetFromPrimary(CacheObject val, GridCacheVersion ver)
- throws IgniteCheckedException {
+ public void resetFromPrimary(CacheObject val, GridCacheVersion ver) {
value(val);
this.ver = ver;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index 61aaa14..78966d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -451,6 +451,12 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
* @param res Result callback.
*/
void onResult(GridDhtForceKeysResponse res) {
+ if (res.error() != null) {
+ onDone(res.error());
+
+ return;
+ }
+
Collection<KeyCacheObject> missedKeys = res.missedKeys();
boolean remapMissed = false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 8919185..41ce0be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -42,6 +42,13 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
/** Mini-future ID. */
private IgniteUuid miniId;
+ /** Error. */
+ @GridDirectTransient
+ private volatile IgniteCheckedException err;
+
+ /** Serialized error. */
+ private byte[] errBytes;
+
/** Missed (not found) keys. */
@GridToStringInclude
@GridDirectCollection(KeyCacheObject.class)
@@ -73,6 +80,21 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
this.miniId = miniId;
}
+ /**
+ * Sets error.
+ * @param err Error.
+ */
+ public void error(IgniteCheckedException err){
+ this.err = err;
+ }
+
+ /**
+ * @return Error, if any.
+ */
+ public IgniteCheckedException error() {
+ return err;
+ }
+
/** {@inheritDoc} */
@Override public boolean allowForStartup() {
return true;
@@ -142,6 +164,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
for (GridCacheEntryInfo info : infos)
info.marshal(cctx);
}
+
+ errBytes = ctx.marshaller().marshal(err);
}
/** {@inheritDoc} */
@@ -157,6 +181,8 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
for (GridCacheEntryInfo info : infos)
info.unmarshal(cctx, ldr);
}
+
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
}
/** {@inheritDoc} */
@@ -175,24 +201,30 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
switch (writer.state()) {
case 3:
- if (!writer.writeIgniteUuid("futId", futId))
+ if (!writer.writeByteArray("errBytes", errBytes))
return false;
writer.incrementState();
case 4:
- if (!writer.writeCollection("infos", infos, MessageCollectionItemType.MSG))
+ if (!writer.writeIgniteUuid("futId", futId))
return false;
writer.incrementState();
case 5:
- if (!writer.writeIgniteUuid("miniId", miniId))
+ if (!writer.writeCollection("infos", infos, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 6:
+ if (!writer.writeIgniteUuid("miniId", miniId))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
if (!writer.writeCollection("missedKeys", missedKeys, MessageCollectionItemType.MSG))
return false;
@@ -215,7 +247,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
switch (reader.state()) {
case 3:
- futId = reader.readIgniteUuid("futId");
+ errBytes = reader.readByteArray("errBytes");
if (!reader.isLastRead())
return false;
@@ -223,7 +255,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
reader.incrementState();
case 4:
- infos = reader.readCollection("infos", MessageCollectionItemType.MSG);
+ futId = reader.readIgniteUuid("futId");
if (!reader.isLastRead())
return false;
@@ -231,7 +263,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
reader.incrementState();
case 5:
- miniId = reader.readIgniteUuid("miniId");
+ infos = reader.readCollection("infos", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
@@ -239,6 +271,14 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
reader.incrementState();
case 6:
+ miniId = reader.readIgniteUuid("miniId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
missedKeys = reader.readCollection("missedKeys", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -258,7 +298,7 @@ public class GridDhtForceKeysResponse extends GridCacheMessage implements GridCa
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 7;
+ return 8;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 45d332c..4b8db00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1252,7 +1252,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
cctx.kernalContext().timeout().removeTimeoutObject(old);
GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter(
- cctx.gridConfig().getNetworkTimeout() * cctx.gridConfig().getCacheConfiguration().length) {
+ cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) {
@Override public void onTimeout() {
if (isDone())
return;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 29c1d45..8258b14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -76,6 +76,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
) {
// Can't hold any locks here - this method is invoked when
// holding write-lock on the whole cache map.
+ if (ctx.useOffheapEntry())
+ return new GridNearOffHeapCacheEntry(ctx, key, hash, val, next, hdrId);
+
return new GridNearCacheEntry(ctx, key, hash, val, next, hdrId);
}
});
@@ -436,16 +439,6 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
}
/** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> swapIterator() throws IgniteCheckedException {
- return dht().swapIterator();
- }
-
- /** {@inheritDoc} */
- @Override public Iterator<Map.Entry<K, V>> offHeapIterator() throws IgniteCheckedException {
- return dht().offHeapIterator();
- }
-
- /** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
return dht().offHeapEntriesCount();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index c7fa4ab..9e8d76b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -204,15 +204,13 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
* @param topVer Topology version.
* @return {@code True} if reset was done.
* @throws GridCacheEntryRemovedException If obsolete.
- * @throws IgniteCheckedException If failed.
*/
- @SuppressWarnings( {"RedundantTypeArguments"})
public boolean resetFromPrimary(CacheObject val,
GridCacheVersion ver,
GridCacheVersion dhtVer,
UUID primaryNodeId,
AffinityTopologyVersion topVer)
- throws GridCacheEntryRemovedException, IgniteCheckedException
+ throws GridCacheEntryRemovedException
{
assert dhtVer != null;
@@ -301,7 +299,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
else {
CacheObject val0 = valueBytesUnlocked();
- return F.t(ver, val0);
+ return F.t(dhtVer, val0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
index 57652bd..73d877a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java
@@ -65,7 +65,7 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
/** Error. */
@GridDirectTransient
- private Throwable err;
+ private IgniteCheckedException err;
/** Serialized error. */
private byte[] errBytes;
@@ -152,20 +152,20 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe
* @return Topology version if this response has invalid partitions.
*/
@Override public AffinityTopologyVersion topologyVersion() {
- return topVer;
+ return topVer != null ? topVer : super.topologyVersion();
}
/**
* @return Error.
*/
- public Throwable error() {
+ public IgniteCheckedException error() {
return err;
}
/**
* @param err Error.
*/
- public void error(Throwable err) {
+ public void error(IgniteCheckedException err) {
this.err = err;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index a427b65..0ffb4e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -890,8 +890,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
mappedKeys.size(),
inTx() ? tx.size() : mappedKeys.size(),
inTx() && tx.syncCommit(),
- inTx() ? tx.groupLockKey() : null,
- inTx() && tx.partitionLock(),
inTx() ? tx.subjectId() : null,
inTx() ? tx.taskNameHash() : 0,
read ? accessTtl : -1L,
@@ -1188,10 +1186,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// If primary node left the grid before lock acquisition, fail the whole future.
throw newTopologyException(null, primary.id());
- if (inTx() && tx.groupLock() && !primary.isLocal())
- throw new IgniteCheckedException("Failed to start group lock transaction (local node is not primary for " +
- " key) [key=" + key + ", primaryNodeId=" + primary.id() + ']');
-
if (mapping == null || !primary.id().equals(mapping.node().id()))
mapping = new GridNearLockMapping(primary, key);
else
@@ -1450,11 +1444,6 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
// Replace old entry with new one.
entries.set(i, (GridDistributedCacheEntry)cctx.cache().entryEx(entry.key()));
}
- catch (IgniteCheckedException e) {
- onDone(e);
-
- return;
- }
}
i++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
index 1ba4bfe..e71dd65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java
@@ -21,7 +21,6 @@ import org.apache.ignite.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
-import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -105,8 +104,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
* @param keyCnt Number of keys.
* @param txSize Expected transaction size.
* @param syncCommit Synchronous commit flag.
- * @param grpLockKey Group lock key if this is a group-lock transaction.
- * @param partLock If partition is locked.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param accessTtl TTL for read operation.
@@ -130,8 +127,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
int keyCnt,
int txSize,
boolean syncCommit,
- @Nullable IgniteTxKey grpLockKey,
- boolean partLock,
@Nullable UUID subjId,
int taskNameHash,
long accessTtl,
@@ -151,8 +146,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
timeout,
keyCnt,
txSize,
- grpLockKey,
- partLock,
skipStore);
assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0;
@@ -356,79 +349,79 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
}
switch (writer.state()) {
- case 23:
+ case 21:
if (!writer.writeLong("accessTtl", accessTtl))
return false;
writer.incrementState();
- case 24:
+ case 22:
if (!writer.writeObjectArray("dhtVers", dhtVers, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 25:
+ case 23:
if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 26:
+ case 24:
if (!writer.writeBoolean("hasTransforms", hasTransforms))
return false;
writer.incrementState();
- case 27:
+ case 25:
if (!writer.writeBoolean("implicitSingleTx", implicitSingleTx))
return false;
writer.incrementState();
- case 28:
+ case 26:
if (!writer.writeBoolean("implicitTx", implicitTx))
return false;
writer.incrementState();
- case 29:
+ case 27:
if (!writer.writeIgniteUuid("miniId", miniId))
return false;
writer.incrementState();
- case 30:
+ case 28:
if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit))
return false;
writer.incrementState();
- case 31:
+ case 29:
if (!writer.writeBoolean("retVal", retVal))
return false;
writer.incrementState();
- case 32:
+ case 30:
if (!writer.writeUuid("subjId", subjId))
return false;
writer.incrementState();
- case 33:
+ case 31:
if (!writer.writeBoolean("syncCommit", syncCommit))
return false;
writer.incrementState();
- case 34:
+ case 32:
if (!writer.writeInt("taskNameHash", taskNameHash))
return false;
writer.incrementState();
- case 35:
+ case 33:
if (!writer.writeMessage("topVer", topVer))
return false;
@@ -450,7 +443,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
return false;
switch (reader.state()) {
- case 23:
+ case 21:
accessTtl = reader.readLong("accessTtl");
if (!reader.isLastRead())
@@ -458,7 +451,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 24:
+ case 22:
dhtVers = reader.readObjectArray("dhtVers", MessageCollectionItemType.MSG, GridCacheVersion.class);
if (!reader.isLastRead())
@@ -466,7 +459,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 25:
+ case 23:
filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class);
if (!reader.isLastRead())
@@ -474,7 +467,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 26:
+ case 24:
hasTransforms = reader.readBoolean("hasTransforms");
if (!reader.isLastRead())
@@ -482,7 +475,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 27:
+ case 25:
implicitSingleTx = reader.readBoolean("implicitSingleTx");
if (!reader.isLastRead())
@@ -490,7 +483,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 28:
+ case 26:
implicitTx = reader.readBoolean("implicitTx");
if (!reader.isLastRead())
@@ -498,7 +491,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 29:
+ case 27:
miniId = reader.readIgniteUuid("miniId");
if (!reader.isLastRead())
@@ -506,7 +499,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 30:
+ case 28:
onePhaseCommit = reader.readBoolean("onePhaseCommit");
if (!reader.isLastRead())
@@ -514,7 +507,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 31:
+ case 29:
retVal = reader.readBoolean("retVal");
if (!reader.isLastRead())
@@ -522,7 +515,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 32:
+ case 30:
subjId = reader.readUuid("subjId");
if (!reader.isLastRead())
@@ -530,7 +523,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 33:
+ case 31:
syncCommit = reader.readBoolean("syncCommit");
if (!reader.isLastRead())
@@ -538,7 +531,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 34:
+ case 32:
taskNameHash = reader.readInt("taskNameHash");
if (!reader.isLastRead())
@@ -546,7 +539,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
reader.incrementState();
- case 35:
+ case 33:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -566,7 +559,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 36;
+ return 34;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
new file mode 100644
index 0000000..25eb869
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOffHeapCacheEntry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.internal.processors.cache.*;
+
+/**
+ * Near cache entry for off-heap tiered or off-heap values modes; keeps the off-heap value pointer in a field.
+ */
+public class GridNearOffHeapCacheEntry extends GridNearCacheEntry {
+ /** Off-heap value pointer; {@code 0} is treated as "no pointer" by {@link #hasOffHeapPointer()}. */
+ private long valPtr;
+
+ /**
+ * @param ctx Cache context.
+ * @param key Cache key.
+ * @param hash Key hash value.
+ * @param val Entry value.
+ * @param next Next entry in the linked list.
+ * @param hdrId Header id.
+ */
+ public GridNearOffHeapCacheEntry(GridCacheContext ctx,
+ KeyCacheObject key,
+ int hash,
+ CacheObject val,
+ GridCacheMapEntry next,
+ int hdrId) {
+ super(ctx, key, hash, val, next, hdrId);
+ }
+
+ /** {@inheritDoc} Here: {@code true} when the stored pointer is non-zero. */
+ @Override protected boolean hasOffHeapPointer() {
+ return valPtr != 0;
+ }
+
+ /** {@inheritDoc} Here: returns the stored pointer field unchanged. */
+ @Override protected long offHeapPointer() {
+ return valPtr;
+ }
+
+ /** {@inheritDoc} Here: overwrites the stored pointer field with the given value. */
+ @Override protected void offHeapPointer(long valPtr) {
+ this.valPtr = valPtr;
+ }
+}