You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/28 14:15:26 UTC
[50/50] [abbrv] ignite git commit: IGNITE-426 Implemented review
notes.
IGNITE-426 Implemented review notes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c59f761
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c59f761
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c59f761
Branch: refs/heads/ignite-426-2-reb
Commit: 6c59f76170932b69552f8744369187f88227dfd0
Parents: ecc3216
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Oct 28 15:07:31 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Wed Oct 28 16:13:30 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 6 +
.../internal/GridMessageListenHandler.java | 6 +
.../processors/cache/GridCacheMapEntry.java | 13 +-
.../cache/GridCacheUpdateAtomicResult.java | 1 -
.../dht/GridDhtPartitionTopologyImpl.java | 3 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 4 +-
.../dht/atomic/GridDhtAtomicCache.java | 6 +-
.../distributed/near/GridNearAtomicCache.java | 2 +-
.../CacheContinuousQueryBatchAck.java | 11 +-
.../continuous/CacheContinuousQueryEntry.java | 67 +++++-
.../continuous/CacheContinuousQueryHandler.java | 220 ++++++++++++++++---
.../continuous/CacheContinuousQueryManager.java | 1 -
.../cache/transactions/IgniteTxEntry.java | 16 +-
.../continuous/GridContinuousHandler.java | 6 +
.../continuous/GridContinuousProcessor.java | 16 +-
.../StartRoutineAckDiscoveryMessage.java | 12 +-
.../StartRoutineDiscoveryMessage.java | 18 +-
...acheContinuousQueryFailoverAbstractTest.java | 67 +++++-
18 files changed, 387 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index dc3842b..fc65b55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -129,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateIdx(Map<Integer, Long> idx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index bddebba..7711843 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
@@ -102,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateIdx(Map<Integer, Long> idx) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException {
ctx.io().addUserMessageListener(topic, pred);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index e842f61..12f9290 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1768,12 +1768,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject oldVal;
CacheObject updated;
- if (!primary) {
- int z = 0;
-
- ++z;
- }
-
GridCacheVersion enqueueVer = null;
GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1990,7 +1984,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
@Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
try {
cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0,
- prevVal0, primary0, false, updateIdx00, topVer0);
+ prevVal0, primary0, false, updateIdx00, topVer0);
}
catch (IgniteCheckedException e) {
// No-op.
@@ -2412,12 +2406,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
final CacheObject oldVal0 = oldVal;
final AffinityTopologyVersion topVer0 = topVer;
final long updateIdx00 = updateIdx0;
+ final CacheObject val0 = val;
contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
try {
- cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0,
- false, updateIdx00, topVer0);
+ cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val0, oldVal0,
+ primary0, false, updateIdx00, topVer0);
}
catch (IgniteCheckedException e) {
// No-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 9e2aca6..397024b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.cache;
import javax.cache.processor.EntryProcessor;
-
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index a210a29..d30cc88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -964,8 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts,
- @Nullable Map<Integer, Long> cntrMap) {
+ GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
index 18ac921..de6326e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java
@@ -191,6 +191,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
* @param subjId Subject ID.
* @param taskNameHash Task name hash.
* @param updateIdxs Partition update idxs.
+ * @param addDepInfo Deployment info flag.
*/
public GridDhtTxFinishRequest(
UUID nearNodeId,
@@ -215,11 +216,12 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest {
int txSize,
@Nullable UUID subjId,
int taskNameHash,
+ boolean addDepInfo,
Collection<Long> updateIdxs
) {
this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc,
sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize,
- subjId, taskNameHash);
+ subjId, taskNameHash, addDepInfo);
if (updateIdxs != null && !updateIdxs.isEmpty()) {
partUpdateCnt = new GridLongList(updateIdxs.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index d26ad97..5d64648 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1804,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> f) {
if (f.isDone() && f.error() == null)
- updRes.contQryNtfy().apply(f);
+ updRes.contQryNtfy().apply(f);
}
});
}
@@ -2557,7 +2557,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
+ /*check version*/!req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
replicate ? DR_BACKUP : DR_NONE,
@@ -2614,7 +2614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
}
catch (ClusterTopologyCheckedException ignored) {
U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " +
- nodeId);
+ req.nodeId());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId +
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 4f2caa1..706655b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -353,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> {
/*event*/true,
/*metrics*/true,
/*primary*/false,
- /*check version*/op != TRANSFORM || !req.forceTransformBackups(),
+ /*check version*/!req.forceTransformBackups(),
req.topologyVersion(),
CU.empty0(),
DR_NONE,
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
index 1e9a848..f89c466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -97,7 +97,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
writer.incrementState();
case 4:
- if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+ if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT,
+ MessageCollectionItemType.LONG))
return false;
writer.incrementState();
@@ -127,7 +128,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
reader.incrementState();
case 4:
- updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+ updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG,
+ false);
if (!reader.isLastRead())
return false;
@@ -140,6 +142,11 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage {
}
/** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public byte directType() {
return 114;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 896751e..939f7a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -93,9 +94,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** */
@GridToStringInclude
- @GridDirectTransient
private AffinityTopologyVersion topVer;
+ /** Filtered events. */
+ private GridLongList filteredEvts;
+
/**
* Required by {@link Message}.
*/
@@ -179,6 +182,10 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
*/
void markFiltered() {
flags |= FILTERED_ENTRY;
+ newVal = null;
+ oldVal = null;
+ key = null;
+ depInfo = null;
}
/**
@@ -191,11 +198,25 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/**
* @return {@code True} if entry was filtered.
*/
- boolean filtered() {
+ boolean isFiltered() {
return (flags & FILTERED_ENTRY) != 0;
}
/**
+ * @param idxs Filtered indexes.
+ */
+ void filteredEvents(GridLongList idxs) {
+ filteredEvts = idxs;
+ }
+
+ /**
+ * @return previous filtered events.
+ */
+ long[] filteredEvents() {
+ return filteredEvts == null ? null : filteredEvts.array();
+ }
+
+ /**
* @param cctx Cache context.
* @throws IgniteCheckedException In case of error.
*/
@@ -217,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @throws IgniteCheckedException In case of error.
*/
void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (!isFiltered()) {
+ key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
- if (newVal != null)
- newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (newVal != null)
+ newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
- if (oldVal != null)
- oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ if (oldVal != null)
+ oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr);
+ }
}
/**
@@ -322,6 +345,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
+ case 8:
+ if (!writer.writeMessage("filteredEvts", filteredEvts))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeMessage("topVer", topVer))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -403,6 +438,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
+ case 8:
+ filteredEvts = reader.readMessage("filteredEvts");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ topVer = reader.readMessage("topVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -410,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 8;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index bd44180..8da7ed2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -53,6 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
@@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -130,11 +133,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
/** */
+ private transient ConcurrentMap<Integer, HoleBuffer> snds = new ConcurrentHashMap<>();
+
+ /** */
private transient AcknowledgeBuffer ackBuf;
/** */
private transient int cacheId;
+ /** */
+ private Map<Integer, Long> initUpdIdx;
+
/**
* Required by {@link Externalizable}.
*/
@@ -187,8 +196,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.skipPrimaryCheck = skipPrimaryCheck;
this.localCache = locCache;
- rcvs = new ConcurrentHashMap<>();
-
cacheId = CU.cacheId(cacheName);
}
@@ -213,6 +220,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public void updateIdx(Map<Integer, Long> idx) {
+ this.initUpdIdx = idx;
+ }
+
+ /** {@inheritDoc} */
@Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx)
throws IgniteCheckedException {
assert nodeId != null;
@@ -229,6 +241,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
ackBuf = new AcknowledgeBuffer();
+ rcvs = new ConcurrentHashMap<>();
+
final boolean loc = nodeId.equals(ctx.localNodeId());
assert !skipPrimaryCheck || loc;
@@ -253,8 +267,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
- @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
- boolean primary,
+ @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
boolean recordIgniteEvt) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return;
@@ -288,7 +301,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (primary || skipPrimaryCheck) {
if (loc) {
if (!localCache) {
- Collection<CacheContinuousQueryEntry> entries = handleEntry(ctx, entry);
+ Collection<CacheContinuousQueryEntry> entries = clientHandleEvent(ctx, entry);
if (!entries.isEmpty()) {
final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
@@ -302,7 +315,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
},
new IgnitePredicate<CacheContinuousQueryEntry>() {
@Override public boolean apply(CacheContinuousQueryEntry entry) {
- return !entry.filtered();
+ return !entry.isFiltered();
}
}
);
@@ -314,14 +327,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
else {
- if (!entry.filtered())
+ if (!entry.isFiltered())
locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
}
}
else {
- prepareEntry(cctx, nodeId, entry);
+ if (!entry.isFiltered())
+ prepareEntry(cctx, nodeId, entry);
- ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+ CacheContinuousQueryEntry e = handleEntry(entry);
+
+ if (e != null)
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
}
else {
@@ -388,8 +405,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
try {
GridCacheContext<K, V> cctx = cacheContext(ctx);
- for (CacheContinuousQueryEntry e : backupQueue)
- prepareEntry(cctx, nodeId, e);
+ for (CacheContinuousQueryEntry e : backupQueue) {
+ if (!e.isFiltered())
+ prepareEntry(cctx, nodeId, e);
+ }
ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic);
@@ -514,7 +533,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
for (CacheContinuousQueryEntry e : entries)
- entries0.addAll(handleEntry(ctx, e));
+ entries0.addAll(clientHandleEvent(ctx, e));
Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
@@ -524,7 +543,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
},
new IgnitePredicate<CacheContinuousQueryEntry>() {
@Override public boolean apply(CacheContinuousQueryEntry entry) {
- return !entry.filtered();
+ return !entry.isFiltered();
}
}
);
@@ -537,7 +556,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @param e entry.
* @return Entry collection.
*/
- private Collection<CacheContinuousQueryEntry> handleEntry(GridKernalContext ctx, CacheContinuousQueryEntry e) {
+ private Collection<CacheContinuousQueryEntry> clientHandleEvent(GridKernalContext ctx,
+ CacheContinuousQueryEntry e) {
assert e != null;
// Initial query entry or evicted entry.
@@ -548,7 +568,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
PartitionRecovery rec = rcvs.get(e.partition());
if (rec == null) {
- rec = new PartitionRecovery(ctx.log(getClass()));
+ rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx), initUpdIdx.get(e.partition()));
PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
@@ -560,26 +580,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
+ * @param e Entry.
+ * @return Entry.
+ */
+ private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) {
+ assert e != null;
+ assert snds != null;
+
+ // Initial query entry.
+ // This events should be fired immediately.
+ if (e.updateIndex() == -1)
+ return e;
+
+ HoleBuffer buf = snds.get(e.partition());
+
+ if (buf == null) {
+ buf = new HoleBuffer();
+
+ HoleBuffer oldRec = snds.putIfAbsent(e.partition(), buf);
+
+ if (oldRec != null)
+ buf = oldRec;
+ }
+
+ return buf.handle(e);
+ }
+
+ /**
*
*/
private static class PartitionRecovery {
+ /** Event which means hole in sequence. */
+ private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry();
+
/** */
private IgniteLogger log;
/** */
- private static final long INIT_VALUE = -100;
+ private GridCacheContext cctx;
+
+ /** */
+ private long lastFiredEvt;
/** */
- private long lastFiredEvt = INIT_VALUE;
+ private AffinityTopologyVersion curTop;
/** */
- private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
+ private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>();
/**
* @param log Logger.
*/
- public PartitionRecovery(IgniteLogger log) {
+ public PartitionRecovery(IgniteLogger log, GridCacheContext cctx, Long initIdx) {
this.log = log;
+ this.cctx = cctx;
+
+ if (initIdx != null) {
+ this.lastFiredEvt = initIdx;
+ this.curTop = cctx.topology().topologyVersion();
+ }
}
/**
@@ -593,26 +652,55 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
List<CacheContinuousQueryEntry> entries;
- synchronized (pendingEnts) {
+ synchronized (pendingEvts) {
// Received first event.
- if (lastFiredEvt == INIT_VALUE) {
+ if (curTop == null) {
lastFiredEvt = entry.updateIndex();
+ curTop = entry.topologyVersion();
+
return F.asList(entry);
}
- // Handle case when nodes owning partition left from topology.
- if (entry.updateIndex() == 1 && !entry.isBackup()) {
- pendingEnts.clear();
+ if (curTop.compareTo(entry.topologyVersion()) < 0) {
+ GridCacheAffinityManager aff = cctx.affinity();
- lastFiredEvt = 1;
+ if (cctx.affinity().backups(entry.partition(), entry.topologyVersion()).isEmpty() &&
+ !aff.primary(entry.partition(), curTop).id().equals(aff.primary(entry.partition(),
+ entry.topologyVersion()).id())) {
+ entries = new ArrayList<>(pendingEvts.size());
- return F.asList(entry);
+ for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
+ if (evt != HOLE && !evt.isFiltered())
+ entries.add(evt);
+ }
+
+ pendingEvts.clear();
+
+ curTop = entry.topologyVersion();
+
+ lastFiredEvt = entry.updateIndex();
+
+ entries.add(entry);
+
+ return entries;
+ }
+
+ curTop = entry.topologyVersion();
}
// Check duplicate.
- if (entry.updateIndex() > lastFiredEvt)
- pendingEnts.put(entry.updateIndex(), entry);
+ if (entry.updateIndex() > lastFiredEvt) {
+ pendingEvts.put(entry.updateIndex(), entry);
+
+ // Put filtered events.
+ if (entry.filteredEvents() != null) {
+ for (long idx : entry.filteredEvents()) {
+ if (idx > lastFiredEvt)
+ pendingEvts.put(idx, HOLE);
+ }
+ }
+ }
else {
if (log.isDebugEnabled())
log.debug("Skip duplicate continuous query message: " + entry);
@@ -620,10 +708,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return Collections.emptyList();
}
- if (pendingEnts.isEmpty())
+ if (pendingEvts.isEmpty())
return Collections.emptyList();
- Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+ Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator();
entries = new ArrayList<>();
@@ -634,10 +722,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (e.getKey() == lastFiredEvt + 1) {
++lastFiredEvt;
- entries.add(e.getValue());
+ if (e.getValue() != HOLE && !e.getValue().isFiltered())
+ entries.add(e.getValue());
iter.remove();
}
+ else
+ break;
}
}
@@ -645,6 +736,73 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
+ /**
+ *
+ */
+ private static class HoleBuffer {
+ /** */
+ private final TreeSet<Long> buf = new TreeSet<>();
+
+ /** */
+ private long lastFiredEvt;
+
+ /**
+ * Add continuous entry.
+ *
+ * @param e Cache continuous query entry.
+ * @return Collection entries which will be fired.
+ */
+ public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
+ assert e != null;
+
+ synchronized (buf) {
+ // Handle filtered events.
+ if (e.isFiltered()) {
+ if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1)
+ return e;
+
+ buf.add(e.updateIndex());
+
+ return null;
+ }
+ else {
+ if (lastFiredEvt < e.updateIndex())
+ lastFiredEvt = e.updateIndex();
+
+ // Doesn't have filtered and delayed events.
+ if (buf.isEmpty() || buf.first() > e.updateIndex())
+ return e;
+ else {
+ GridLongList filteredEvts = new GridLongList(buf.size());
+ int size = 0;
+
+ Iterator<Long> iter = buf.iterator();
+
+ while (iter.hasNext()) {
+ long idx = iter.next();
+
+ if (idx < e.updateIndex()) {
+ filteredEvts.add(idx);
+
+ iter.remove();
+
+ ++size;
+ }
+ else
+ break;
+ }
+
+ filteredEvts.truncate(size, true);
+
+ e.filteredEvents(filteredEvts);
+
+ return e;
+ }
+ }
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 14fe195..bdd009a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -256,7 +256,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
throws IgniteCheckedException {
assert e != null;
assert key != null;
- assert Thread.holdsLock(e) : e;
if (e.isInternal())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f5cf501..7d47b3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -182,6 +182,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
private byte flags;
/** Partition update index. */
+ @GridDirectTransient
private long partIdx;
/** */
@@ -953,11 +954,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
writer.incrementState();
- case 12:
- if (!writer.writeLong("partIdx", partIdx))
- return false;
-
- writer.incrementState();
}
return true;
@@ -1067,14 +1063,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
reader.incrementState();
- case 12:
- partIdx = reader.readLong("partIdx");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
}
return reader.afterMessageRead(IgniteTxEntry.class);
@@ -1087,7 +1075,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 13;
+ return 12;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 40fb12a..648ed7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.continuous;
import java.io.Externalizable;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
@@ -145,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
* @return Cache name if this is a continuous query handler.
*/
public String cacheName();
+
+ /**
+ * @param idx Init state for partition indexies.
+ */
+ public void updateIdx(Map<Integer, Long> idx);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 3ed186e..c63a82f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -205,8 +205,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
StartFuture fut = startFuts.remove(msg.routineId());
if (fut != null) {
- if (msg.errs().isEmpty())
+ if (msg.errs().isEmpty()) {
+ LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+ if (routine != null)
+ routine.handler().updateIdx(msg.updateIdxs());
+
fut.onRemoteRegistered();
+ }
else {
IgniteCheckedException firstEx = F.first(msg.errs().values());
@@ -685,7 +691,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
public void addNotification(UUID nodeId,
final UUID routineId,
- Object obj,
+ @Nullable Object obj,
@Nullable Object orderedTopic,
boolean sync,
boolean msg)
@@ -856,6 +862,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
+ if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
+ Map<Integer, Long> idx = ctx.cache().internalCache(hnd.cacheName()).context().topology().updateCounters();
+
+ req.addUpdateIdxs(idx);
+ }
+
if (err != null)
req.addError(ctx.localNodeId(), err);
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index bd4aae3..0b5cfaf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
/** */
private final Map<UUID, IgniteCheckedException> errs;
+ /** */
+ private final Map<Integer, Long> updateIdxs;
+
/**
* @param routineId Routine id.
* @param errs Errs.
*/
- public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+ public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
+ Map<Integer, Long> idx) {
super(routineId);
this.errs = new HashMap<>(errs);
+ this.updateIdxs = idx;
}
/** {@inheritDoc} */
@@ -50,6 +55,11 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
return null;
}
+ /** {@inheritDoc} */
+ public Map<Integer, Long> updateIdxs() {
+ return updateIdxs;
+ }
+
/**
* @return Errs.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 892adac..cfacde4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
+ /** */
+ private final Map<Integer, Long> updateIdxes = new HashMap<>();
+
/**
* @param routineId Routine id.
* @param startReqData Start request data.
@@ -63,6 +66,19 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
}
/**
+ * @param idx Update indexes.
+ */
+ public void addUpdateIdxs(Map<Integer, Long> idx) {
+ for (Map.Entry<Integer, Long> e : idx.entrySet()) {
+ Long cntr0 = updateIdxes.get(e.getKey());
+ Long cntr1 = e.getValue();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ updateIdxes.put(e.getKey(), cntr1);
+ }
+ }
+
+ /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
@@ -76,7 +92,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
- return new StartRoutineAckDiscoveryMessage(routineId, errs);
+ return new StartRoutineAckDiscoveryMessage(routineId, errs, updateIdxes);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 90e21ad..0a95036 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -27,7 +27,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -86,10 +90,9 @@ import org.apache.ignite.transactions.Transaction;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
/**
*
@@ -172,7 +175,45 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @return Write order mode for atomic cache.
*/
protected CacheAtomicWriteOrderMode writeOrderMode() {
- return CLOCK;
+ return PRIMARY;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testFirstFilteredEvent() throws Exception {
+ this.backups = 2;
+
+ final int SRV_NODES = 4;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> qryClnCache = qryClient.cache(null);
+
+ final CacheEventListener3 lsnr = new CacheEventListener3();
+
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ qry.setLocalListener(lsnr);
+
+ qry.setRemoteFilter(new CacheEventFilter());
+
+ try (QueryCursor<?> cur = qryClnCache.query(qry)) {
+ List<Integer> keys = testKeys(grid(0).cache(null), 1);
+
+ for (Integer key : keys)
+ qryClnCache.put(key, -1);
+
+ qryClnCache.put(keys.get(0), 100);
+ }
+
+ assertEquals(lsnr.evts.size(), 1);
}
/**
@@ -1222,7 +1263,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
startGrid(idx);
- Thread.sleep(3000);
+ Thread.sleep(200);
log.info("Stop node: " + idx);
@@ -1435,7 +1476,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
startGrid(idx);
- Thread.sleep(3000);
+ Thread.sleep(200);
log.info("Stop node: " + idx);
@@ -1591,7 +1632,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @throws Exception If failed.
*/
public void testFailoverStartStopBackup() throws Exception {
- failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2);
+ failoverStartStopFilter(2);
}
/**
@@ -1698,7 +1739,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
stopGrid(idx);
- Thread.sleep(100);
+ awaitPartitionMapExchange();
+
+ Thread.sleep(200);
log.info("Start node: " + idx);
@@ -1706,6 +1749,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
CountDownLatch latch = new CountDownLatch(1);
+ awaitPartitionMapExchange();
+
assertTrue(checkLatch.compareAndSet(null, latch));
if (!stop.get()) {
@@ -1728,7 +1773,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>();
try {
- long stopTime = System.currentTimeMillis() + 10_000;
+ long stopTime = System.currentTimeMillis() + 60_000;
// Start new filter each 5 sec.
long startFilterTime = System.currentTimeMillis() + 5_000;
@@ -1752,7 +1797,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
if (dinQry != null) {
dinQry.close();
- log.error("Continuous query listener closed.");
+ log.info("Continuous query listener closed.");
checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0);
}
@@ -1767,7 +1812,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
dinQry = qryClnCache.query(newQry);
- log.error("Continuous query listener started.");
+ log.info("Continuous query listener started.");
startFilterTime = System.currentTimeMillis() + 5_000;
}