You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/31 09:23:10 UTC
[20/51] ignite git commit: Continuous queries fixes: - flush backup
queue on exchange end (otherwise we don't really wait for all current
operations) - on coordinator apply counters after all single messages
received (otherwise extra counter increments a
Continuous queries fixes:
- flush backup queue on exchange end (otherwise we don't really wait for all current operations)
- on coordinator apply counters after all single messages received (otherwise extra counter increments are possible)
- do not send info about filtered entries if do not have non-filtered entry
- added system properties for hardcoded constants
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42293fac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42293fac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42293fac
Branch: refs/heads/ignite-5075-pds
Commit: 42293fac88c29544b7c55c0340224afbf474a301
Parents: 827b7f6
Author: sboikov <sb...@gridgain.com>
Authored: Mon May 29 16:41:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon May 29 16:41:23 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheIoManager.java | 6 +-
.../processors/cache/GridCacheMapEntry.java | 5 +-
.../GridCachePartitionExchangeManager.java | 2 +-
.../dht/GridClientPartitionTopology.java | 31 +-
.../dht/GridDhtPartitionTopology.java | 9 +-
.../dht/GridDhtPartitionTopologyImpl.java | 59 +-
.../GridDhtPartitionsExchangeFuture.java | 51 +-
.../CacheContinuousQueryAcknowledgeBuffer.java | 120 +++
.../CacheContinuousQueryDeployableObject.java | 110 +++
.../continuous/CacheContinuousQueryEntry.java | 117 ++-
.../CacheContinuousQueryEventBuffer.java | 483 ++++++++++++
.../continuous/CacheContinuousQueryHandler.java | 733 +++----------------
.../CacheContinuousQueryHandlerV2.java | 6 +-
.../continuous/CacheContinuousQueryManager.java | 16 +-
.../CacheContinuousQueryPartitionRecovery.java | 267 +++++++
.../continuous/GridContinuousBatchAdapter.java | 2 +-
.../continuous/GridContinuousProcessor.java | 19 +-
.../continuous/GridContinuousQueryBatch.java | 16 +-
...tinuousQueryAsyncFailoverAtomicSelfTest.java | 1 -
...nuousQueryConcurrentPartitionUpdateTest.java | 304 ++++++++
.../CacheContinuousQueryEventBufferTest.java | 217 ++++++
...ContinuousQueryFailoverAbstractSelfTest.java | 79 +-
...niteCacheContinuousQueryBackupQueueTest.java | 13 +-
...eCacheContinuousQueryImmutableEntryTest.java | 6 +-
.../IgniteCacheQuerySelfTestSuite3.java | 5 +
25 files changed, 1885 insertions(+), 792 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index fdd29e4..bb31645 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -874,7 +874,11 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
finally {
// Reset thread local context.
cctx.tm().resetContext();
- cctx.mvcc().contextReset();
+
+ GridCacheMvccManager mvcc = cctx.mvcc();
+
+ if (mvcc != null)
+ mvcc.contextReset();
// Unwind eviction notifications.
if (msg instanceof IgniteTxStateAware) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 4f87658..7c7fc99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -29,7 +28,6 @@ import javax.cache.Cache;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorResult;
-
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -62,8 +60,8 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConfl
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.processors.dr.GridDrType;
-import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitorClosure;
+import org.apache.ignite.internal.util.IgniteTree;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.lang.GridTuple;
@@ -76,7 +74,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5314088..2eec8f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1312,7 +1312,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
top = cacheCtx.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue()) != null;
cctx.affinity().checkRebalanceState(top, cacheId);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 1de64c5..43bc609 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -650,11 +650,29 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+ assert cntrMap != null;
+
+ lock.writeLock().lock();
+
+ try {
+ for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+ T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr.get2() < e.getValue().get2())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(
@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts,
- Map<Integer, T2<Long, Long>> cntrMap
+ GridDhtPartitionMap parts
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -733,15 +751,6 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
- if (cntrMap != null) {
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
- if (cntr == null || cntr.get2() < e.getValue().get2())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
- }
-
consistencyCheck();
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index f9fd852..ffc1d63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -234,12 +234,15 @@ public interface GridDhtPartitionTopology {
/**
* @param exchId Exchange ID.
* @param parts Partitions.
- * @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
@Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts,
- @Nullable Map<Integer, T2<Long, Long>> cntrMap);
+ GridDhtPartitionMap parts);
+
+ /**
+ * @param cntrMap Counters map.
+ */
+ public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap);
/**
* Checks if there is at least one owner for each partition in the cache topology.
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8e79eda..7adce6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -1256,11 +1256,45 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public void applyUpdateCounters(Map<Integer, T2<Long, Long>> cntrMap) {
+ assert cntrMap != null;
+
+ lock.writeLock().lock();
+
+ try {
+ if (stopping)
+ return;
+
+ for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
+ T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr.get2() < e.getValue().get2())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+
+ for (int i = 0; i < locParts.length(); i++) {
+ GridDhtLocalPartition part = locParts.get(i);
+
+ if (part == null)
+ continue;
+
+ T2<Long, Long> cntr = cntrMap.get(part.id());
+
+ if (cntr != null && cntr.get2() > part.updateCounter())
+ part.updateCounter(cntr.get2());
+ }
+ }
+ finally {
+ lock.writeLock().unlock();
+
+ }
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(
@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts,
- @Nullable Map<Integer, T2<Long, Long>> cntrMap
+ GridDhtPartitionMap parts
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -1279,27 +1313,6 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (stopping)
return null;
- if (cntrMap != null) {
- for (Map.Entry<Integer, T2<Long, Long>> e : cntrMap.entrySet()) {
- T2<Long, Long> cntr = this.cntrMap.get(e.getKey());
-
- if (cntr == null || cntr.get2() < e.getValue().get2())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
-
- for (int i = 0; i < locParts.length(); i++) {
- GridDhtLocalPartition part = locParts.get(i);
-
- if (part == null)
- continue;
-
- T2<Long, Long> cntr = cntrMap.get(part.id());
-
- if (cntr != null && cntr.get2() > part.updateCounter())
- part.updateCounter(cntr.get2());
- }
- }
-
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 544f847..72c5bbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.PartitionLossPolicy;
@@ -47,18 +46,19 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
-import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.snapshot.StartFullSnapshotAckDiscoveryMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
-import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -789,14 +789,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
- //todo check
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
continue;
if (topChanged) {
- cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
-
// Partition release future is done so we can flush the write-behind store.
cacheCtx.store().forceFlush();
}
@@ -1101,10 +1098,31 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ /**
+ * @return {@code True} if exchange triggered by server node join or fail.
+ */
+ private boolean serverNodeDiscoveryEvent() {
+ assert discoEvt != null;
+
+ return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+ }
+
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
boolean realExchange = !dummy && !forcePreload;
+ if (err == null &&
+ realExchange &&
+ !cctx.kernalContext().clientNode() &&
+ (serverNodeDiscoveryEvent() || affChangeMsg != null)) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
+ continue;
+
+ cacheCtx.continuousQueries().flushBackupQueue(exchId.topologyVersion());
+ }
+ }
+
if (err == null && realExchange) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
@@ -1554,6 +1572,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
+ for (GridDhtPartitionsAbstractMessage msg : msgs.values()) {
+ if (msg instanceof GridDhtPartitionsSingleMessage) {
+ GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg;
+
+ for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) {
+ Integer cacheId = entry.getKey();
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+ GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
+ cctx.exchange().clientTopology(cacheId, this);
+
+ Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(cacheId);
+
+ if (cntrs != null)
+ top.applyUpdateCounters(cntrs);
+ }
+ }
+ }
+
if (discoEvt.type() == EVT_NODE_JOINED) {
if (cctx.kernalContext().state().active())
assignPartitionsStates();
@@ -1785,7 +1822,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() :
cctx.exchange().clientTopology(cacheId, this);
- top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId));
+ top.update(exchId, entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
new file mode 100644
index 0000000..c95dc42
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryAcknowledgeBuffer.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousQueryBatch;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+class CacheContinuousQueryAcknowledgeBuffer {
+ /** */
+ private int size;
+
+ /** */
+ @GridToStringInclude
+ private Map<Integer, Long> updateCntrs = new HashMap<>();
+
+ /** */
+ @GridToStringInclude
+ private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+ /**
+ * @param batch Batch.
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>onAcknowledged(
+ GridContinuousBatch batch) {
+ assert batch instanceof GridContinuousQueryBatch;
+
+ size += ((GridContinuousQueryBatch)batch).entriesCount();
+
+ Collection<CacheContinuousQueryEntry> entries = (Collection)batch.collect();
+
+ for (CacheContinuousQueryEntry e : entries)
+ addEntry(e);
+
+ return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+ }
+
+ /**
+ * @param e Entry.
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ onAcknowledged(CacheContinuousQueryEntry e) {
+ size++;
+
+ addEntry(e);
+
+ return size >= CacheContinuousQueryHandler.BACKUP_ACK_THRESHOLD ? acknowledgeData() : null;
+ }
+
+ /**
+ * @param e Entry.
+ */
+ private void addEntry(CacheContinuousQueryEntry e) {
+ topVers.add(e.topologyVersion());
+
+ Long cntr0 = updateCntrs.get(e.partition());
+
+ if (cntr0 == null || e.updateCounter() > cntr0)
+ updateCntrs.put(e.partition(), e.updateCounter());
+ }
+
+ /**
+ * @return Non-null tuple if acknowledge should be sent to backups.
+ */
+ @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+ acknowledgeOnTimeout() {
+ return size > 0 ? acknowledgeData() : null;
+ }
+
+ /**
+ * @return Tuple with acknowledge information.
+ */
+ private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+ assert size > 0;
+
+ Map<Integer, Long> cntrs = new HashMap<>(updateCntrs);
+
+ IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+ new IgniteBiTuple<>(cntrs, topVers);
+
+ topVers = U.newHashSet(1);
+
+ size = 0;
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryAcknowledgeBuffer.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
new file mode 100644
index 0000000..f888467
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryDeployableObject.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.managers.deployment.GridDeployment;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Deployable object.
+ */
+class CacheContinuousQueryDeployableObject implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Serialized object. */
+ private byte[] bytes;
+
+ /** Deployment class name. */
+ private String clsName;
+
+ /** Deployment info. */
+ private GridDeploymentInfo depInfo;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public CacheContinuousQueryDeployableObject() {
+ // No-op.
+ }
+
+ /**
+ * @param obj Object.
+ * @param ctx Kernal context.
+ * @throws IgniteCheckedException In case of error.
+ */
+ protected CacheContinuousQueryDeployableObject(Object obj, GridKernalContext ctx) throws IgniteCheckedException {
+ assert obj != null;
+ assert ctx != null;
+
+ Class cls = U.detectClass(obj);
+
+ clsName = cls.getName();
+
+ GridDeployment dep = ctx.deploy().deploy(cls, U.detectClassLoader(cls));
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException("Failed to deploy object: " + obj);
+
+ depInfo = new GridDeploymentInfoBean(dep);
+
+ bytes = U.marshal(ctx, obj);
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param ctx Kernal context.
+ * @return Deserialized object.
+ * @throws IgniteCheckedException In case of error.
+ */
+ <T> T unmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException {
+ assert ctx != null;
+
+ GridDeployment dep = ctx.deploy().getGlobalDeployment(depInfo.deployMode(), clsName, clsName,
+ depInfo.userVersion(), nodeId, depInfo.classLoaderId(), depInfo.participants(), null);
+
+ if (dep == null)
+ throw new IgniteDeploymentCheckedException("Failed to obtain deployment for class: " + clsName);
+
+ return U.unmarshal(ctx, bytes, U.resolveClassLoader(dep.classLoader(), ctx.config()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeByteArray(out, bytes);
+ U.writeString(out, clsName);
+ out.writeObject(depInfo);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ bytes = U.readByteArray(in);
+ clsName = U.readString(in);
+ depInfo = (GridDeploymentInfo)in.readObject();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index bf2a691..7e3f0b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -51,6 +51,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
private static final byte FILTERED_ENTRY = 0b0010;
/** */
+ private static final byte KEEP_BINARY = 0b0100;
+
+ /** */
private static final EventType[] EVT_TYPE_VALS = EventType.values();
/**
@@ -105,11 +108,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
@GridToStringInclude
private AffinityTopologyVersion topVer;
- /** Filtered events. */
- private GridLongList filteredEvts;
-
- /** Keep binary. */
- private boolean keepBinary;
+ /** */
+ private long filteredCnt;
/**
* Required by {@link Message}.
@@ -124,9 +124,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param keepBinary Keep binary flag.
* @param part Partition.
* @param updateCntr Update partition counter.
* @param topVer Topology version if applicable.
+ * @param flags Flags.
*/
CacheContinuousQueryEntry(
int cacheId,
@@ -137,7 +139,8 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
boolean keepBinary,
int part,
long updateCntr,
- @Nullable AffinityTopologyVersion topVer) {
+ @Nullable AffinityTopologyVersion topVer,
+ byte flags) {
this.cacheId = cacheId;
this.evtType = evtType;
this.key = key;
@@ -146,7 +149,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
this.part = part;
this.updateCntr = updateCntr;
this.topVer = topVer;
- this.keepBinary = keepBinary;
+ this.flags = flags;
+
+ if (keepBinary)
+ this.flags |= KEEP_BINARY;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte flags() {
+ return flags;
}
/**
@@ -207,26 +220,40 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
- * @return Size include this event and filtered.
+ * @param filteredCnt Number of entries filtered before this entry.
+ */
+ void filteredCount(long filteredCnt) {
+ assert filteredCnt >= 0 : filteredCnt;
+
+ this.filteredCnt = filteredCnt;
+ }
+
+ /**
+ * @return Number of entries filtered before this entry.
*/
- public int size() {
- return filteredEvts != null ? filteredEvts.size() + 1 : 1;
+ long filteredCount() {
+ return filteredCnt;
}
/**
* @return If entry filtered then will return light-weight <i><b>new entry</b></i> without values and key
* (avoid to huge memory consumption), otherwise {@code this}.
*/
- CacheContinuousQueryEntry forBackupQueue() {
+ CacheContinuousQueryEntry copyWithDataReset() {
if (!isFiltered())
return this;
- CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(
- cacheId, null, null, null, null, keepBinary, part, updateCntr, topVer);
-
- e.flags = flags;
-
- return e;
+ return new CacheContinuousQueryEntry(
+ cacheId,
+ null,
+ null,
+ null,
+ null,
+ false,
+ part,
+ updateCntr,
+ topVer,
+ flags);
}
/**
@@ -247,21 +274,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @return Keep binary flag.
*/
boolean isKeepBinary() {
- return keepBinary;
- }
-
- /**
- * @param cntrs Filtered events.
- */
- void filteredEvents(GridLongList cntrs) {
- filteredEvts = cntrs;
- }
-
- /**
- * @return previous filtered events.
- */
- long[] filteredEvents() {
- return filteredEvts == null ? null : filteredEvts.array();
+ return (flags & KEEP_BINARY) != 0;
}
/**
@@ -363,7 +376,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
case 2:
- if (!writer.writeMessage("filteredEvts", filteredEvts))
+ if (!writer.writeLong("filteredCnt", filteredCnt))
return false;
writer.incrementState();
@@ -375,42 +388,36 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
case 4:
- if (!writer.writeBoolean("keepBinary", keepBinary))
- return false;
-
- writer.incrementState();
-
- case 5:
if (!writer.writeMessage("key", isFiltered() ? null : key))
return false;
writer.incrementState();
- case 6:
+ case 5:
if (!writer.writeMessage("newVal", isFiltered() ? null : newVal))
return false;
writer.incrementState();
- case 7:
+ case 6:
if (!writer.writeMessage("oldVal", isFiltered() ? null : oldVal))
return false;
writer.incrementState();
- case 8:
+ case 7:
if (!writer.writeInt("part", part))
return false;
writer.incrementState();
- case 9:
+ case 8:
if (!writer.writeMessage("topVer", topVer))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeLong("updateCntr", updateCntr))
return false;
@@ -446,7 +453,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 2:
- filteredEvts = reader.readMessage("filteredEvts");
+ filteredCnt = reader.readLong("filteredCnt");
if (!reader.isLastRead())
return false;
@@ -462,14 +469,6 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
case 4:
- keepBinary = reader.readBoolean("keepBinary");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
key = reader.readMessage("key");
if (!reader.isLastRead())
@@ -477,7 +476,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 6:
+ case 5:
newVal = reader.readMessage("newVal");
if (!reader.isLastRead())
@@ -485,7 +484,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 7:
+ case 6:
oldVal = reader.readMessage("oldVal");
if (!reader.isLastRead())
@@ -493,7 +492,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 8:
+ case 7:
part = reader.readInt("part");
if (!reader.isLastRead())
@@ -501,7 +500,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 9:
+ case 8:
topVer = reader.readMessage("topVer");
if (!reader.isLastRead())
@@ -509,7 +508,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
- case 10:
+ case 9:
updateCntr = reader.readLong("updateCntr");
if (!reader.isLastRead())
@@ -524,7 +523,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 11;
+ return 10;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/42293fac/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
new file mode 100644
index 0000000..336f650
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ *
+ */
+public class CacheContinuousQueryEventBuffer {
+ /** */
+ private static final int BUF_SIZE =
+ IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 1000);
+
+ /** */
+ private static final Object RETRY = new Object();
+
+ /** */
+ protected final int part;
+
+ /** */
+ private AtomicReference<Batch> curBatch = new AtomicReference<>();
+
+ /** */
+ private ConcurrentLinkedDeque8<CacheContinuousQueryEntry> backupQ = new ConcurrentLinkedDeque8<>();
+
+ /** */
+ private ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending = new ConcurrentSkipListMap<>();
+
+ /**
+ * @param part Partition number.
+ */
+ CacheContinuousQueryEventBuffer(int part) {
+ this.part = part;
+ }
+
+ /**
+ * @param updateCntr Acknowledged counter.
+ */
+ void cleanupBackupQueue(Long updateCntr) {
+ Iterator<CacheContinuousQueryEntry> it = backupQ.iterator();
+
+ while (it.hasNext()) {
+ CacheContinuousQueryEntry backupEntry = it.next();
+
+ if (backupEntry.updateCounter() <= updateCntr)
+ it.remove();
+ }
+ }
+
+ /**
+ * @return Backup entries.
+ */
+ @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
+ TreeMap<Long, CacheContinuousQueryEntry> ret = null;
+
+ int size = backupQ.sizex();
+
+ if (size > 0) {
+ ret = new TreeMap<>();
+
+ for (int i = 0; i < size; i++) {
+ CacheContinuousQueryEntry e = backupQ.pollFirst();
+
+ if (e != null)
+ ret.put(e.updateCounter(), e);
+ else
+ break;
+ }
+ }
+
+ Batch batch = curBatch.get();
+
+ if (batch != null)
+ ret = batch.flushCurrentEntries(ret);
+
+ if (!pending.isEmpty()) {
+ if (ret == null)
+ ret = new TreeMap<>();
+
+ for (CacheContinuousQueryEntry e : pending.values())
+ ret.put(e.updateCounter(), e);
+ }
+
+ return ret != null ? ret.values() : null;
+ }
+
+ /**
+ * @return Initial partition counter.
+ */
+ protected long currentPartitionCounter() {
+ return 0;
+ }
+
+ /**
+ * For test purpose only.
+ *
+ * @return Current number of filtered events.
+ */
+ long currentFiltered() {
+ Batch batch = curBatch.get();
+
+ return batch != null ? batch.filtered : 0;
+ }
+
+ /**
+ * @param e Entry to process.
+ * @param backup Backup entry flag.
+ * @return Collected entries to pass to listener (single entry or entries list).
+ */
+ @Nullable Object processEntry(CacheContinuousQueryEntry e, boolean backup) {
+ return process0(e.updateCounter(), e, backup);
+ }
+
+ /**
+ * @param backup Backup entry flag.
+ * @param cntr Entry counter.
+ * @param entry Entry.
+ * @return Collected entries.
+ */
+ private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
+ assert cntr >= 0 : cntr;
+
+ Batch batch;
+ Object res = null;
+
+ for (;;) {
+ batch = initBatch(entry.topologyVersion());
+
+ if (batch == null || cntr < batch.startCntr) {
+ if (backup)
+ backupQ.add(entry);
+
+ return entry;
+ }
+
+ if (cntr <= batch.endCntr) {
+ res = batch.processEntry0(null, cntr, entry, backup);
+
+ if (res == RETRY)
+ continue;
+ }
+ else
+ pending.put(cntr, entry);
+
+ break;
+ }
+
+ Batch batch0 = curBatch.get();
+
+ if (batch0 != batch) {
+ do {
+ batch = batch0;
+
+ res = processPending(res, batch, backup);
+
+ batch0 = initBatch(entry.topologyVersion());
+ }
+ while (batch != batch0);
+ }
+
+ return res;
+ }
+
+ /**
+ * @param topVer Current event topology version.
+ * @return Current batch.
+ */
+ @Nullable private Batch initBatch(AffinityTopologyVersion topVer) {
+ Batch batch = curBatch.get();
+
+ if (batch != null)
+ return batch;
+
+ for (;;) {
+ long curCntr = currentPartitionCounter();
+
+ if (curCntr == -1)
+ return null;
+
+ batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
+
+ if (curBatch.compareAndSet(null, batch))
+ return batch;
+
+ batch = curBatch.get();
+
+ if (batch != null)
+ return batch;
+ }
+ }
+
+ /**
+ * @param res Current result.
+ * @param batch Current batch.
+ * @param backup Backup entry flag.
+ * @return New result.
+ */
+ @Nullable private Object processPending(@Nullable Object res, Batch batch, boolean backup) {
+ if (pending.floorKey(batch.endCntr) != null) {
+ for (Map.Entry<Long, CacheContinuousQueryEntry> p : pending.headMap(batch.endCntr, true).entrySet()) {
+ long cntr = p.getKey();
+
+ assert cntr <= batch.endCntr;
+
+ if (pending.remove(p.getKey()) != null) {
+ if (cntr < batch.startCntr)
+ res = addResult(res, p.getValue(), backup);
+ else
+ res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
+ }
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * @param res Current result.
+ * @param entry Entry to add.
+ * @param backup Backup entry flag.
+ * @return Updated result.
+ */
+ @Nullable private Object addResult(@Nullable Object res, CacheContinuousQueryEntry entry, boolean backup) {
+ if (res == null) {
+ if (backup)
+ backupQ.add(entry);
+ else
+ res = entry;
+ }
+ else {
+ assert !backup;
+
+ List<CacheContinuousQueryEntry> resList;
+
+ if (res instanceof CacheContinuousQueryEntry) {
+ resList = new ArrayList<>();
+
+ resList.add((CacheContinuousQueryEntry)res);
+ }
+ else {
+ assert res instanceof List : res;
+
+ resList = (List<CacheContinuousQueryEntry>)res;
+ }
+
+ resList.add(entry);
+
+ res = resList;
+ }
+
+ return res;
+ }
+
+ /**
+ *
+ */
+ private class Batch {
+ /** */
+ private long filtered;
+
+ /** */
+ private final long startCntr;
+
+ /** */
+ private final long endCntr;
+
+ /** */
+ private int lastProc = -1;
+
+ /** */
+ private CacheContinuousQueryEntry[] entries;
+
+ /** */
+ private final AffinityTopologyVersion topVer;
+
+ /**
+ * @param filtered Number of filtered events before this batch.
+ * @param entries Entries array.
+ * @param topVer Current event topology version.
+ * @param startCntr Start counter.
+ */
+ Batch(long startCntr, long filtered, CacheContinuousQueryEntry[] entries, AffinityTopologyVersion topVer) {
+ assert startCntr >= 0;
+ assert filtered >= 0;
+
+ this.startCntr = startCntr;
+ this.filtered = filtered;
+ this.entries = entries;
+ this.topVer = topVer;
+
+ endCntr = startCntr + BUF_SIZE - 1;
+ }
+
+ /**
+ * @param res Current entries.
+ * @return Entries to send as part of backup queue.
+ */
+ @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+ @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
+ if (entries == null)
+ return res;
+
+ long filtered = this.filtered;
+ long cntr = startCntr;
+
+ for (int i = 0; i < entries.length; i++) {
+ CacheContinuousQueryEntry e = entries[i];
+
+ CacheContinuousQueryEntry flushEntry = null;
+
+ if (e == null) {
+ if (filtered != 0) {
+ flushEntry = filteredEntry(cntr - 1, filtered - 1);
+
+ filtered = 0;
+ }
+ }
+ else {
+ if (e.isFiltered())
+ filtered++;
+ else {
+ flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
+ e.eventType(),
+ e.key(),
+ e.value(),
+ e.oldValue(),
+ e.isKeepBinary(),
+ e.partition(),
+ e.updateCounter(),
+ e.topologyVersion(),
+ e.flags());
+
+ flushEntry.filteredCount(filtered);
+
+ filtered = 0;
+ }
+ }
+
+ if (flushEntry != null) {
+ if (res == null)
+ res = new TreeMap<>();
+
+ res.put(flushEntry.updateCounter(), flushEntry);
+ }
+
+ cntr++;
+ }
+
+ if (filtered != 0L) {
+ if (res == null)
+ res = new TreeMap<>();
+
+ CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered - 1);
+
+ res.put(flushEntry.updateCounter(), flushEntry);
+ }
+
+ return res;
+ }
+
+ /**
+ * @param cntr Entry counter.
+ * @param filtered Number of entries filtered before this entry.
+ * @return Entry.
+ */
+ private CacheContinuousQueryEntry filteredEntry(long cntr, long filtered) {
+ CacheContinuousQueryEntry e = new CacheContinuousQueryEntry(0,
+ null,
+ null,
+ null,
+ null,
+ false,
+ part,
+ cntr,
+ topVer,
+ (byte)0);
+
+ e.markFiltered();
+
+ e.filteredCount(filtered);
+
+ return e;
+ }
+
+ /**
+ * @param res Current result.
+ * @param cntr Entry counter.
+ * @param entry Entry.
+ * @param backup Backup entry flag.
+ * @return New result.
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable private Object processEntry0(
+ @Nullable Object res,
+ long cntr,
+ CacheContinuousQueryEntry entry,
+ boolean backup) {
+ int pos = (int)(cntr - startCntr);
+
+ synchronized (this) {
+ if (entries == null)
+ return RETRY;
+
+ entry = entry.copyWithDataReset();
+
+ entries[pos] = entry;
+
+ int next = lastProc + 1;
+
+ if (next == pos) {
+ for (int i = next; i < entries.length; i++) {
+ CacheContinuousQueryEntry entry0 = entries[i];
+
+ if (entry0 != null) {
+ if (!entry0.isFiltered()) {
+ entry0.filteredCount(filtered);
+
+ filtered = 0;
+
+ res = addResult(res, entry0, backup);
+ }
+ else
+ filtered++;
+
+ pos = i;
+ }
+ else
+ break;
+ }
+
+ lastProc = pos;
+
+ if (pos == entries.length - 1) {
+ Arrays.fill(entries, null);
+
+ Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
+ filtered,
+ entries,
+ entry.topologyVersion());
+
+ entries = null;
+
+ assert curBatch.get() == this;
+
+ curBatch.set(nextBatch);
+ }
+ }
+ else
+ return res;
+ }
+
+ return res;
+ }
+ }
+}
\ No newline at end of file