You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 22:22:36 UTC
[6/6] incubator-ignite git commit: # ignite-51
# ignite-51
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ea39d669
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ea39d669
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ea39d669
Branch: refs/heads/ignite-51
Commit: ea39d669b0964b0d420c4e21f0f862667c88c41d
Parents: 6445389
Author: sboikov <sb...@gridgain.com>
Authored: Mon Mar 2 18:41:15 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Mar 3 00:21:43 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 2 +-
.../communication/GridIoMessageFactory.java | 6 +
.../processors/cache/GridCacheAdapter.java | 24 +-
.../processors/cache/GridCacheEntryEx.java | 21 +-
.../processors/cache/GridCacheEntryInfo.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 161 ++---------
.../processors/cache/GridCacheSwapManager.java | 4 +-
.../distributed/GridCacheTtlUpdateRequest.java | 4 +-
.../GridDistributedTxRemoteAdapter.java | 40 +--
.../distributed/dht/GridDhtCacheAdapter.java | 12 +-
.../distributed/dht/GridDhtCacheEntry.java | 11 +-
.../cache/distributed/dht/GridDhtGetFuture.java | 6 +-
.../distributed/dht/GridDhtLockFuture.java | 2 +-
.../distributed/dht/GridDhtLockRequest.java | 2 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 10 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 16 +-
.../dht/GridDhtTxPrepareResponse.java | 24 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 2 +-
.../dht/atomic/GridDhtAtomicCache.java | 2 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 26 +-
.../colocated/GridDhtColocatedLockFuture.java | 2 +-
.../colocated/GridDhtDetachedCacheEntry.java | 7 +-
.../distributed/near/GridNearCacheEntry.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 2 +-
.../cache/distributed/near/GridNearTxLocal.java | 6 +-
.../near/GridNearTxPrepareFuture.java | 8 +-
.../distributed/near/GridNearTxRemote.java | 2 +-
.../cache/local/GridLocalTxFuture.java | 4 +-
.../continuous/CacheContinuousQueryEntry.java | 272 +++++++++++--------
.../continuous/CacheContinuousQueryEvent.java | 32 ++-
.../continuous/CacheContinuousQueryHandler.java | 64 ++---
.../continuous/CacheContinuousQueryManager.java | 80 +++---
.../cache/transactions/IgniteTxEntry.java | 29 +-
.../transactions/IgniteTxLocalAdapter.java | 18 +-
.../cache/transactions/IgniteTxManager.java | 8 +-
.../continuous/GridContinuousMessage.java | 43 ++-
.../continuous/GridContinuousProcessor.java | 40 ++-
.../cache/GridCacheAbstractFullApiSelfTest.java | 2 +-
.../cache/GridCacheStoreValueBytesSelfTest.java | 21 --
.../processors/cache/GridCacheTestEntryEx.java | 23 +-
...achePartitionedMultiNodeCounterSelfTest.java | 4 +-
41 files changed, 470 insertions(+), 589 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 68d8c0b..1b0c09c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -150,7 +150,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
}
- ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false);
+ ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
}
catch (IgniteCheckedException e) {
U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9642bfb..57b5ac4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.cache.query.continuous.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.clock.*;
@@ -528,6 +529,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 96:
+ msg = new CacheContinuousQueryEntry();
+
+ break;
+
default:
if (ext != null) {
for (MessageFactory factory : ext) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 34d65bf..58e21f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -963,12 +963,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
if (peek != null) {
CacheObject v = peek.get();
- return v.value(ctx, true);
-// TODO IGNITE-51
-// if (ctx.portableEnabled())
-// v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
-//
-// return F.t(ctx.cloneOnFlag(v));
+ Object val0 = v.value(ctx, true);
+
+ if (ctx.portableEnabled())
+ val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
+
+ return F.t((V)val0);
}
}
@@ -980,12 +980,12 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K, V>,
if (peek != null) {
CacheObject v = peek.get();
- return v.value(ctx, true);
-// TODO IGNITE-51
-// if (ctx.portableEnabled())
-// v = (V)ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
-//
-// return F.t(ctx.cloneOnFlag(v));
+ Object val0 = v.value(ctx, true);
+
+ if (ctx.portableEnabled())
+ val0 = ctx.unwrapPortableIfNeeded(v, ctx.keepPortable());
+
+ return F.t((V) val0);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 30df242..5196965 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -565,17 +565,6 @@ public interface GridCacheEntryEx {
public boolean markObsoleteVersion(GridCacheVersion ver);
/**
- * @return Key bytes.
- */
- public byte[] keyBytes();
-
- /**
- * @return Key bytes.
- * @throws IgniteCheckedException If marshalling failed.
- */
- public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException;
-
- /**
* @return Version.
* @throws GridCacheEntryRemovedException If entry has been removed.
*/
@@ -858,16 +847,10 @@ public interface GridCacheEntryEx {
@Nullable public GridCacheMvccCandidate localOwner() throws GridCacheEntryRemovedException;
/**
- * @param keyBytes Key bytes.
- * @throws GridCacheEntryRemovedException If entry was removed.
- */
- public void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException;
-
- /**
* @return Value bytes.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- public GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException;
+ public CacheObject valueBytes() throws GridCacheEntryRemovedException;
/**
* Gets cached serialized value bytes.
@@ -877,7 +860,7 @@ public interface GridCacheEntryEx {
* @throws IgniteCheckedException If serialization failed.
* @throws GridCacheEntryRemovedException If entry was removed.
*/
- @Nullable public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver)
+ @Nullable public CacheObject valueBytes(@Nullable GridCacheVersion ver)
throws IgniteCheckedException, GridCacheEntryRemovedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 96d7ee2..0179ad0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -325,7 +325,8 @@ public class GridCacheEntryInfo implements Externalizable, Message {
public void marshal(GridCacheContext ctx) throws IgniteCheckedException {
key.prepareMarshal(ctx.cacheObjectContext());
- val.prepareMarshal(ctx.cacheObjectContext());
+ if (val != null)
+ val.prepareMarshal(ctx.cacheObjectContext());
// TODO IGNITE-51
// boolean depEnabled = ctx.gridDeploy().enabled();
//
@@ -352,7 +353,8 @@ public class GridCacheEntryInfo implements Externalizable, Message {
public void unmarshal(GridCacheContext ctx, ClassLoader clsLdr) throws IgniteCheckedException {
key.finishUnmarshal(ctx, clsLdr);
- val.finishUnmarshal(ctx, clsLdr);
+ if (val != null)
+ val.finishUnmarshal(ctx, clsLdr);
// TODO IGNITE-51
// Marshaller mrsh = ctx.marshaller();
//
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 21255e2..7d1dc73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -272,33 +272,21 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
/**
* @return Value bytes.
*/
- protected GridCacheValueBytes valueBytesUnlocked() {
+ protected CacheObject valueBytesUnlocked() {
assert Thread.holdsLock(this);
- if (!isOffHeapValuesOnly()) {
-// TODO IGNITE-51.
-// if (valBytes != null)
-// return GridCacheValueBytes.marshaled(valBytes);
+ CacheObject val0 = val;
- try {
- if (valPtr != 0 && cctx.offheapTiered())
- return offheapValueBytes();
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- else {
- if (valPtr != 0) {
- GridUnsafeMemory mem = cctx.unsafeMemory();
-
- assert mem != null;
+ if (val0 == null && valPtr != 0) {
+ IgniteBiTuple<byte[], Boolean> t = valueBytes0();
- return mem.getOffHeap(valPtr);
- }
+ if (t.get2())
+ val0 = cctx.toCacheObject(t.get1(), null);
+ else
+ val0 = cctx.toCacheObject(null, t.get1());
}
- return GridCacheValueBytes.nil();
+ return val0;
}
/** {@inheritDoc} */
@@ -439,33 +427,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
info.setNew(isStartVersion());
info.setDeleted(deletedUnlocked());
- if (!expired) {
- CacheObject val0 = val;
-
- if (val0 == null && valPtr != 0) {
- IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
- if (t.get2())
- val0 = cctx.toCacheObject(t.get1(), null);
- else
- val0 = cctx.toCacheObject(null, t.get1());
-
- }
-
- info.value(val0);
-// TODO IGNITE-51.
-// info.value(cctx.kernalContext().config().isPeerClassLoadingEnabled() ?
-// rawGetOrUnmarshalUnlocked(false) : val);
-//
-// GridCacheValueBytes valBytes = valueBytesUnlocked();
-//
-// if (!valBytes.isNull()) {
-// if (valBytes.isPlain())
-// info.value((V)valBytes.get());
-// else
-// info.valueBytes(valBytes.get());
-// }
- }
+ if (!expired)
+ info.value(valueBytesUnlocked());
}
}
}
@@ -611,6 +574,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
* @return Value bytes and flag indicating whether value is byte array.
*/
protected IgniteBiTuple<byte[], Boolean> valueBytes0() {
+ assert Thread.holdsLock(this);
+
if (valPtr != 0) {
assert isOffHeapValuesOnly() || cctx.offheapTiered();
@@ -827,7 +792,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
taskName);
}
- cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null);
+ cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
// No more notifications.
evt = false;
@@ -1064,8 +1029,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val;
- GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
if (intercept) {
key0 = key.value(cctx, false);
val0 = CU.value(val, cctx, false);
@@ -1138,7 +1101,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+ cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -1228,8 +1191,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
}
- GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
if (old == null)
old = saveValueForIndexUnlocked();
@@ -1295,7 +1256,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear()))
- cctx.continuousQueries().onEntryUpdated(this, key, null, null, old, oldBytes, false);
+ cctx.continuousQueries().onEntryUpdated(this, key, null, old, false);
cctx.dataStructures().onEntryUpdated(key, true);
}
@@ -1388,8 +1349,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Possibly get old value form store.
old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
- GridCacheValueBytes oldBytes = valueBytesUnlocked();
-
boolean readThrough = false;
Object old0 = null;
@@ -1618,7 +1577,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (res)
updateMetrics(op, metrics);
- cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), old, oldBytes, false);
+ cctx.continuousQueries().onEntryUpdated(this, key, val, old, false);
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -1821,7 +1780,6 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
// Prepare old value and value bytes.
oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val;
- GridCacheValueBytes oldValBytes = valueBytesUnlocked();
// Possibly read value from store.
boolean readThrough = false;
@@ -2196,8 +2154,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
updateMetrics(op, metrics);
if (cctx.isReplicated() || primary)
- cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(),
- oldVal, oldValBytes, false);
+ cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false);
cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
@@ -3298,8 +3255,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (!skipQryNtf) {
if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer))
- cctx.continuousQueries().onEntryUpdated(this, key, val, valueBytesUnlocked(), null, null,
- preload);
+ cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload);
cctx.dataStructures().onEntryUpdated(key, false);
}
@@ -3617,7 +3573,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
null);
}
- cctx.continuousQueries().onEntryExpired(this, key, expiredVal, null);
+ cctx.continuousQueries().onEntryExpired(this, key, expiredVal);
}
}
}
@@ -3707,84 +3663,25 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
- @Override public synchronized void keyBytes(byte[] keyBytes) throws GridCacheEntryRemovedException {
- checkObsolete();
-
-// TODO IGNITE-51.
-// if (keyBytes != null)
-// this.keyBytes = keyBytes;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized byte[] keyBytes() {
-// TODO IGNITE-51.
-// return keyBytes;
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] getOrMarshalKeyBytes() throws IgniteCheckedException {
-// TODO IGNITE-51.
-// byte[] bytes = keyBytes();
-//
-// if (bytes != null)
-// return bytes;
-//
-// bytes = CU.marshal(cctx.shared(), key);
-//
-// synchronized (this) {
-// keyBytes = bytes;
-// }
-//
-// return bytes;
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized GridCacheValueBytes valueBytes() throws GridCacheEntryRemovedException {
+ @Override public synchronized CacheObject valueBytes() throws GridCacheEntryRemovedException {
checkObsolete();
return valueBytesUnlocked();
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheValueBytes valueBytes(@Nullable GridCacheVersion ver)
+ @Nullable @Override public CacheObject valueBytes(@Nullable GridCacheVersion ver)
throws IgniteCheckedException, GridCacheEntryRemovedException {
CacheObject val = null;
- GridCacheValueBytes valBytes = GridCacheValueBytes.nil();
-// TODO IGNITE-51.
-// synchronized (this) {
-// checkObsolete();
-//
-// if (ver == null || this.ver.equals(ver)) {
-// val = this.val;
-// ver = this.ver;
-// valBytes = valueBytesUnlocked();
-//
-// if (valBytes.isNull() && cctx.offheapTiered() && valPtr != 0)
-// valBytes = offheapValueBytes();
-// }
-// else
-// ver = null;
-// }
-//
-// if (valBytes.isNull()) {
-// if (val != null)
-// valBytes = (val instanceof byte[]) ? GridCacheValueBytes.plain(val) :
-// GridCacheValueBytes.marshaled(CU.marshal(cctx.shared(), val));
-//
-// if (ver != null && !isOffHeapValuesOnly()) {
-// synchronized (this) {
-// checkObsolete();
-//
-// if (this.val == val)
-// this.valBytes = isStoreValueBytes() ? valBytes.getIfMarshaled() : null;
-// }
-// }
-// }
+ synchronized (this) {
+ checkObsolete();
+
+ if (ver == null || this.ver.equals(ver))
+ val = valueBytesUnlocked();
+ }
- return valBytes;
+ return val;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 012d393..ef04a8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -678,7 +678,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
return null;
return read(entry.key(),
- entry.getOrMarshalKeyBytes(),
+ entry.key().valueBytes(cctx),
entry.partition(),
locked,
readOffheap,
@@ -698,8 +698,6 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- byte[] keyBytes = entry.getOrMarshalKeyBytes();
-
IgniteBiTuple<Long, Integer> ptr = offheap.valuePointer(spaceName, part, key, key.valueBytes(cctx));
if (ptr != null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 95b9095..028ab12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -67,12 +67,14 @@ public class GridCacheTtlUpdateRequest extends GridCacheMessage {
}
/**
+ * @param cacheId Cache ID.
* @param topVer Topology version.
* @param ttl TTL.
*/
- public GridCacheTtlUpdateRequest(long topVer, long ttl) {
+ public GridCacheTtlUpdateRequest(int cacheId, long topVer, long ttl) {
assert ttl >= 0 || ttl == CU.TTL_ZERO : ttl;
+ this.cacheId = cacheId;
this.topVer = topVer;
this.ttl = ttl;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 3ce5cd3..c787261 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -281,7 +281,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
log.debug("Replacing obsolete entry in remote transaction [entry=" + entry + ", tx=" + this + ']');
// Replace the entry.
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
}
}
}
@@ -327,13 +327,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
IgniteTxEntry rmv = readMap.remove(e.txKey());
if (rmv != null) {
- e.cached(rmv.cached(), null);
+ e.cached(rmv.cached());
writeMap.put(e.txKey(), e);
}
// If lock is explicit.
else {
- e.cached(e.context().cache().entryEx(e.key()), null);
+ e.cached(e.context().cache().entryEx(e.key()));
// explicit lock.
writeMap.put(e.txKey(), e);
@@ -456,7 +456,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
if (log.isDebugEnabled())
log.debug("Got removed entry while committing (will retry): " + txEntry);
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
}
}
}
@@ -484,7 +484,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
GridCacheEntryEx cached = txEntry.cached();
if (cached == null)
- txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()), null);
+ txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key()));
if (near() && cacheCtx.dr().receiveEnabled()) {
cached.markObsolete(xidVer);
@@ -563,19 +563,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
// Keep near entry up to date.
if (nearCached != null) {
- CacheObject val0 = null;
-
- GridCacheValueBytes valBytesTuple = cached.valueBytes();
-
- if (!valBytesTuple.isNull()) {
-// TODO IGNITE-51.
-// if (valBytesTuple.isPlain())
-// val0 = (V)valBytesTuple.get();
-// else
-// valBytes0 = valBytesTuple.get();
- }
- else
- val0 = cached.rawGet();
+ CacheObject val0 = cached.valueBytes();
nearCached.updateOrEvict(xidVer,
val0,
@@ -621,19 +609,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
cached.updateTtl(null, txEntry.ttl());
if (nearCached != null) {
- CacheObject val0 = null;
-
- GridCacheValueBytes valBytesTuple = cached.valueBytes();
-
- if (!valBytesTuple.isNull()) {
-// TODO IGNITE-51.
-// if (valBytesTuple.isPlain())
-// val0 = (V)valBytesTuple.get();
-// else
-// valBytes0 = valBytesTuple.get();
- }
- else
- val0 = cached.rawGet();
+ CacheObject val0 = cached.valueBytes();
nearCached.updateOrEvict(xidVer,
val0,
@@ -662,7 +638,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
log.debug("Attempting to commit a removed entry (will retry): " + txEntry);
// Renew cached entry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5febfcc..bb7d308 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -695,10 +695,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
GridCacheTtlUpdateRequest req = reqMap.get(node);
if (req == null) {
- reqMap.put(node,
- req = new GridCacheTtlUpdateRequest(topVer, expiryPlc.forAccess()));
-
- req.cacheId(ctx.cacheId());
+ reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+ topVer,
+ expiryPlc.forAccess()));
}
req.addEntry(e.getKey(), e.getValue());
@@ -718,10 +717,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
GridCacheTtlUpdateRequest req = reqMap.get(node);
if (req == null) {
- reqMap.put(node, req = new GridCacheTtlUpdateRequest(topVer,
+ reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(),
+ topVer,
expiryPlc.forAccess()));
-
- req.cacheId(ctx.cacheId());
}
for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : e.getValue())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
index 6e4eac8..15648fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java
@@ -311,16 +311,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry {
if (isNew() || !valid(-1) || deletedUnlocked())
return null;
else {
- CacheObject val0 = val;
-
- if (val0 == null && valPtr != 0) {
- IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
- if (t.get2())
- val0 = cctx.toCacheObject(t.get1(), null);
- else
- val0 = cctx.toCacheObject(null, t.get1());
- }
+ CacheObject val0 = valueBytesUnlocked();
return F.t(ver, val0, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index e9674c8..8eb0809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -419,12 +419,12 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
GridCacheEntryInfo info = it.next();
- CacheObject v = map.get(info.key());
+ Object v = map.get(info.key());
if (v == null)
it.remove();
- else
- info.value(v);
+ else if (!skipVals)
+ info.value((CacheObject)v);
}
return infos;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index 056c3ad..6fcc7f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -906,7 +906,7 @@ public final class GridDhtLockFuture<K, V> extends GridCompoundIdentityFuture<Bo
assert added.dhtLocal();
if (added.ownerVersion() != null)
- req.owned(e.key(), e.getOrMarshalKeyBytes(), added.ownerVersion());
+ req.owned(e.key(), added.ownerVersion());
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index 4f54f47..87c786d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -260,7 +260,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest {
* @param keyBytes Key bytes.
* @param ownerMapped Owner mapped version.
*/
- public void owned(KeyCacheObject key, byte[] keyBytes, GridCacheVersion ownerMapped) {
+ public void owned(KeyCacheObject key, GridCacheVersion ownerMapped) {
if (owned == null)
owned = new GridLeanMap<>(3);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
index d2b7d36..060b02c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java
@@ -209,7 +209,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
if (e.cached().obsolete()) {
GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
- e.cached(cached, cached.keyBytes());
+ e.cached(cached);
}
if (e.cached().detached() || e.cached().isLocal())
@@ -234,7 +234,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
catch (GridCacheEntryRemovedException ignore) {
GridCacheEntryEx cached = cacheCtx.cache().entryEx(e.key());
- e.cached(cached, cached.keyBytes());
+ e.cached(cached);
}
}
}
@@ -462,7 +462,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
GridDhtCacheEntry cached = dhtCache.entryExx(entry.key(), topologyVersion());
- entry.cached(cached, null);
+ entry.cached(cached);
GridCacheVersion explicit = entry.explicitVersion();
@@ -562,7 +562,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
if (read)
txEntry.ttl(accessTtl);
- txEntry.cached(cached, null);
+ txEntry.cached(cached);
addReader(msgId, cached, txEntry, topVer);
}
@@ -710,7 +710,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter {
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
- txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()), null);
+ txEntry.cached(txEntry.context().dht().entryExx(key.key(), topologyVersion()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 7f9022a..11101fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -429,7 +429,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
if (entry == null) {
entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
- txEntry.cached(entry, null);
+ txEntry.cached(entry);
}
if (tx.optimistic() && txEntry.explicitVersion() == null) {
@@ -455,7 +455,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
entry = (GridDistributedCacheEntry)cacheCtx.cache().entryEx(txEntry.key());
- txEntry.cached(entry, null);
+ txEntry.cached(entry);
}
}
}
@@ -625,7 +625,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridCacheVersion dhtVer = entry.version();
- CacheObject val0 = entry.rawGet();
+ CacheObject val0 = entry.valueBytes();
if (val0 != null)
res.addOwnedValue(txEntry.txKey(), dhtVer, val0);
@@ -634,7 +634,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
}
}
}
@@ -655,7 +655,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
GridCacheVersion dhtVer = entry.version();
if (ver.getValue() == null || !ver.getValue().equals(dhtVer)) {
- CacheObject val0 = entry.rawGet();
+ CacheObject val0 = entry.valueBytes();
res.addOwnedValue(txEntry.txKey(), dhtVer, val0);
}
@@ -664,7 +664,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
}
catch (GridCacheEntryRemovedException ignored) {
// Retry.
- txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()), null);
+ txEntry.cached(cacheCtx.cache().entryEx(txEntry.key()));
}
}
}
@@ -1015,7 +1015,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
catch (GridCacheEntryRemovedException ignore) {
cached = dht.entryExx(entry.key());
- entry.cached(cached, cached.keyBytes());
+ entry.cached(cached);
}
}
@@ -1195,7 +1195,7 @@ public final class GridDhtTxPrepareFuture<K, V> extends GridCompoundIdentityFutu
if (e == null)
break;
- entry.cached(e, null);
+ entry.cached(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
index 60c9c2f..cfec044 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java
@@ -167,16 +167,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (nearEvicted != null) {
- for (IgniteTxKey key : nearEvicted)
+ for (IgniteTxKey key : nearEvicted) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.prepareMarshal(cctx);
+ }
}
if (preloadEntries != null) {
- for (GridCacheEntryInfo info : preloadEntries)
+ for (GridCacheEntryInfo info : preloadEntries) {
+ GridCacheContext cctx = ctx.cacheContext(info.cacheId());
+
info.marshal(cctx);
+ }
}
}
@@ -184,16 +188,20 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse {
@Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
-
if (nearEvicted != null) {
- for (IgniteTxKey key : nearEvicted)
+ for (IgniteTxKey key : nearEvicted) {
+ GridCacheContext cctx = ctx.cacheContext(key.cacheId());
+
key.finishUnmarshal(cctx, ldr);
+ }
}
if (preloadEntries != null) {
- for (GridCacheEntryInfo info : preloadEntries)
+ for (GridCacheEntryInfo info : preloadEntries) {
+ GridCacheContext cctx = ctx.cacheContext(info.cacheId());
+
info.unmarshal(cctx, ldr);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
index 3f9ce8b..8dc91c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java
@@ -264,7 +264,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter {
checkInternal(entry.txKey());
// Initialize cache entry.
- entry.cached(cached, null);
+ entry.cached(cached);
writeMap.put(entry.txKey(), entry);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index adcaebf..2012252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -2032,7 +2032,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
else
res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE);
- if (writeVal != null || !entry.valueBytes().isNull()) {
+ if (writeVal != null || entry.hasValue()) {
IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);
assert f == null : f;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index a9a26c6..567bf67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -327,12 +327,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
GridCacheReturn ret = (GridCacheReturn)res;
- if (op != TRANSFORM && ret != null) {
- CacheObject val = (CacheObject)ret.value();
-
- ret.value(CU.value(val, cctx, false));
- }
-
Object retval = res == null ? null : rawRetval ? ret : this.retval ? ret.value() : ret.success();
if (op == TRANSFORM && retval == null)
@@ -362,6 +356,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
return;
}
+ GridCacheReturn ret = res.returnValue();
+
+ if (op != TRANSFORM && ret != null) {
+ CacheObject val = (CacheObject)ret.value();
+
+ ret.value(CU.value(val, cctx, false));
+ }
+
Boolean single0 = single;
if (single0 != null && single0) {
@@ -374,13 +376,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
onDone(addFailedKeys(res.failedKeys(), res.error()));
else {
if (op == TRANSFORM) {
- if (res.returnValue() != null)
- addInvokeResults(res.returnValue());
+ if (ret != null)
+ addInvokeResults(ret);
onDone(opRes);
}
else {
- GridCacheReturn<?> opRes0 = opRes = res.returnValue();
+ GridCacheReturn<?> opRes0 = opRes = ret;
onDone(opRes0);
}
@@ -398,11 +400,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> implem
if (op == TRANSFORM) {
assert !req.fastMap();
- if (res.returnValue() != null)
- addInvokeResults(res.returnValue());
+ if (ret != null)
+ addInvokeResults(ret);
}
else if (req.fastMap() && req.hasPrimary())
- opRes = res.returnValue();
+ opRes = ret;
}
mappings.remove(nodeId);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 29e9730..441c2fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -270,7 +270,7 @@ public final class GridDhtColocatedLockFuture<K, V> extends GridCompoundIdentity
if (inTx()) {
IgniteTxEntry txEntry = tx.entry(entry.txKey());
- txEntry.cached(entry, null);
+ txEntry.cached(entry);
if (cand != null) {
if (!tx.implicit())
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
index b9602d9..154d99e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java
@@ -67,11 +67,8 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry {
}
/** {@inheritDoc} */
- @Override protected GridCacheValueBytes valueBytesUnlocked() {
- return null;
-// TODO IGNITE-51.
-// return (val != null && val instanceof byte[]) ? GridCacheValueBytes.plain(val) :
-// valBytes == null ? GridCacheValueBytes.nil() : GridCacheValueBytes.marshaled(valBytes);
+ @Override protected CacheObject valueBytesUnlocked() {
+ return val;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index b47d288..d06ca5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -273,16 +273,7 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry {
if (dhtVer == null)
return null;
else {
- CacheObject val0 = val;
-
- if (val0 == null && valPtr != 0) {
- IgniteBiTuple<byte[], Boolean> t = valueBytes0();
-
- if (t.get2())
- val0 = cctx.toCacheObject(t.get1(), null);
- else
- val0 = cctx.toCacheObject(null, t.get1());
- }
+ CacheObject val0 = valueBytesUnlocked();
return F.t(ver, val0, null);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 9da55ea..c855b47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -316,7 +316,7 @@ public final class GridNearLockFuture<K, V> extends GridCompoundIdentityFuture<B
if (inTx()) {
IgniteTxEntry txEntry = tx.entry(entry.txKey());
- txEntry.cached(entry, null);
+ txEntry.cached(entry);
}
if (c != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8d6800d..ef2899a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -582,7 +582,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
", tx=" + this + ']');
// Replace the entry.
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), entry.keyBytes());
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
}
}
}
@@ -1129,7 +1129,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (cached.obsoleteVersion() != null) {
cached = cacheCtx.colocated().entryExx(key.key(), topologyVersion(), true);
- txEntry.cached(cached, null);
+ txEntry.cached(cached);
}
return cached;
@@ -1156,7 +1156,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter {
if (cached.obsoleteVersion() != null) {
cached = cacheCtx.colocated().entryExx(key.key(), topVer, true);
- txEntry.cached(cached, null);
+ txEntry.cached(cached);
}
return cached;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
index 491a171..9bb3aa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFuture.java
@@ -778,11 +778,11 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
// Must re-initialize cached entry while holding topology lock.
if (cacheCtx.isNear())
- entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer), null);
+ entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer));
else if (!cacheCtx.isLocal())
- entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true), null);
+ entry.cached(cacheCtx.colocated().entryExx(entry.key(), topVer, true));
else
- entry.cached(cacheCtx.local().entryEx(entry.key(), topVer), null);
+ entry.cached(cacheCtx.local().entryEx(entry.key(), topVer));
if (cacheCtx.isNear() || cacheCtx.isLocal()) {
if (waitLock && entry.explicitVersion() == null) {
@@ -812,7 +812,7 @@ public final class GridNearTxPrepareFuture<K, V> extends GridCompoundIdentityFut
break;
}
catch (GridCacheEntryRemovedException ignore) {
- entry.cached(cacheCtx.near().entryEx(entry.key()), null);
+ entry.cached(cacheCtx.near().entryEx(entry.key()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
index 2b40e88..ccd4c8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java
@@ -299,7 +299,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter {
}
else {
// Initialize cache entry.
- entry.cached(cached, null);
+ entry.cached(cached);
writeMap.put(entry.txKey(), entry);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
index 619ce38..bc248aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalTxFuture.java
@@ -218,7 +218,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx>
if (log.isDebugEnabled())
log.debug("Got removed entry in checkLocks method (will retry): " + txEntry);
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
}
}
}
@@ -264,7 +264,7 @@ final class GridLocalTxFuture<K, V> extends GridFutureAdapter<IgniteInternalTx>
if (log.isDebugEnabled())
log.debug("Got removed entry in onOwnerChanged method (will retry): " + txEntry);
- txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()), null);
+ txEntry.cached(txEntry.context().cache().entryEx(txEntry.key()));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index d98b254..43aaec3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -18,144 +18,145 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.*;
+import org.apache.ignite.plugin.extensions.communication.*;
import org.jetbrains.annotations.*;
-import java.io.*;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheValueBytes.*;
+import javax.cache.event.*;
+import java.nio.*;
/**
* Continuous query entry.
*/
-class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externalizable {
+public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
+ /** */
+ private static final EventType[] EVT_TYPE_VALS = EventType.values();
+
+ /**
+ * @param ord Event type ordinal value.
+ * @return Event type.
+ */
+ @Nullable public static EventType eventTypeFromOrdinal(int ord) {
+ return ord >= 0 && ord < EVT_TYPE_VALS.length ? EVT_TYPE_VALS[ord] : null;
+ }
+
/** */
- private static final long serialVersionUID = 0L;
+ private EventType evtType;
/** Key. */
@GridToStringInclude
- private K key;
+ private KeyCacheObject key;
/** New value. */
@GridToStringInclude
- private V newVal;
+ private CacheObject newVal;
/** Old value. */
@GridToStringInclude
- private V oldVal;
-
- /** Serialized key. */
- @GridToStringExclude
- private byte[] keyBytes;
-
- /** Serialized value. */
- @GridToStringExclude
- private GridCacheValueBytes newValBytes;
-
- /** Serialized value. */
- @GridToStringExclude
- private GridCacheValueBytes oldValBytes;
+ private CacheObject oldVal;
/** Cache name. */
- private String cacheName;
+ private int cacheId;
/** Deployment info. */
@GridToStringExclude
+ @GridDirectTransient
private GridDeploymentInfo depInfo;
+ /**
+ * Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
+ */
public CacheContinuousQueryEntry() {
// No-op.
}
- CacheContinuousQueryEntry(K key, @Nullable V newVal, @Nullable GridCacheValueBytes newValBytes, @Nullable V oldVal,
- @Nullable GridCacheValueBytes oldValBytes) {
-
+ /**
+ * @param cacheId Cache ID.
+ * @param evtType Event type.
+ * @param key Key.
+ * @param newVal New value.
+ * @param oldVal Old value.
+ */
+ CacheContinuousQueryEntry(
+ int cacheId,
+ EventType evtType,
+ KeyCacheObject key,
+ @Nullable CacheObject newVal,
+ @Nullable CacheObject oldVal) {
+ this.cacheId = cacheId;
+ this.evtType = evtType;
this.key = key;
this.newVal = newVal;
- this.newValBytes = newValBytes;
this.oldVal = oldVal;
- this.oldValBytes = oldValBytes;
}
/**
- * @param cacheName Cache name.
+ * @return Cache ID.
*/
- void cacheName(String cacheName) {
- this.cacheName = cacheName;
+ int cacheId() {
+ return cacheId;
}
/**
- * @return cache name.
+ * @return Event type.
*/
- String cacheName() {
- return cacheName;
+ EventType eventType() {
+ return evtType;
}
/**
- * @param marsh Marshaller.
+ * @param cctx Cache context.
* @throws IgniteCheckedException In case of error.
*/
- void p2pMarshal(Marshaller marsh) throws IgniteCheckedException {
- assert marsh != null;
-
+ void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
assert key != null;
- keyBytes = marsh.marshal(key);
+ key.prepareMarshal(cctx.cacheObjectContext());
- if (newValBytes == null || newValBytes.isNull())
- newValBytes = newVal != null ?
- newVal instanceof byte[] ? plain(newVal) : marshaled(marsh.marshal(newVal)) : null;
+ if (newVal != null)
+ newVal.prepareMarshal(cctx.cacheObjectContext());
- if (oldValBytes == null || oldValBytes.isNull())
- oldValBytes = oldVal != null ?
- oldVal instanceof byte[] ? plain(oldVal) : marshaled(marsh.marshal(oldVal)) : null;
+ if (oldVal != null)
+ oldVal.prepareMarshal(cctx.cacheObjectContext());
}
/**
- * @param marsh Marshaller.
+ * @param cctx Cache context.
* @param ldr Class loader.
* @throws IgniteCheckedException In case of error.
*/
- void p2pUnmarshal(Marshaller marsh, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- assert marsh != null;
+ void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ key.finishUnmarshal(cctx, ldr);
- assert key == null : "Key should be null: " + key;
- assert newVal == null : "New value should be null: " + newVal;
- assert oldVal == null : "Old value should be null: " + oldVal;
- assert keyBytes != null;
+ if (newVal != null)
+ newVal.finishUnmarshal(cctx, ldr);
- key = marsh.unmarshal(keyBytes, ldr);
-
- if (newValBytes != null && !newValBytes.isNull())
- newVal = newValBytes.isPlain() ? (V)newValBytes.get() : marsh.<V>unmarshal(newValBytes.get(), ldr);
-
- if (oldValBytes != null && !oldValBytes.isNull())
- oldVal = oldValBytes.isPlain() ? (V)oldValBytes.get() : marsh.<V>unmarshal(oldValBytes.get(), ldr);
+ if (oldVal != null)
+ oldVal.finishUnmarshal(cctx, ldr);
}
/**
* @return Key.
*/
- K key() {
+ KeyCacheObject key() {
return key;
}
/**
* @return New value.
*/
- V value() {
+ CacheObject value() {
return newVal;
}
/**
* @return Old value.
*/
- V oldValue() {
+ CacheObject oldValue() {
return oldVal;
}
@@ -170,62 +171,117 @@ class CacheContinuousQueryEntry<K, V> implements GridCacheDeployable, Externaliz
}
/** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- boolean b = keyBytes != null;
-
- out.writeBoolean(b);
-
- if (b) {
- U.writeByteArray(out, keyBytes);
-
- if (newValBytes != null && !newValBytes.isNull()) {
- out.writeBoolean(true);
- out.writeBoolean(newValBytes.isPlain());
- U.writeByteArray(out, newValBytes.get());
- }
- else
- out.writeBoolean(false);
-
- if (oldValBytes != null && !oldValBytes.isNull()) {
- out.writeBoolean(true);
- out.writeBoolean(oldValBytes.isPlain());
- U.writeByteArray(out, oldValBytes.get());
- }
- else
- out.writeBoolean(false);
-
- U.writeString(out, cacheName);
- out.writeObject(depInfo);
+ @Override public byte directType() {
+ return 96;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
}
- else {
- out.writeObject(key);
- out.writeObject(newVal);
- out.writeObject(oldVal);
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("cacheId", cacheId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeMessage("key", key))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeMessage("newVal", newVal))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMessage("oldVal", oldVal))
+ return false;
+
+ writer.incrementState();
+
}
+
+ return true;
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- boolean b = in.readBoolean();
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
- if (b) {
- keyBytes = U.readByteArray(in);
+ if (!reader.beforeMessageRead())
+ return false;
- if (in.readBoolean())
- newValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+ switch (reader.state()) {
+ case 0:
+ cacheId = reader.readInt("cacheId");
- if (in.readBoolean())
- oldValBytes = in.readBoolean() ? plain(U.readByteArray(in)) : marshaled(U.readByteArray(in));
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ byte evtTypeOrd;
+
+ evtTypeOrd = reader.readByte("evtType");
+
+ if (!reader.isLastRead())
+ return false;
+
+ evtType = eventTypeFromOrdinal(evtTypeOrd);
+
+ reader.incrementState();
+
+ case 2:
+ key = reader.readMessage("key");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ newVal = reader.readMessage("newVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ oldVal = reader.readMessage("oldVal");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
- cacheName = U.readString(in);
- depInfo = (GridDeploymentInfo)in.readObject();
- }
- else {
- key = (K)in.readObject();
- newVal = (V)in.readObject();
- oldVal = (V)in.readObject();
}
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index c90ae34..1bdadaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -27,43 +28,45 @@ import javax.cache.event.*;
* Continuous query event.
*/
class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
+ /** */
+ private final GridCacheContext cctx;
+
/** Entry. */
@GridToStringExclude
- private final CacheContinuousQueryEntry<K, V> e;
+ private final CacheContinuousQueryEntry e;
/**
- * @param source Source cache.
- * @param eventType Event type.
+ * @param src Source cache.
+ * @param cctx Cache context.
* @param e Entry.
*/
- CacheContinuousQueryEvent(Cache source, EventType eventType, CacheContinuousQueryEntry<K, V> e) {
- super(source, eventType);
-
- assert e != null;
+ CacheContinuousQueryEvent(Cache src, GridCacheContext cctx, CacheContinuousQueryEntry e) {
+ super(src, e.eventType());
+ this.cctx = cctx;
this.e = e;
}
/**
* @return Entry.
*/
- CacheContinuousQueryEntry<K, V> entry() {
+ CacheContinuousQueryEntry entry() {
return e;
}
/** {@inheritDoc} */
@Override public K getKey() {
- return e.key();
+ return e.key().value(cctx, false);
}
/** {@inheritDoc} */
@Override public V getValue() {
- return e.value();
+ return CU.value(e.value(), cctx, false);
}
/** {@inheritDoc} */
@Override public V getOldValue() {
- return e.oldValue();
+ return CU.value(e.oldValue(), cctx, false);
}
/** {@inheritDoc} */
@@ -81,7 +84,10 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(CacheContinuousQueryEvent.class, this, "key", e.key(), "newVal", e.value(), "oldVal",
- e.oldValue(), "cacheName", e.cacheName());
+ return S.toString(CacheContinuousQueryEvent.class, this,
+ "evtType", getEventType(),
+ "key", getKey(),
+ "newVal", getValue(),
+ "oldVal", getOldValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ea39d669/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 9502b3f..53a2cdb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -212,17 +212,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (ctx.config().isPeerClassLoadingEnabled() && node != null &&
U.hasCache(node, cacheName)) {
- evt.entry().p2pMarshal(ctx.config().getMarshaller());
-
- evt.entry().cacheName(cacheName);
+ evt.entry().prepareMarshal(cctx);
GridCacheDeploymentManager depMgr =
ctx.cache().internalCache(cacheName).context().deploy();
depMgr.prepare(evt.entry());
}
+ else
+ evt.entry().prepareMarshal(cctx);
- ctx.continuous().addNotification(nodeId, routineId, evt, topic, sync);
+ ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
}
catch (IgniteCheckedException ex) {
U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
@@ -302,46 +302,42 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
assert objs != null;
assert ctx != null;
- Collection<CacheEntryEvent<? extends K, ? extends V>> evts =
- (Collection<CacheEntryEvent<? extends K, ? extends V>>)objs;
-
- if (ctx.config().isPeerClassLoadingEnabled()) {
- for (CacheEntryEvent<? extends K, ? extends V> evt : evts) {
- assert evt instanceof CacheContinuousQueryEvent;
+ Collection<CacheContinuousQueryEntry> entries = (Collection<CacheContinuousQueryEntry>)objs;
- CacheContinuousQueryEntry<? extends K, ? extends V> e = ((CacheContinuousQueryEvent)evt).entry();
+ final GridCacheContext cctx = cacheContext(ctx);
- GridCacheAdapter cache = ctx.cache().internalCache(e.cacheName());
+ for (CacheContinuousQueryEntry e : entries) {
+ GridCacheDeploymentManager depMgr = cctx.deploy();
- ClassLoader ldr = null;
+ ClassLoader ldr = depMgr.globalLoader();
- if (cache != null) {
- GridCacheDeploymentManager depMgr = cache.context().deploy();
+ if (ctx.config().isPeerClassLoadingEnabled()) {
+ GridDeploymentInfo depInfo = e.deployInfo();
- GridDeploymentInfo depInfo = e.deployInfo();
-
- if (depInfo != null) {
- depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
- depInfo.participants(), depInfo.localDeploymentOwner());
- }
-
- ldr = depMgr.globalLoader();
- }
- else {
- U.warn(ctx.log(getClass()), "Received cache event for cache that is not configured locally " +
- "when peer class loading is enabled: " + e.cacheName() + ". Will try to unmarshal " +
- "with default class loader.");
+ if (depInfo != null) {
+ depMgr.p2pContext(nodeId, depInfo.classLoaderId(), depInfo.userVersion(), depInfo.deployMode(),
+ depInfo.participants(), depInfo.localDeploymentOwner());
}
+ }
- try {
- e.p2pUnmarshal(ctx.config().getMarshaller(), ldr);
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
- }
+ try {
+ e.unmarshal(cctx, ldr);
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to unmarshal entry.", ex);
}
}
+ final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+ new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+ @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
+ return new CacheContinuousQueryEvent<K, V>(cache, cctx, e);
+ };
+ }
+ );
+
locLsnr.onUpdated(evts);
}