You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/11/26 10:30:14 UTC
[05/24] ignite git commit: Fixing keepBinary for continuous queries.
Fixing keepBinary for continuous queries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01c24e7d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01c24e7d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01c24e7d
Branch: refs/heads/ignite-1.5
Commit: 01c24e7d07e15df8a9a4722c0ec2a9366cb2f669
Parents: eeb9142
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Mon Nov 23 21:40:52 2015 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Mon Nov 23 21:40:52 2015 +0300
----------------------------------------------------------------------
.../ignite/codegen/MessageCodeGenerator.java | 1 +
.../internal/GridEventConsumeHandler.java | 5 ++
.../internal/GridMessageListenHandler.java | 5 ++
.../processors/cache/IgniteCacheProxy.java | 14 +++--
.../continuous/CacheContinuousQueryEntry.java | 50 ++++++++++++-----
.../continuous/CacheContinuousQueryEvent.java | 6 +--
.../continuous/CacheContinuousQueryHandler.java | 33 +++++++++---
.../CacheContinuousQueryListener.java | 5 ++
.../continuous/CacheContinuousQueryManager.java | 57 ++++++++++++--------
.../continuous/GridContinuousHandler.java | 5 ++
.../continuous/GridContinuousProcessor.java | 10 +++-
.../StartRoutineDiscoveryMessage.java | 13 ++++-
12 files changed, 156 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a31415..8733bb3 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.IgniteCodeGeneratingFail;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 3918976..f4bbd6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -125,6 +125,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String cacheName() {
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index aa837b8..9a2829b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -98,6 +98,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String cacheName() {
throw new IllegalStateException();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cb36432..2a52a1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -555,7 +555,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
* @return Initial iteration cursor.
*/
@SuppressWarnings("unchecked")
- private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) {
+ private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) {
if (qry.getInitialQuery() instanceof ContinuousQuery)
throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
"continuous query. Use SCAN or SQL query for initial iteration.");
@@ -570,7 +570,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
qry.getPageSize(),
qry.getTimeInterval(),
qry.isAutoUnsubscribe(),
- loc);
+ loc,
+ keepBinary);
final QueryCursor<Cache.Entry<K, V>> cur =
qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
@@ -616,8 +617,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
validate(qry);
+ CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
if (qry instanceof ContinuousQuery)
- return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
+ return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(),
+ opCtxCall != null && opCtxCall.isKeepBinary());
if (qry instanceof SqlQuery) {
final SqlQuery p = (SqlQuery)qry;
@@ -1623,7 +1627,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
CacheOperationContext prev = onEnter(gate, opCtx);
try {
- ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, opCtx != null && opCtx.isKeepBinary());
}
catch (IgniteCheckedException e) {
throw cacheException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 0495e6d..4d3786a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -99,6 +99,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** Filtered events. */
private GridLongList filteredEvts;
+ /** Keep binary. */
+ private boolean keepBinary;
+
/**
* Required by {@link Message}.
*/
@@ -122,6 +125,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
KeyCacheObject key,
@Nullable CacheObject newVal,
@Nullable CacheObject oldVal,
+ boolean keepBinary,
int part,
long updateCntr,
@Nullable AffinityTopologyVersion topVer) {
@@ -133,6 +137,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
this.part = part;
this.updateCntr = updateCntr;
this.topVer = topVer;
+ this.keepBinary = keepBinary;
}
/**
@@ -203,6 +208,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return Keep binary flag.
+ */
+ boolean isKeepBinary() {
+ return keepBinary;
+ }
+
+ /**
* @param cntrs Filtered events.
*/
void filteredEvents(GridLongList cntrs) {
@@ -322,36 +334,42 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
case 4:
- if (!writer.writeMessage("key", key))
+ if (!writer.writeBoolean("keepBinary", keepBinary))
return false;
writer.incrementState();
case 5:
- if (!writer.writeMessage("newVal", newVal))
+ if (!writer.writeMessage("key", key))
return false;
writer.incrementState();
case 6:
- if (!writer.writeMessage("oldVal", oldVal))
+ if (!writer.writeMessage("newVal", newVal))
return false;
writer.incrementState();
case 7:
- if (!writer.writeInt("part", part))
+ if (!writer.writeMessage("oldVal", oldVal))
return false;
writer.incrementState();
case 8:
- if (!writer.writeMessage("topVer", topVer))
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
@@ -407,7 +425,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 4:
- key = reader.readMessage("key");
+ keepBinary = reader.readBoolean("keepBinary");
if (!reader.isLastRead())
return false;
@@ -415,7 +433,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 5:
- newVal = reader.readMessage("newVal");
+ key = reader.readMessage("key");
if (!reader.isLastRead())
return false;
@@ -423,7 +441,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 6:
- oldVal = reader.readMessage("oldVal");
+ newVal = reader.readMessage("newVal");
if (!reader.isLastRead())
return false;
@@ -431,7 +449,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 7:
- part = reader.readInt("part");
+ oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
return false;
@@ -439,7 +457,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 8:
- topVer = reader.readMessage("topVer");
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
@@ -447,6 +465,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -461,11 +487,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 10;
+ return 11;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheContinuousQueryEntry.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26be5f..f665339 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -59,18 +59,18 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K, V> {
/** {@inheritDoc} */
@Override
public K getKey() {
- return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), true, false);
+ return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), e.isKeepBinary(), false);
}
/** {@inheritDoc} */
@Override public V getValue() {
- return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), true, false);
+ return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), e.isKeepBinary(), false);
}
/** {@inheritDoc} */
@Override
public V getOldValue() {
- return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), true, false);
+ return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), e.isKeepBinary(), false);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b69d4cd..aa9bea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -81,7 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
/**
* Continuous query handler.
*/
-class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
+public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** */
private static final long serialVersionUID = 0L;
@@ -128,7 +128,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient Collection<CacheContinuousQueryEntry> backupQueue;
/** */
- private boolean localCache;
+ private boolean locCache;
+
+ /** */
+ private transient boolean keepBinary;
/** */
private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
@@ -180,7 +183,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
boolean ignoreExpired,
int taskHash,
boolean skipPrimaryCheck,
- boolean locCache) {
+ boolean locCache,
+ boolean keepBinary) {
assert topic != null;
assert locLsnr != null;
@@ -195,7 +199,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
- this.localCache = locCache;
+ this.locCache = locCache;
+ this.keepBinary = keepBinary;
cacheId = CU.cacheId(cacheName);
}
@@ -216,6 +221,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return keepBinary;
+ }
+
+ /**
+ * @param keepBinary Keep binary flag.
+ */
+ public void keepBinary(boolean keepBinary) {
+ this.keepBinary = keepBinary;
+ }
+
+ /** {@inheritDoc} */
@Override public String cacheName() {
return cacheName;
}
@@ -284,6 +301,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
+ /** {@inheritDoc} */
+ @Override public boolean keepBinary() {
+ return keepBinary;
+ }
+
@Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
boolean recordIgniteEvt) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
@@ -317,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (primary || skipPrimaryCheck) {
if (loc) {
- if (!localCache) {
+ if (!locCache) {
Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
if (!entries.isEmpty()) {
@@ -848,7 +870,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/**
* @param e Entry.
- * @param topVer Topology version.
* @return Continuous query entry.
*/
private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 8342acf..86abbef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -81,6 +81,11 @@ interface CacheContinuousQueryListener<K, V> {
public boolean oldValueRequired();
/**
+ * @return Keep binary flag.
+ */
+ public boolean keepBinary();
+
+ /**
* @return Whether to notify on existing entries.
*/
public boolean notifyExisting();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b2e7490..0e4cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -135,7 +135,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
if (cfgs != null) {
for (CacheEntryListenerConfiguration cfg : cfgs)
- executeJCacheQuery(cfg, true);
+ executeJCacheQuery(cfg, true, false);
}
}
@@ -161,21 +161,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
*/
public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
if (lsnrCnt.get() > 0) {
- CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
- cctx.cacheId(),
- UPDATED,
- key,
- null,
- null,
- partId,
- updCntr,
- topVer);
+ for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ UPDATED,
+ key,
+ null,
+ null,
+ lsnr.keepBinary(),
+ partId,
+ updCntr,
+ topVer);
- CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
- for (CacheContinuousQueryListener lsnr : lsnrs.values())
lsnr.skipUpdateEvent(evt, topVer);
+ }
}
}
@@ -253,6 +255,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
key,
newVal,
lsnr.oldValueRequired() ? oldVal : null,
+ lsnr.keepBinary(),
partId,
updateCntr,
topVer);
@@ -306,6 +309,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
key,
null,
lsnr.oldValueRequired() ? oldVal : null,
+ lsnr.keepBinary(),
e.partition(),
-1,
null);
@@ -333,7 +337,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
int bufSize,
long timeInterval,
boolean autoUnsubscribe,
- boolean loc) throws IgniteCheckedException
+ boolean loc,
+ boolean keepBinary) throws IgniteCheckedException
{
return executeQuery0(
locLsnr,
@@ -346,7 +351,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
true,
false,
true,
- loc);
+ loc,
+ keepBinary);
}
/**
@@ -374,7 +380,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
true,
false,
true,
- loc);
+ loc,
+ false);
}
/**
@@ -395,9 +402,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param onStart Whether listener is created on node start.
* @throws IgniteCheckedException If failed.
*/
- public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart)
+ public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary)
throws IgniteCheckedException {
- JCacheQuery lsnr = new JCacheQuery(cfg, onStart);
+ JCacheQuery lsnr = new JCacheQuery(cfg, onStart, keepBinary);
JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr);
@@ -471,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean oldValRequired,
boolean sync,
boolean ignoreExpired,
- boolean loc) throws IgniteCheckedException
+ boolean loc,
+ final boolean keepBinary) throws IgniteCheckedException
{
cctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -492,7 +500,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
ignoreExpired,
taskNameHash,
skipPrimaryCheck,
- cctx.isLocal());
+ cctx.isLocal(),
+ keepBinary);
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -550,6 +559,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
e.key(),
e.rawGet(),
null,
+ keepBinary,
0,
-1,
null);
@@ -633,15 +643,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
private final boolean onStart;
/** */
+ private final boolean keepBinary;
+
+ /** */
private volatile UUID routineId;
/**
* @param cfg Listener configuration.
* @param onStart {@code True} if executed on cache start.
*/
- private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart) {
+ private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) {
this.cfg = cfg;
this.onStart = onStart;
+ this.keepBinary = keepBinary;
}
/**
@@ -694,7 +708,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
cfg.isOldValueRequired(),
cfg.isSynchronous(),
false,
- false);
+ false,
+ keepBinary);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index d8698b3..68b83ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -143,6 +143,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public boolean isForQuery();
/**
+ * @return {@code True} if Ignite Binary objects should be passed to the listener and filter.
+ */
+ public boolean keepBinary();
+
+ /**
* @return Cache name if this is a continuous query handler.
*/
public String cacheName();
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index e218790..f473d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -612,7 +613,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
hnd.onListenerRegistered(routineId, ctx);
- ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
+ ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
+ reqData.handler().keepBinary()));
}
catch (IgniteCheckedException e) {
startFuts.remove(routineId);
@@ -822,6 +824,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridContinuousHandler hnd = data.handler();
+ if (req.keepBinary()) {
+ assert hnd instanceof CacheContinuousQueryHandler;
+
+ ((CacheContinuousQueryHandler)hnd).keepBinary(true);
+ }
+
IgniteCheckedException err = null;
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 82c0377..ff037d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,14 +40,18 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private Map<Integer, Long> updateCntrs;
+ /** Keep binary flag. */
+ private boolean keepBinary;
+
/**
* @param routineId Routine id.
* @param startReqData Start request data.
*/
- public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
+ public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, boolean keepBinary) {
super(routineId);
this.startReqData = startReqData;
+ this.keepBinary = keepBinary;
}
/**
@@ -88,6 +92,13 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
return errs;
}
+ /**
+ * @return {@code True} if keep binary flag was set on continuous handler.
+ */
+ public boolean keepBinary() {
+ return keepBinary;
+ }
+
/** {@inheritDoc} */
@Override public boolean isMutable() {
return true;