You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/23 13:52:13 UTC
[11/19] ignite git commit: IGNITE-426 WIP
IGNITE-426 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1cfeacb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1cfeacb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1cfeacb
Branch: refs/heads/ignite-426-2-reb
Commit: b1cfeacb988cb7b600f9bb33bdcc6c9a051d57cd
Parents: 01dfcbb
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue Sep 29 16:12:46 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Oct 23 14:50:08 2015 +0300
----------------------------------------------------------------------
.../communication/GridIoMessageFactory.java | 8 +-
.../continuous/CacheContinuousQueryEntry.java | 26 ++-
.../CacheContinuousQueryFilteredEntry.java | 228 -------------------
.../continuous/CacheContinuousQueryHandler.java | 226 +++++++++---------
.../CacheContinuousQueryListener.java | 9 +-
.../CacheContinuousQueryLostPartition.java | 72 +++---
.../continuous/CacheContinuousQueryManager.java | 15 +-
...acheContinuousQueryFailoverAbstractTest.java | 87 ++++++-
8 files changed, 282 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6eb9e17..3474f84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -92,7 +92,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse;
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
-import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
@@ -693,16 +692,11 @@ public class GridIoMessageFactory implements MessageFactory {
break;
case 115:
- msg = new CacheContinuousQueryFilteredEntry();
-
- break;
-
- case 116:
msg = new CacheContinuousQueryLostPartition();
break;
- // [-3..112] - this
+ // [-3..115] - this
// [120..123] - DR
// [-4..-22] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 470aa09..9e73142 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -83,6 +83,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
private long updateIdx;
/** */
+ private boolean filtered;
+
+ /** */
@GridToStringInclude
@GridDirectTransient
private AffinityTopologyVersion topVer;
@@ -152,6 +155,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * Mark this event as filtered.
+ */
+ void markFiltered() {
+ filtered = true;
+ }
+
+ /**
* @return Update index.
*/
long updateIndex() {
@@ -162,7 +172,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @return Filtered entry.
*/
boolean filtered() {
- return false;
+ return filtered;
}
/**
@@ -286,6 +296,12 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
+ case 7:
+ if (!writer.writeBoolean("filtered", filtered))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -359,6 +375,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
+ case 7:
+ filtered = reader.readBoolean("filtered");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(CacheContinuousQueryEntry.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
deleted file mode 100644
index 14d8f51..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.query.continuous;
-
-import java.nio.ByteBuffer;
-import javax.cache.event.EventType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Continuous query entry.
- */
-public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private EventType evtType;
-
- /** Cache name. */
- private int cacheId;
-
- /** Partition. */
- private int part;
-
- /** Update index. */
- private long updateIdx;
-
- /** */
- @GridToStringInclude
- @GridDirectTransient
- private AffinityTopologyVersion topVer;
-
- /**
- * Required by {@link Message}.
- */
- public CacheContinuousQueryFilteredEntry() {
- // No-op.
- }
-
- /**
- * @param e Cache continuous query entry.
- */
- CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) {
- this.cacheId = e.cacheId();
- this.evtType = e.eventType();
- this.part = e.partition();
- this.updateIdx = e.updateIndex();
- this.topVer = e.topologyVersion();
- }
-
- /**
- * @return Topology version if applicable.
- */
- @Nullable AffinityTopologyVersion topologyVersion() {
- return topVer;
- }
-
- /**
- * @return Cache ID.
- */
- int cacheId() {
- return cacheId;
- }
-
- /**
- * @return Event type.
- */
- EventType eventType() {
- return evtType;
- }
-
- /**
- * @return Partition.
- */
- int partition() {
- return part;
- }
-
- /**
- * @return Update index.
- */
- long updateIndex() {
- return updateIdx;
- }
-
- /** {@inheritDoc} */
- @Override boolean filtered() {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 115;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeInt("cacheId", cacheId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeInt("part", part))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeLong("updateIdx", updateIdx))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cacheId = reader.readInt("cacheId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- byte evtTypeOrd;
-
- evtTypeOrd = reader.readByte("evtType");
-
- if (!reader.isLastRead())
- return false;
-
- evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd);
-
- reader.incrementState();
-
- case 2:
- part = reader.readInt("part");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- updateIdx = reader.readLong("updateIdx");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
- return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class);
- }
-
- /** {@inheritDoc} */
- @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 4;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheContinuousQueryFilteredEntry.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index ca3579e..bb2558c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
@@ -83,6 +84,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** */
private static final int BACKUP_ACK_THRESHOLD = 100;
+ /** */
+ private static final int QUERY_HOLE_THRESHOLD = 5;
+
/** Cache name. */
private String cacheName;
@@ -123,15 +127,12 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient Collection<CacheContinuousQueryEntry> backupQueue;
/** */
- private transient Map<Integer, Long> rcvCntrs;
+ private boolean localCache;
/** */
private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
/** */
- private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
-
- /** */
private transient AcknowledgeBuffer ackBuf;
/** */
@@ -187,16 +188,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.ignoreExpired = ignoreExpired;
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+ this.localCache = locCache;
- if (locCache)
- dupEvtFilter = F.alwaysTrue();
- else {
- rcvCntrs = new ConcurrentHashMap<>();
-
- rcvs = new ConcurrentHashMap<>();
-
- dupEvtFilter = new DuplicateEventFilter();
- }
+ rcvs = new ConcurrentHashMap<>();
cacheId = CU.cacheId(cacheName);
}
@@ -268,7 +262,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return;
- GridCacheContext<K, V> cctx = cacheContext(ctx);
+ final GridCacheContext<K, V> cctx = cacheContext(ctx);
// Check that cache stopped.
if (cctx == null)
@@ -289,12 +283,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
try {
- final CacheContinuousQueryEntry entry = notify ? evt.entry() :
- new CacheContinuousQueryFilteredEntry(evt.entry());
+ final CacheContinuousQueryEntry entry = evt.entry();
+
+ if (!notify)
+ entry.markFiltered();
if (primary || skipPrimaryCheck) {
if (loc) {
- if (dupEvtFilter.apply(entry)) {
+ if (!localCache) {
+ PartitionRecovery rcv = rcvs.get(entry.partition());
+
+ if (rcv == null) {
+ rcv = new PartitionRecovery(ctx.log(getClass()));
+
+ PartitionRecovery oldRec = rcvs.putIfAbsent(entry.partition(), rcv);
+
+ if (oldRec != null)
+ rcv = oldRec;
+ }
+
+ rcv.add(entry);
+
+ Collection<CacheContinuousQueryEntry> entries = rcv.entries();
+
+ if (!entries.isEmpty()) {
+ final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+ new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
+ @Override public CacheEntryEvent<? extends K, ? extends V> apply(
+ CacheContinuousQueryEntry e) {
+ return new CacheContinuousQueryEvent<>(cache, cctx, e);
+ }
+ },
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.filtered();
+ }
+ }
+ );
+
+ locLsnr.onUpdated(evts);
+
+ if (!skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
+ }
+ else {
locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
if (!skipPrimaryCheck)
@@ -343,7 +378,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
- @Override public void partitionLost(String cacheName0, int partId) {
+ @Override public void partitionLost(int partId) {
+ assert rcvs != null;
+
+ PartitionRecovery rcv = rcvs.get(partId);
+
+ if (rcv != null)
+ rcv.reset();
+ }
+
+ @Override public void firePartitionLostEvent(String cacheName0, final int partId) {
GridCacheContext<K, V> cctx = cacheContext(ctx);
// Check that cache stopped.
@@ -352,25 +396,33 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if ((cacheName == null && cacheName0 == null) || // Check default cache.
(cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) {
+ ctx.closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
- final CacheContinuousQueryEntry entry =
- new CacheContinuousQueryLostPartition(cctx.cacheId(), partId);
+ CacheContinuousQueryLostPartition msg = new CacheContinuousQueryLostPartition(
+ routineId,
+ cctx.cacheId(),
+ partId);
- try {
- prepareEntry(cctx, nodeId, entry);
+ try {
+ cctx.io().send(nodeId, msg, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
- ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
- }
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ if (log.isDebugEnabled())
+ log.debug("Failed to send lost partition message, node left " +
+ "[msg=" + msg + ", nodeId=" + routineId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ IgniteLogger log = ctx.log(getClass());
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
+ U.error(log, "Failed to send lost partition message " +
+ "[msg=" + msg + ", nodeId=" + routineId + ']', e);
+ }
+ }
+ });
}
}
@@ -537,14 +589,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
rec = oldRec;
}
- if (e instanceof CacheContinuousQueryLostPartition)
- rec.reset();
- else {
- rec.add(e);
+ rec.add(e);
- if (!parts.containsKey(e.partition()))
- parts.put(e.partition(), rec);
- }
+ if (!parts.containsKey(e.partition()))
+ parts.put(e.partition(), rec);
}
Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
@@ -569,29 +617,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
- * @param e Entry.
- * @return {@code True} if listener should be notified.
- */
- private boolean notifyListener(CacheContinuousQueryEntry e) {
- Integer part = e.partition();
-
- Long cntr = rcvCntrs.get(part);
-
- if (cntr != null) {
- long cntr0 = cntr;
-
- if (e.updateIndex() > cntr0)
- rcvCntrs.put(part, e.updateIndex());
- else
- return false;
- }
- else
- rcvCntrs.put(part, e.updateIndex());
-
- return true;
- }
-
- /**
*
*/
private static class PartitionRecovery {
@@ -617,15 +642,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @param e Cache continuous qeury entry.
*/
public void add(CacheContinuousQueryEntry e) {
- synchronized (pendingEnts) {
- if (pendingEnts.containsKey(e.updateIndex()) || e.updateIndex() <= lastFiredEvt)
- e.cacheId();
- //log.info("Skip duplicate continuous query entry. Entry: " + e);
- else {
- //log.info("Added continuous query entry. Entry: " + e);
+ assert e != null;
+ synchronized (pendingEnts) {
+ if (!pendingEnts.containsKey(e.updateIndex()) && e.updateIndex() > lastFiredEvt)
pendingEnts.put(e.updateIndex(), e);
- }
+ else if (log.isDebugEnabled())
+ log.debug("Skip duplicate continuous query message: " + e);
}
}
@@ -641,45 +664,53 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
- Map.Entry<Long, CacheContinuousQueryEntry> prev = null;
-
- Set<Long> rmvEnts = new HashSet<>();
+ boolean fired = false;
+ // The elements are consistently.
while (iter.hasNext()) {
Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
- // The elements are consistently.
if (e.getKey() == lastFiredEvt + 1) {
++lastFiredEvt;
entries.add(e.getValue());
iter.remove();
+
+ fired = true;
+ }
+ }
+
+ if (!fired && lastFiredEvt == 0 && pendingEnts.size() >= QUERY_HOLE_THRESHOLD) {
+ Long prevCnt = null;
+
+ int orderedCnt = 0;
+
+ for (Long cnt : pendingEnts.keySet()) {
+ if (prevCnt != null) {
+ if (prevCnt + 1 != cnt)
+ break;
+ else
+ ++orderedCnt;
+ }
+
+ prevCnt = cnt;
}
- // Handle hole in sequence.
- else if (prev != null && prev.getKey() + 1 == e.getKey()) {
- entries.add(prev.getValue());
- lastFiredEvt = prev.getKey();
+ if (orderedCnt >= QUERY_HOLE_THRESHOLD) {
+ iter = pendingEnts.entrySet().iterator();
- rmvEnts.add(prev.getKey());
+ while (entries.size() < orderedCnt) {
+ Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
- if (!iter.hasNext()) {
entries.add(e.getValue());
lastFiredEvt = e.getKey();
- rmvEnts.add(e.getKey());
+ iter.remove();
}
}
- else if (prev != null)
- break;
-
- prev = e;
}
-
- for (Long rmKey : rmvEnts)
- pendingEnts.remove(rmKey);
}
return entries;
@@ -737,12 +768,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** {@inheritDoc} */
@Override public void partitionLost(String cacheName, int partId) {
- if (this.cacheName == null) {
- int z = 0;
-
- ++z;
- }
-
if ((this.cacheName == null && cacheName == null) // Check default caches.
|| (cacheName != null && this.cacheName != null && cacheName.equals(this.cacheName))) {
PartitionRecovery rcv = rcvs.get(partId);
@@ -962,19 +987,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/**
- *
- */
- private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public boolean apply(CacheContinuousQueryEntry e) {
- return notifyListener(e);
- }
- }
-
- /**
* Deployable object.
*/
private static class DeployableObject implements Externalizable {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 735e808..a706105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -60,7 +60,14 @@ interface CacheContinuousQueryListener<K, V> {
* @param cacheName Cache name.
* @param partId Partition ID.
*/
- public void partitionLost(String cacheName, int partId);
+ public void firePartitionLostEvent(String cacheName, int partId);
+
+ /**
+ * Handle partition lost event.
+ *
+ * @param partId Partition ID.
+ */
+ public void partitionLost(int partId);
/**
* Flushes backup queue.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
index 734d072..eeb20cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
@@ -18,27 +18,22 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
import java.nio.ByteBuffer;
-import javax.cache.event.EventType;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import java.util.UUID;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
-import org.jetbrains.annotations.Nullable;
/**
* Continuous query entry.
*/
-public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry {
+public class CacheContinuousQueryLostPartition extends GridCacheMessage {
/** */
private static final long serialVersionUID = 0L;
- /** Cache name. */
- private int cacheId;
+ /** Routine ID. */
+ private UUID routineId;
/** Partition. */
private int part;
@@ -54,34 +49,38 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
* @param cacheId Cache ID.
* @param part Partition ID.
*/
- CacheContinuousQueryLostPartition(int cacheId, int part) {
+ CacheContinuousQueryLostPartition(UUID routineId, int cacheId, int part) {
+ this.routineId = routineId;
this.cacheId = cacheId;
this.part = part;
}
/**
- * @return Cache ID.
+ * @return Partition.
*/
- int cacheId() {
- return cacheId;
+ int partition() {
+ return part;
}
/**
- * @return Partition.
+ * @return Routine ID.
*/
- int partition() {
- return part;
+ UUID routineId() {
+ return routineId;
}
/** {@inheritDoc} */
@Override public byte directType() {
- return 116;
+ return 115;
}
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
+ if (!super.writeTo(buf, writer))
+ return false;
+
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
@@ -90,17 +89,18 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
}
switch (writer.state()) {
- case 0:
- if (!writer.writeInt("cacheId", cacheId))
+ case 3:
+ if (!writer.writeInt("part", part))
return false;
writer.incrementState();
- case 1:
- if (!writer.writeInt("part", part))
+ case 4:
+ if (!writer.writeUuid("routineId", routineId))
return false;
writer.incrementState();
+
}
return true;
@@ -113,44 +113,36 @@ public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry
if (!reader.beforeMessageRead())
return false;
+ if (!super.readFrom(buf, reader))
+ return false;
+
switch (reader.state()) {
- case 0:
- cacheId = reader.readInt("cacheId");
+ case 3:
+ part = reader.readInt("part");
if (!reader.isLastRead())
return false;
reader.incrementState();
- case 1:
- part = reader.readInt("part");
+ case 4:
+ routineId = reader.readUuid("routineId");
if (!reader.isLastRead())
return false;
- reader.incrementState();
}
- return reader.afterMessageRead(CacheContinuousQueryLostPartition.class);
- }
-
- /** {@inheritDoc} */
- @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
- // No-op.
+ return true;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 2;
+ return 5;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheContinuousQueryLostPartition.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 35c6696..d0d877d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -46,7 +46,6 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -128,6 +127,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
});
+ cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryLostPartition.class,
+ new CI2<UUID, CacheContinuousQueryLostPartition>() {
+ @Override public void apply(UUID uuid, CacheContinuousQueryLostPartition msg) {
+ CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+ if (lsnr != null)
+ lsnr.partitionLost(msg.partition());
+ }
+ });
+
cctx.time().schedule(new Runnable() {
@Override public void run() {
for (CacheContinuousQueryListener lsnr : lsnrs.values())
@@ -145,10 +154,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt;
for (CacheContinuousQueryListener lsnr : lsnrs.values())
- lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+ lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
for (CacheContinuousQueryListener lsnr : intLsnrs.values())
- lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+ lsnr.firePartitionLostEvent(evt0.cacheName(), evt0.partition());
}
}, EVT_CACHE_REBALANCE_PART_DATA_LOST);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1cfeacb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 3bba5e6..61fa6cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -264,6 +264,84 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
+ public void testStartStopQuery() throws Exception {
+ this.backups = 1;
+
+ final int SRV_NODES = 3;
+
+ startGridsMultiThreaded(SRV_NODES);
+
+ client = true;
+
+ final Ignite qryClient = startGrid(SRV_NODES);
+
+ client = false;
+
+ IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+ Ignite igniteSrv = ignite(0);
+
+ IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+ List<Integer> keys = testKeys(srvCache, 1);
+
+ int keyCnt = keys.size();
+
+ for (int j = 0; j < 50; ++j) {
+ ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+ final TestLocalListener lsnr = new TestLocalListener();
+
+ qry.setLocalListener(lsnr);
+
+ int keyIter = 0;
+
+ for (; keyIter < keyCnt / 2; keyIter++) {
+ int key = keys.get(keyIter);
+
+ clnCache.put(key, key);
+ }
+
+ assert lsnr.evts.isEmpty();
+
+ QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+ Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+ final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+ Affinity<Object> aff = affinity(srvCache);
+
+ for (; keyIter < keys.size(); keyIter++) {
+ int key = keys.get(keyIter);
+
+ log.info("Put [key=" + key + ", part=" + aff.partition(key) + ']');
+
+ T2<Object, Object> t = updates.get(key);
+
+ if (t == null) {
+ updates.put(key, new T2<>((Object)key, null));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, null));
+ }
+ else {
+ updates.put(key, new T2<>((Object)key, (Object)key));
+
+ expEvts.add(new T3<>((Object)key, (Object)key, (Object)key));
+ }
+
+ srvCache.put(key, key);
+ }
+
+ checkEvents(expEvts, lsnr);
+
+ query.close();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testLeftPrimaryAndBackupNodes() throws Exception {
this.backups = 1;
@@ -745,8 +823,13 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
* @param expEvts Expected events.
* @param lsnr Listener.
*/
- private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener lsnr) {
- assert lsnr.evts.size() == expEvts.size();
+ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final TestLocalListener lsnr)
+ throws Exception {
+ assert GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return lsnr.evts.size() == expEvts.size();
+ }
+ }, 2000L);
for (T3<Object, Object, Object> exp : expEvts) {
CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());