You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/23 13:52:10 UTC
[08/19] ignite git commit: IGNITE-426 temp commit.
IGNITE-426 temp commit.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/10ab67ad
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/10ab67ad
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/10ab67ad
Branch: refs/heads/ignite-426-2-reb
Commit: 10ab67ad0908d6f88b5330c07affc26decdd3f3c
Parents: 0bc1d6f
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Wed Sep 2 15:38:50 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Fri Oct 23 14:50:06 2015 +0300
----------------------------------------------------------------------
.../dht/GridClientPartitionTopology.java | 40 +-
.../distributed/dht/GridDhtLocalPartition.java | 61 +-
.../dht/GridDhtPartitionTopology.java | 26 +-
.../dht/GridDhtPartitionTopologyImpl.java | 112 +-
.../CacheContinuousQueryBatchAck.java | 156 +++
.../continuous/CacheContinuousQueryEntry.java | 74 +-
.../continuous/CacheContinuousQueryHandler.java | 345 +++++-
.../CacheContinuousQueryListener.java | 34 +-
.../continuous/CacheContinuousQueryManager.java | 88 +-
.../continuous/GridContinuousBatch.java | 7 +
.../continuous/GridContinuousBatchAdapter.java | 7 +
.../continuous/GridContinuousProcessor.java | 173 ++-
...acheContinuousQueryFailoverAbstractTest.java | 1104 ++++++++++++++++++
...ueryFailoverAtomicPrimaryWriteOrderTest.java | 32 +
...inuousQueryFailoverAtomicReplicatedTest.java | 39 +
.../CacheContinuousQueryFailoverAtomicTest.java | 39 +
...ContinuousQueryFailoverTxReplicatedTest.java | 32 +
.../CacheContinuousQueryFailoverTxTest.java | 39 +
18 files changed, 2311 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 162c116..516b7bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -94,6 +94,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** Lock. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Partition update counter. */
+ private Map<Integer, Long> cntrMap = new HashMap<>();
+
/**
* @param cctx Context.
* @param cacheId Cache ID.
@@ -527,7 +530,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionFullMap partMap) {
+ GridDhtPartitionFullMap partMap,
+ Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
@@ -602,6 +606,9 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
part2node = p2n;
+ if (cntrMap != null)
+ this.cntrMap = new HashMap<>(cntrMap);
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -617,7 +624,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts) {
+ GridDhtPartitionMap parts,
+ Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -698,6 +706,15 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
}
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+ }
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -852,6 +869,25 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> updateCounters() {
+ lock.readLock().lock();
+ // Return a copy made under the read lock rather than the internal counter map itself.
+ try {
+ return new HashMap<>(cntrMap);
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ assert false;
+ // NOTE(review): apparently not meant to be called on the client topology; with assertions disabled it simply reports false - confirm.
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cacheId=" + cacheId + ']');
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 4f124e6..975d76c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -17,6 +17,18 @@
package org.apache.ignite.internal.processors.cache.distributed.dht;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicStampedReference;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.cache.CacheException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -111,11 +123,14 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
private final LongAdder8 mapPubSize = new LongAdder8();
/** Remove queue. */
- private final GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
+ private GridCircularBuffer<T2<KeyCacheObject, GridCacheVersion>> rmvQueue;
/** Group reservations. */
private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
+ /** Continuous query update index. */
+ private final AtomicLong contQryUpdIdx = new AtomicLong();
+
/**
* @param cctx Context.
* @param id Partition ID.
@@ -141,7 +156,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
- rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
+ if (cctx.deferredDelete())
+ rmvQueue = new GridCircularBuffer<>(U.ceilPow2(delQueueSize));
}
/**
@@ -295,6 +311,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
* @throws IgniteCheckedException If failed.
*/
public void onDeferredDelete(KeyCacheObject key, GridCacheVersion ver) throws IgniteCheckedException {
+ assert cctx.deferredDelete();
+
try {
T2<KeyCacheObject, GridCacheVersion> evicted = rmvQueue.add(new T2<>(key, ver));
@@ -496,7 +514,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
- clearDeferredDeletes();
+ if (cctx.deferredDelete())
+ clearDeferredDeletes();
return new GridFinishedFuture<>(true);
}
@@ -541,13 +560,16 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
if (cctx.isDrEnabled())
cctx.dr().partitionEvicted(id);
+ cctx.continuousQueries().onPartitionEvicted(id);
+
cctx.dataStructures().onPartitionEvicted(id);
rent.onDone();
((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
- clearDeferredDeletes();
+ if (cctx.deferredDelete())
+ clearDeferredDeletes();
return true;
}
@@ -604,6 +626,35 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
}
/**
+ * @return Next update index.
+ */
+ public long nextContinuousQueryUpdateIndex() {
+ return contQryUpdIdx.incrementAndGet();
+ }
+
+ /**
+ * @return Current update index.
+ */
+ public long continuousQueryUpdateIndex() {
+ return contQryUpdIdx.get();
+ }
+
+ /**
+ * Raises the update index to {@code val}; a stored index that is already greater or equal is left as is.
+ * @param val Update index value.
+ */
+ public void continuousQueryUpdateIndex(long val) {
+ long cur = contQryUpdIdx.get();
+
+ while (cur < val) {
+ if (contQryUpdIdx.compareAndSet(cur, val))
+ return;
+
+ cur = contQryUpdIdx.get();
+ }
+ }
+
+ /**
* Clears values for this partition.
*/
private void clearAll() {
@@ -753,6 +804,8 @@ public class GridDhtLocalPartition implements Comparable<GridDhtLocalPartition>,
*
*/
private void clearDeferredDeletes() {
+ assert cctx.deferredDelete();
+
rmvQueue.forEach(new CI1<T2<KeyCacheObject, GridCacheVersion>>() {
@Override public void apply(T2<KeyCacheObject, GridCacheVersion> t) {
cctx.dht().removeVersionedEntry(t.get1(), t.get2());
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index d642314..3ac2b85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
@@ -51,6 +52,8 @@ public interface GridDhtPartitionTopology {
*
* @param exchId Exchange ID.
* @param exchFut Exchange future.
+ * @param updateSeq Update sequence.
+ * @param stopping Stopping flag.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void updateTopologyVersion(
@@ -193,17 +196,27 @@ public interface GridDhtPartitionTopology {
/**
* @param exchId Exchange ID.
* @param partMap Update partition map.
+ * @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
- public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, GridDhtPartitionFullMap partMap);
+ public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
+ GridDhtPartitionFullMap partMap,
+ @Nullable Map<Integer, Long> cntrMap);
/**
* @param exchId Exchange ID.
* @param parts Partitions.
+ * @param cntrMap Partition update counters.
* @return Local partition map if there were evictions or {@code null} otherwise.
*/
@Nullable public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts);
+ GridDhtPartitionMap parts,
+ @Nullable Map<Integer, Long> cntrMap);
+
+ /**
+ * @return Partition update counters.
+ */
+ public Map<Integer, Long> updateCounters();
/**
* @param part Partition to own.
@@ -213,6 +226,7 @@ public interface GridDhtPartitionTopology {
/**
* @param part Evicted partition.
+ * @param updateSeq Update sequence increment flag.
*/
public void onEvicted(GridDhtLocalPartition part, boolean updateSeq);
@@ -228,4 +242,10 @@ public interface GridDhtPartitionTopology {
* @param threshold Threshold for number of entries.
*/
public void printMemoryStats(int threshold);
-}
\ No newline at end of file
+
+ /**
+ * @param topVer Topology version.
+ * @return {@code True} if rebalance process finished.
+ */
+ public boolean rebalanceFinished(AffinityTopologyVersion topVer);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 6bd283a..5d312b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -102,6 +102,12 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** Lock. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Partition update counter. */
+ private Map<Integer, Long> cntrMap = new HashMap<>();
+
+ /** Topology version for which rebalancing is considered finished; {@code NONE} until all partitions are owned by their affinity nodes. */
+ private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
+
/**
* @param cctx Context.
*/
@@ -131,6 +137,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
topReadyFut = null;
topVer = AffinityTopologyVersion.NONE;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -220,6 +228,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
+
+ rebalancedTopVer = AffinityTopologyVersion.NONE;
}
finally {
lock.writeLock().unlock();
@@ -525,6 +535,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ updateRebalanceVersion();
+
consistencyCheck();
}
finally {
@@ -732,7 +744,10 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
* @param states Additional partition states.
* @return List of nodes for the partition.
*/
- private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
+ private List<ClusterNode> nodes(int p,
+ AffinityTopologyVersion topVer,
+ GridDhtPartitionState state,
+ GridDhtPartitionState... states) {
Collection<UUID> allIds = topVer.topologyVersion() > 0 ? F.nodeIds(CU.affinityNodes(cctx, topVer)) : null;
lock.readLock().lock();
@@ -831,7 +846,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionFullMap partMap) {
+ GridDhtPartitionFullMap partMap,
+ @Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchId=" + exchId + ", parts=" + fullMapString() + ']');
@@ -911,8 +927,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
part2node = p2n;
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr = cntrMap.get(part.id());
+
+ if (cntr != null)
+ part.continuousQueryUpdateIndex(cntr);
+ }
+ }
+
boolean changed = checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -928,7 +962,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
@Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId,
- GridDhtPartitionMap parts) {
+ GridDhtPartitionMap parts,
+ @Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
@@ -1006,8 +1041,26 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
}
+ if (cntrMap != null) {
+ for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
+ Long cntr = this.cntrMap.get(e.getKey());
+
+ if (cntr == null || cntr < e.getValue())
+ this.cntrMap.put(e.getKey(), e.getValue());
+ }
+
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr = cntrMap.get(part.id());
+
+ if (cntr != null)
+ part.continuousQueryUpdateIndex(cntr);
+ }
+ }
+
changed |= checkEvictions(updateSeq);
+ updateRebalanceVersion();
+
consistencyCheck();
if (log.isDebugEnabled())
@@ -1204,6 +1257,8 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
if (part.own()) {
updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+ updateRebalanceVersion();
+
consistencyCheck();
return true;
@@ -1254,14 +1309,61 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
+ @Override public Map<Integer, Long> updateCounters() {
+ lock.readLock().lock();
+
+ try {
+ Map<Integer, Long> res = new HashMap<>(cntrMap);
+ // Merge in the local partitions' update indexes, keeping the larger value per partition.
+ for (GridDhtLocalPartition part : locParts.values()) {
+ Long cntr0 = res.get(part.id());
+ long cntr1 = part.continuousQueryUpdateIndex(); // Getter returns a primitive long; no need to box it.
+
+ if (cntr0 == null || cntr1 > cntr0)
+ res.put(part.id(), cntr1);
+ }
+
+ return res;
+ }
+ finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
+ return topVer.equals(rebalancedTopVer);
+ }
+
+ /** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
- X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
+ X.println(">>> Cache partition topology stats [grid=" + cctx.gridName() + ", cache=" + cctx.name() + ']');
for (GridDhtLocalPartition part : locParts.values()) {
int size = part.size();
if (size >= threshold)
- X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
+ X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
+ }
+ }
+
+ /**
+ * Records the current topology version as rebalanced once, for every partition, the owners are exactly the affinity nodes.
+ */
+ private void updateRebalanceVersion() {
+ if (!rebalancedTopVer.equals(topVer)) {
+ for (int i = 0; i < cctx.affinity().partitions(); i++) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer);
+ List<ClusterNode> owners = owners(i);
+
+ if (affNodes.size() != owners.size() || !owners.containsAll(affNodes))
+ return;
+ }
+
+ rebalancedTopVer = topVer;
+
+ if (log.isDebugEnabled())
+ log.debug("Updated rebalanced version [cache=" + cctx.name() + ", ver=" + rebalancedTopVer + ']');
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
new file mode 100644
index 0000000..1e9a848
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Batch acknowledgement.
+ */
+public class CacheContinuousQueryBatchAck extends GridCacheMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Routine ID. */
+ private UUID routineId;
+
+ /** Update indexes. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = Integer.class, valueType = Long.class)
+ private Map<Integer, Long> updateIdxs;
+
+ /**
+ * Default constructor.
+ */
+ public CacheContinuousQueryBatchAck() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param routineId Routine ID.
+ * @param updateIdxs Update indexes.
+ */
+ CacheContinuousQueryBatchAck(int cacheId, UUID routineId, Map<Integer, Long> updateIdxs) {
+ this.cacheId = cacheId;
+ this.routineId = routineId;
+ this.updateIdxs = updateIdxs;
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ UUID routineId() {
+ return routineId;
+ }
+
+ /**
+ * @return Update indexes.
+ */
+ Map<Integer, Long> updateIndexes() {
+ return updateIdxs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!super.writeTo(buf, writer))
+ return false;
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 3:
+ if (!writer.writeUuid("routineId", routineId))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ if (!super.readFrom(buf, reader))
+ return false;
+
+ switch (reader.state()) {
+ case 3:
+ routineId = reader.readUuid("routineId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 4:
+ updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 114;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 5;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryBatchAck.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index a4b35eb..9ea9b73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -22,6 +22,7 @@ import javax.cache.event.EventType;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -75,6 +76,17 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
@GridDirectTransient
private GridDeploymentInfo depInfo;
+ /** Partition. */
+ private int part;
+
+ /** Update index. */
+ private long updateIdx;
+
+ /** Topology version if applicable. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private AffinityTopologyVersion topVer;
+
/**
* Required by {@link org.apache.ignite.plugin.extensions.communication.Message}.
*/
@@ -88,18 +100,34 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param part Partition.
+ * @param updateIdx Update index.
+ * @param topVer Topology version if applicable.
*/
CacheContinuousQueryEntry(
int cacheId,
EventType evtType,
KeyCacheObject key,
@Nullable CacheObject newVal,
- @Nullable CacheObject oldVal) {
+ @Nullable CacheObject oldVal,
+ int part,
+ long updateIdx,
+ @Nullable AffinityTopologyVersion topVer) {
this.cacheId = cacheId;
this.evtType = evtType;
this.key = key;
this.newVal = newVal;
this.oldVal = oldVal;
+ this.part = part;
+ this.updateIdx = updateIdx;
+ this.topVer = topVer;
+ }
+
+ /**
+ * @return Topology version if applicable.
+ */
+ @Nullable AffinityTopologyVersion topologyVersion() {
+ return topVer;
}
/**
@@ -117,6 +145,20 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
}
/**
+ * @return Partition.
+ */
+ int partition() {
+ return part;
+ }
+
+ /**
+ * @return Update index.
+ */
+ long updateIndex() {
+ return updateIdx;
+ }
+
+ /**
* @param cctx Cache context.
* @throws IgniteCheckedException In case of error.
*/
@@ -225,6 +267,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
writer.incrementState();
+ case 5:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeLong("updateIdx", updateIdx))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -282,6 +336,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
reader.incrementState();
+ case 5:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ updateIdx = reader.readLong("updateIdx");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(CacheContinuousQueryEntry.class);
@@ -289,7 +359,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 5;
+ return 7;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 1990e18..3253dda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -22,7 +22,12 @@ import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -35,13 +40,17 @@ import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeployment;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager;
import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
+import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
import org.apache.ignite.internal.util.typedef.C1;
@@ -49,7 +58,10 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
@@ -61,6 +73,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** */
private static final long serialVersionUID = 0L;
+ /** */
+ private static final int BACKUP_ACK_THRESHOLD = 100;
+
/** Cache name. */
private String cacheName;
@@ -97,6 +112,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
/** Whether to skip primary check for REPLICATED cache. */
private transient boolean skipPrimaryCheck;
+ /**
+ * Backup queue: entries of updates observed while this node was not primary. Created on listener registration;
+ * entries are removed on acknowledge, on flush and on partition eviction.
+ */
+ private transient Collection<CacheContinuousQueryEntry> backupQueue;
+
+ /**
+ * Partition ID to the highest update index already accepted for the local listener.
+ * Created in the constructor only when the cache is not local.
+ */
+ private transient Map<Integer, Long> rcvCntrs;
+
+ /**
+ * Filter applied to an entry before the local listener is notified: always-true for a local cache,
+ * {@link DuplicateEventFilter} otherwise.
+ */
+ private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
+
+ /** Buffer that accumulates acknowledged update indexes until they are sent out. Created on listener registration. */
+ private transient AcknowledgeBuffer ackBuf;
+
/** */
private transient int cacheId;
@@ -121,6 +148,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
* @param ignoreExpired Ignore expired events flag.
* @param skipPrimaryCheck Whether to skip primary check for REPLICATED cache.
* @param taskHash Task name hash code.
+ * @param locCache {@code True} if local cache.
*/
public CacheContinuousQueryHandler(
String cacheName,
@@ -133,7 +161,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
boolean sync,
boolean ignoreExpired,
int taskHash,
- boolean skipPrimaryCheck) {
+ boolean skipPrimaryCheck,
+ boolean locCache) {
assert topic != null;
assert locLsnr != null;
@@ -149,6 +178,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
this.taskHash = taskHash;
this.skipPrimaryCheck = skipPrimaryCheck;
+ if (locCache)
+ dupEvtFilter = F.alwaysTrue();
+ else {
+ rcvCntrs = new ConcurrentHashMap<>();
+
+ dupEvtFilter = new DuplicateEventFilter();
+ }
+
cacheId = CU.cacheId(cacheName);
}
@@ -185,8 +222,14 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (rmtFilter != null)
ctx.resource().injectGeneric(rmtFilter);
+ backupQueue = new ConcurrentLinkedDeque8<>();
+
+ ackBuf = new AcknowledgeBuffer();
+
final boolean loc = nodeId.equals(ctx.localNodeId());
+ assert !skipPrimaryCheck || loc;
+
CacheContinuousQueryListener<K, V> lsnr = new CacheContinuousQueryListener<K, V>() {
@Override public void onExecution() {
if (ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED)) {
@@ -207,15 +250,16 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
- @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
+ @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+ boolean primary,
boolean recordIgniteEvt) {
if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
return;
GridCacheContext<K, V> cctx = cacheContext(ctx);
- if (cctx.isReplicated() && !skipPrimaryCheck && !primary)
- return;
+ // skipPrimaryCheck is set only when listen locally for replicated cache events.
+ assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
boolean notify = true;
@@ -229,32 +273,36 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
if (notify) {
- if (loc)
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- else {
- try {
- if (ctx.config().isPeerClassLoadingEnabled() && ctx.discovery().node(nodeId) != null) {
- evt.entry().prepareMarshal(cctx);
+ try {
+ final CacheContinuousQueryEntry entry = evt.entry();
- GridCacheDeploymentManager depMgr = cctx.deploy();
+ if (primary || skipPrimaryCheck) {
+ if (loc) {
+ if (dupEvtFilter.apply(entry)) {
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- depMgr.prepare(evt.entry());
+ if (!skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
+ }
}
- else
- evt.entry().prepareMarshal(cctx);
+ else {
+ prepareEntry(cctx, nodeId, entry);
- ctx.continuous().addNotification(nodeId, routineId, evt.entry(), topic, sync, true);
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
+ }
}
- catch (ClusterTopologyCheckedException ex) {
- IgniteLogger log = ctx.log(getClass());
+ else
+ backupQueue.add(entry);
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
- if (log.isDebugEnabled())
- log.debug("Failed to send event notification to node, node left cluster " +
- "[node=" + nodeId + ", err=" + ex + ']');
- }
- catch (IgniteCheckedException ex) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
- }
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
}
if (recordIgniteEvt) {
@@ -285,6 +333,49 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
((PlatformContinuousQueryFilter)rmtFilter).onQueryUnregister();
}
+ @Override public void cleanupBackupQueue(Map<Integer, Long> updateIdxs) {
+     // Drop every queued entry whose update index is covered by the acknowledged index of its partition.
+     for (Iterator<CacheContinuousQueryEntry> iter = backupQueue.iterator(); iter.hasNext();) {
+         CacheContinuousQueryEntry queued = iter.next();
+
+         Long ackedIdx = updateIdxs.get(queued.partition());
+
+         // Nothing acknowledged for this partition, or the entry is newer than the acknowledge.
+         if (ackedIdx == null || queued.updateIndex() > ackedIdx)
+             continue;
+
+         iter.remove();
+     }
+ }
+
+ /**
+  * Sends all entries currently held in the backup queue to the node that started the routine.
+  *
+  * @param ctx Kernal context.
+  * @param topVer Topology version (not used by this implementation).
+  */
+ @Override public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer) {
+     if (backupQueue.isEmpty())
+         return;
+
+     // Entries are moved out of the shared queue one by one, so only entries that were actually prepared
+     // and handed over for sending leave the queue. Sending the live queue and calling clear() afterwards
+     // could drop entries added concurrently by onEntryUpdated() without ever sending them.
+     Collection<CacheContinuousQueryEntry> drained = new java.util.ArrayList<>();
+
+     try {
+         GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+         for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator(); it.hasNext();) {
+             CacheContinuousQueryEntry entry = it.next();
+
+             it.remove();
+
+             drained.add(entry);
+
+             prepareEntry(cctx, nodeId, entry);
+         }
+
+         if (!drained.isEmpty())
+             ctx.continuous().addBackupNotification(nodeId, routineId, drained, topic);
+     }
+     catch (IgniteCheckedException e) {
+         // As before, a failed flush must not lose events: put the drained entries back
+         // (they are re-appended, so they may end up behind entries queued in the meantime).
+         backupQueue.addAll(drained);
+
+         U.error(ctx.log(getClass()), "Failed to send backup event notification to node: " + nodeId, e);
+     }
+ }
+
+ @Override public void acknowledgeBackupOnTimeout(GridKernalContext ctx) {
+     // Null when nothing was buffered since the last acknowledge; sendBackupAcknowledge() ignores null.
+     IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> pending = ackBuf.acknowledgeOnTimeout();
+
+     sendBackupAcknowledge(pending, routineId, ctx);
+ }
+
+ @Override public void onPartitionEvicted(int part) {
+     Iterator<CacheContinuousQueryEntry> iter = backupQueue.iterator();
+
+     // Discard queued entries that belong to the evicted partition.
+     while (iter.hasNext()) {
+         CacheContinuousQueryEntry queued = iter.next();
+
+         if (queued.partition() == part)
+             iter.remove();
+     }
+ }
+
@Override public boolean oldValueRequired() {
return oldValRequired;
}
@@ -306,6 +397,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return mgr.registerListener(routineId, lsnr, internal);
}
+ /**
+  * Prepares entry for marshalling and, when peer class loading is enabled and the routine owner
+  * is still known to discovery, additionally passes the entry to the cache deployment manager.
+  *
+  * @param cctx Context.
+  * @param nodeId ID of the node that started routine.
+  * @param entry Entry.
+  * @throws IgniteCheckedException In case of error.
+  */
+ private void prepareEntry(GridCacheContext cctx, UUID nodeId, CacheContinuousQueryEntry entry)
+     throws IgniteCheckedException {
+     // Evaluated first to keep the original evaluation order.
+     boolean deploy = cctx.kernalContext().config().isPeerClassLoadingEnabled() &&
+         cctx.discovery().node(nodeId) != null;
+
+     entry.prepareMarshal(cctx);
+
+     if (deploy)
+         cctx.deploy().prepare(entry);
+ }
+
/** {@inheritDoc} */
@Override public void onListenerRegistered(UUID routineId, GridKernalContext ctx) {
// No-op.
@@ -373,12 +481,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
@Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
return new CacheContinuousQueryEvent<>(cache, cctx, e);
}
- }
+ },
+ dupEvtFilter
);
locLsnr.onUpdated(evts);
}
+ /**
+  * Checks whether the entry is new for its partition and, if so, records its update index
+  * as the last one accepted for that partition.
+  *
+  * @param e Entry.
+  * @return {@code True} if listener should be notified, {@code false} if the entry's update index
+  *      is not greater than the one already recorded for its partition.
+  */
+ private boolean notifyListener(CacheContinuousQueryEntry e) {
+     Integer part = e.partition();
+
+     long idx = e.updateIndex();
+
+     // The read-compare-write sequence has to be atomic: without the lock two threads delivering
+     // the same update index could both pass the check and the listener would be notified twice.
+     synchronized (rcvCntrs) {
+         Long cntr = rcvCntrs.get(part);
+
+         if (cntr != null && idx <= cntr)
+             return false;
+
+         // TODO IGNITE-426: remove assert.
+         assert cntr == null || idx == cntr + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
+
+         rcvCntrs.put(part, idx);
+     }
+
+     return true;
+ }
+
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
@@ -399,6 +535,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public GridContinuousBatch createBatch() {
+     GridContinuousBatch batch = new GridContinuousBatchAdapter();
+
+     return batch;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBatchAcknowledged(final UUID routineId,
+     GridContinuousBatch batch,
+     final GridKernalContext ctx) {
+     // The buffer hands back acknowledge data only once enough entries were accumulated, otherwise null.
+     IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> ack = ackBuf.onAcknowledged(batch);
+
+     sendBackupAcknowledge(ack, routineId, ctx);
+ }
+
+ /**
+  * Schedules a local closure that sends {@link CacheContinuousQueryBatchAck} with the given update indexes
+  * to every cache node (except the local one) of the given topology versions. Does nothing if there is
+  * no acknowledge information.
+  *
+  * @param t Acknowledge information (update indexes per partition and topology versions), may be {@code null}.
+  * @param routineId Routine ID.
+  * @param ctx Context.
+  */
+ private void sendBackupAcknowledge(final IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> t,
+     final UUID routineId,
+     final GridKernalContext ctx) {
+     if (t == null)
+         return;
+
+     ctx.closure().runLocalSafe(new Runnable() {
+         @Override public void run() {
+             GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+             CacheContinuousQueryBatchAck ack =
+                 new CacheContinuousQueryBatchAck(cctx.cacheId(), routineId, t.get1());
+
+             // Union of cache nodes over all topology versions the acknowledged entries belong to.
+             Collection<ClusterNode> targets = new HashSet<>();
+
+             for (AffinityTopologyVersion ver : t.get2())
+                 targets.addAll(ctx.discovery().cacheNodes(ver));
+
+             for (ClusterNode target : targets) {
+                 if (target.id().equals(ctx.localNodeId()))
+                     continue;
+
+                 try {
+                     cctx.io().send(target, ack, GridIoPolicy.SYSTEM_POOL);
+                 }
+                 catch (ClusterTopologyCheckedException e) {
+                     // Node left: nothing to acknowledge there anymore.
+                     IgniteLogger log = ctx.log(getClass());
+
+                     if (log.isDebugEnabled())
+                         log.debug("Failed to send acknowledge message, node left " +
+                             "[msg=" + ack + ", node=" + target + ']');
+                 }
+                 catch (IgniteCheckedException e) {
+                     IgniteLogger log = ctx.log(getClass());
+
+                     U.error(log, "Failed to send acknowledge message " +
+                         "[msg=" + ack + ", node=" + target + ']', e);
+                 }
+             }
+         }
+     });
+ }
+
+ /** {@inheritDoc} */
@Nullable @Override public Object orderedTopic() {
return topic;
}
@@ -473,6 +668,106 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return ctx.cache().<K, V>context().cacheContext(cacheId);
}
+ /**
+  * Accumulates, per partition, the highest acknowledged update index together with the topology versions
+  * of the acknowledged entries, and releases this data once {@link #BACKUP_ACK_THRESHOLD} entries
+  * were counted or on timeout.
+  */
+ private static class AcknowledgeBuffer {
+     /** Number of entries counted since acknowledge data was last released. */
+     private int size;
+
+     /** Partition ID to the highest acknowledged update index. */
+     @GridToStringInclude
+     private Map<Integer, Long> updateIdxs = new HashMap<>();
+
+     /** Topology versions of the entries counted since acknowledge data was last released. */
+     @GridToStringInclude
+     private Set<AffinityTopologyVersion> topVers = U.newHashSet(1);
+
+     /**
+      * Registers all entries of an acknowledged batch.
+      *
+      * @param batch Batch.
+      * @return Non-null tuple if acknowledge should be sent to backups.
+      */
+     @SuppressWarnings("unchecked")
+     @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+     onAcknowledged(GridContinuousBatch batch) {
+         size += batch.size();
+
+         for (Object o : (Collection)batch.collect())
+             addEntry((CacheContinuousQueryEntry)o);
+
+         if (size < BACKUP_ACK_THRESHOLD)
+             return null;
+
+         return acknowledgeData();
+     }
+
+     /**
+      * Registers a single acknowledged entry.
+      *
+      * @param e Entry.
+      * @return Non-null tuple if acknowledge should be sent to backups.
+      */
+     @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+     onAcknowledged(CacheContinuousQueryEntry e) {
+         size++;
+
+         addEntry(e);
+
+         if (size < BACKUP_ACK_THRESHOLD)
+             return null;
+
+         return acknowledgeData();
+     }
+
+     /**
+      * Remembers the entry's topology version and raises the stored update index of its partition if needed.
+      *
+      * @param e Entry.
+      */
+     private void addEntry(CacheContinuousQueryEntry e) {
+         topVers.add(e.topologyVersion());
+
+         Integer part = e.partition();
+
+         Long prev = updateIdxs.get(part);
+
+         if (prev == null || e.updateIndex() > prev)
+             updateIdxs.put(part, e.updateIndex());
+     }
+
+     /**
+      * @return Non-null tuple if acknowledge should be sent to backups.
+      */
+     @Nullable synchronized IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>>
+     acknowledgeOnTimeout() {
+         if (size > 0)
+             return acknowledgeData();
+
+         return null;
+     }
+
+     /**
+      * Builds the acknowledge tuple from a copy of the update indexes and the collected topology versions,
+      * then resets the entry counter and starts a new topology version set. Update indexes are kept.
+      *
+      * @return Tuple with acknowledge information.
+      */
+     private IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> acknowledgeData() {
+         assert size > 0;
+
+         Set<AffinityTopologyVersion> vers = topVers;
+
+         IgniteBiTuple<Map<Integer, Long>, Set<AffinityTopologyVersion>> res =
+             new IgniteBiTuple<>((Map<Integer, Long>)new HashMap<>(updateIdxs), vers);
+
+         topVers = U.newHashSet(1);
+
+         size = 0;
+
+         return res;
+     }
+
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(AcknowledgeBuffer.class, this);
+     }
+ }
+
+ /**
+  * Predicate that accepts an entry only if the enclosing handler's {@code notifyListener} accepts it.
+  */
+ private class DuplicateEventFilter implements IgnitePredicate<CacheContinuousQueryEntry> {
+     /** */
+     private static final long serialVersionUID = 0L;
+
+     /** {@inheritDoc} */
+     @Override public boolean apply(CacheContinuousQueryEntry e) {
+         return CacheContinuousQueryHandler.this.notifyListener(e);
+     }
+ }
+
/**
* Deployable object.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index a3c19a9..2f9e111 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -17,6 +17,11 @@
package org.apache.ignite.internal.processors.cache.query.continuous;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.affinity.*;
+
+import java.util.*;
+
/**
* Continuous query listener.
*/
@@ -33,7 +38,9 @@ interface CacheContinuousQueryListener<K, V> {
* @param primary Primary flag.
* @param recordIgniteEvt Whether to record event.
*/
- public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt);
+ public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt,
+ boolean primary,
+ boolean recordIgniteEvt);
/**
* Listener unregistered callback.
@@ -41,6 +48,31 @@ interface CacheContinuousQueryListener<K, V> {
public void onUnregister();
/**
+ * Cleans backup queue.
+ *
+ * @param updateIdxs Update indexes map.
+ */
+ public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
+
+ /**
+ * Flushes backup queue.
+ *
+ * @param ctx Context.
+ * @param topVer Topology version.
+ */
+ public void flushBackupQueue(GridKernalContext ctx, AffinityTopologyVersion topVer);
+
+ /**
+ * Timeout callback for backup acknowledges. The continuous query manager invokes it periodically
+ * for every registered listener.
+ *
+ * @param ctx Context.
+ */
+ public void acknowledgeBackupOnTimeout(GridKernalContext ctx);
+
+ /**
+ * Partition evicted callback.
+ *
+ * @param part Partition.
+ */
+ public void onPartitionEvicted(int part);
+
+ /**
* @return Whether old value is required.
*/
public boolean oldValueRequired();
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 6a151a5..c9fb656 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
@@ -82,6 +83,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** */
private static final byte EXPIRED_FLAG = 0b1000;
+ /** */
+ private static final long BACKUP_ACK_FREQ = 5000;
+
/** Listeners. */
private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
@@ -108,6 +112,26 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
@Override protected void start0() throws IgniteCheckedException {
// Append cache name to the topic.
topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
+
+ // Acknowledge from the node that received events: lets the listener drop acknowledged backup entries.
+ cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class,
+     new CI2<UUID, CacheContinuousQueryBatchAck>() {
+         @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+             CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+             // Internal listeners are flushed, timed out and evicted like user ones,
+             // so their backup queues have to be cleaned on acknowledge as well.
+             if (lsnr == null)
+                 lsnr = intLsnrs.get(msg.routineId());
+
+             if (lsnr != null)
+                 lsnr.cleanupBackupQueue(msg.updateIndexes());
+         }
+     });
+
+ // Periodically lets every listener send acknowledges that have not reached the size threshold.
+ // NOTE(review): the value returned by schedule() is discarded here - confirm the task is cancelled on stop.
+ cctx.time().schedule(new Runnable() {
+     @Override public void run() {
+         for (CacheContinuousQueryListener lsnr : lsnrs.values())
+             lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+
+         for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+             lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
+     }
+ }, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
}
/** {@inheritDoc} */
@@ -141,18 +165,25 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param primary {@code True} if called on primary node.
* @param preload Whether update happened during preloading.
+ * @param updateIdx Update index.
+ * @param topVer Topology version.
* @throws IgniteCheckedException In case of error.
*/
public void onEntryUpdated(GridCacheEntryEx e,
KeyCacheObject key,
CacheObject newVal,
CacheObject oldVal,
- boolean preload)
+ boolean primary,
+ boolean preload,
+ long updateIdx,
+ AffinityTopologyVersion topVer)
throws IgniteCheckedException
{
assert e != null;
assert key != null;
+ assert Thread.holdsLock(e) : e;
boolean internal = e.isInternal() || !e.context().userCache();
@@ -179,8 +210,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean initialized = false;
- boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
- boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+ boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
if (preload && !lsnr.notifyExisting())
@@ -205,7 +235,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
evtType,
key,
newVal,
- lsnr.oldValueRequired() ? oldVal : null);
+ lsnr.oldValueRequired() ? oldVal : null,
+ e.partition(),
+ updateIdx,
+ topVer);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -224,6 +257,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
throws IgniteCheckedException {
assert e != null;
assert key != null;
+ assert Thread.holdsLock(e) : e;
if (e.isInternal())
return;
@@ -255,7 +289,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
EXPIRED,
key,
null,
- lsnr.oldValueRequired() ? oldVal : null);
+ lsnr.oldValueRequired() ? oldVal : null,
+ e.partition(),
+ 0,
+ null);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -373,6 +410,30 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * Flushes backup queues of all user and internal listeners.
+ * NOTE(review): judging by the name this is meant to run before partition map exchange - confirm against caller.
+ *
+ * @param topVer Topology version.
+ */
+ public void beforeExchange(AffinityTopologyVersion topVer) {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+ }
+
+ /**
+ * Partition evicted callback. Forwards the eviction to all user and internal listeners.
+ *
+ * @param part Partition number.
+ */
+ public void onPartitionEvicted(int part) {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.onPartitionEvicted(part);
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.onPartitionEvicted(part);
+ }
+
+ /**
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
* @param bufSize Buffer size.
@@ -417,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
sync,
ignoreExpired,
taskNameHash,
- skipPrimaryCheck);
+ skipPrimaryCheck,
+ cctx.isLocal());
IgnitePredicate<ClusterNode> pred = null;
@@ -471,10 +533,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
GridCacheEntryEx e = it.next();
+ CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ CREATED,
+ e.key(),
+ e.rawGet(),
+ null,
+ 0,
+ 0,
+ null);
+
next = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()),
- cctx,
- new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
+ cctx, entry);
if (rmtFilter != null && !rmtFilter.evaluate(next))
next = null;
@@ -639,6 +710,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param impl Listener.
+ * @param log Logger.
*/
JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
assert impl != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
new file mode 100644
index 0000000..2fef161
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+
+/**
+ * Batch of objects buffered by a continuous routine before they are sent to the node that started the routine.
+ */
+public interface GridContinuousBatch {
+    /**
+     * Adds an object to this batch.
+     *
+     * @param obj Object to add.
+     */
+    public void add(Object obj);
+
+    /**
+     * Returns the objects accumulated in this batch.
+     *
+     * @return Batch elements.
+     */
+    public Collection<Object> collect();
+
+    /**
+     * Returns the number of objects in this batch.
+     *
+     * @return Batch size.
+     */
+    public int size();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
new file mode 100644
index 0000000..8e29e29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Default {@link GridContinuousBatch} implementation backed by a concurrent deque.
+ */
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+    /** Buffer. Concurrent, because objects may be added to the same batch by several threads at once. */
+    private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+
+    /**
+     * Adds an object to this batch.
+     *
+     * @param obj Object to add, must not be {@code null}.
+     */
+    public void add(Object obj) {
+        assert obj != null;
+
+        buf.add(obj);
+    }
+
+    /**
+     * Returns the objects accumulated in this batch. The returned collection is the underlying buffer, not a copy.
+     *
+     * @return Batch elements.
+     */
+    public Collection<Object> collect() {
+        return buf;
+    }
+
+    /**
+     * Returns the number of objects in this batch.
+     *
+     * @return Batch size.
+     */
+    public int size() {
+        return buf.sizex();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/10ab67ad/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d1cb3a9..15c9dd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -651,6 +651,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
+ * Adds the given objects to the routine's current batch and sends that batch right away,
+ * without waiting for the buffer to fill up. Does nothing if the processor is stopped
+ * or the routine is unknown on this node.
+ *
* @param nodeId ID of the node that started routine.
* @param routineId Routine ID.
+ * @param objs Notification objects.
+ * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void addBackupNotification(UUID nodeId,
+     final UUID routineId,
+     Collection<?> objs,
+     @Nullable Object orderedTopic)
+     throws IgniteCheckedException {
+     if (processorStopped)
+         return;
+
+     final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+     if (info == null)
+         return;
+
+     final GridContinuousBatch batch = info.addAll(objs);
+
+     sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null);
+ }
+
+ /**
+ * @param nodeId ID of the node that started routine.
+ * @param routineId Routine ID.
* @param obj Notification object.
* @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
* @param sync If {@code true} then waits for event acknowledgment.
@@ -658,8 +682,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void addNotification(UUID nodeId,
- UUID routineId,
- @Nullable Object obj,
+ final UUID routineId,
+ Object obj,
@Nullable Object orderedTopic,
boolean sync,
boolean msg)
@@ -673,7 +697,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (processorStopped)
return;
- RemoteRoutineInfo info = rmtInfos.get(routineId);
+ final RemoteRoutineInfo info = rmtInfos.get(routineId);
if (info != null) {
assert info.interval == 0 || !sync;
@@ -686,7 +710,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
syncMsgFuts.put(futId, fut);
try {
- sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
+ sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
}
catch (IgniteCheckedException e) {
syncMsgFuts.remove(futId);
@@ -697,10 +721,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
fut.get();
}
else {
- Collection<Object> toSnd = info.add(obj);
+ final GridContinuousBatch batch = info.add(obj);
+
+ if (batch != null) {
+ CI1<IgniteException> ackC = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ if (e == null)
+ info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+ }
+ };
- if (toSnd != null)
- sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+ sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC);
+ }
}
}
}
@@ -725,6 +757,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
* @param msg If {@code true} then sent data is collection of messages.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
private void sendNotification(UUID nodeId,
@@ -732,7 +765,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Nullable IgniteUuid futId,
Collection<Object> toSnd,
@Nullable Object orderedTopic,
- boolean msg) throws IgniteCheckedException {
+ boolean msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
assert toSnd != null;
@@ -740,7 +774,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
sendWithRetries(nodeId,
new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
- orderedTopic);
+ orderedTopic,
+ ackC);
}
/**
@@ -859,6 +894,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
sendWithRetries(nodeId,
new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
+ null,
null);
}
catch (IgniteCheckedException e) {
@@ -922,15 +958,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
break;
}
- IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval();
+ IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval();
- Collection<Object> toSnd = t.get1();
+ final GridContinuousBatch batch = t.get1();
- if (toSnd != null && !toSnd.isEmpty()) {
+ if (batch != null && batch.size() > 0) {
try {
+ Collection<Object> toSnd = batch.collect();
+
boolean msg = toSnd.iterator().next() instanceof Message;
- sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
+ CI1<IgniteException> ackC = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ if (e == null)
+ info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+ }
+ };
+
+ sendNotification(nodeId,
+ routineId,
+ null,
+ toSnd,
+ hnd.orderedTopic(),
+ msg,
+ ackC);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1013,9 +1064,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
- private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic)
+ private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic,
+ IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
assert nodeId != null;
assert msg != null;
@@ -1023,7 +1076,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null)
- sendWithRetries(node, msg, orderedTopic);
+ sendWithRetries(node, msg, orderedTopic, ackC);
else
throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId);
}
@@ -1033,14 +1086,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
- private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic)
- throws IgniteCheckedException {
+ private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert node != null;
assert msg != null;
- sendWithRetries(F.asList(node), msg, orderedTopic);
+ sendWithRetries(F.asList(node), msg, orderedTopic, ackC);
}
/**
@@ -1048,10 +1102,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg,
- @Nullable Object orderedTopic) throws IgniteCheckedException {
+ @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert !F.isEmpty(nodes);
assert msg != null;
@@ -1074,10 +1129,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
msg,
SYSTEM_POOL,
0,
- true);
+ true,
+ ackC);
}
else
- ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+ ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
break;
}
@@ -1178,8 +1234,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Lock. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- /** Buffer. */
- private ConcurrentLinkedDeque8<Object> buf;
+ /** Batch. */
+ private GridContinuousBatch batch;
/** Last send time. */
private long lastSndTime = U.currentTimeMillis();
@@ -1210,7 +1266,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.interval = interval;
this.autoUnsubscribe = autoUnsubscribe;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
}
/**
@@ -1238,21 +1294,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param objs Objects to add.
+ * @return Batch to send.
+ */
+ GridContinuousBatch addAll(Collection<?> objs) {
+ assert objs != null;
+ assert objs.size() > 0;
+
+ GridContinuousBatch toSnd = null;
+
+ lock.writeLock().lock();
+
+ try {
+ for (Object obj : objs)
+ batch.add(obj);
+
+ toSnd = batch;
+
+ batch = hnd.createBatch();
+
+ if (interval > 0)
+ lastSndTime = U.currentTimeMillis();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return toSnd;
+ }
+
+ /**
* @param obj Object to add.
- * @return Object to send or {@code null} if there is nothing to send for now.
+ * @return Batch to send or {@code null} if there is nothing to send for now.
*/
- @Nullable Collection<Object> add(@Nullable Object obj) {
- ConcurrentLinkedDeque8 buf0 = null;
+ @Nullable GridContinuousBatch add(Object obj) {
+ assert obj != null;
- if (buf.sizex() >= bufSize - 1) {
+ GridContinuousBatch toSnd = null;
+
+ if (batch.size() >= bufSize - 1) {
lock.writeLock().lock();
try {
- buf.add(obj);
+ batch.add(obj);
- buf0 = buf;
+ toSnd = batch;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
if (interval > 0)
lastSndTime = U.currentTimeMillis();
@@ -1265,34 +1353,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
lock.readLock().lock();
try {
- buf.add(obj);
+ batch.add(obj);
}
finally {
lock.readLock().unlock();
}
}
- Collection<Object> toSnd = null;
-
- if (buf0 != null) {
- toSnd = new ArrayList<>(buf0.sizex());
-
- for (Object o : buf0)
- toSnd.add(o);
- }
-
return toSnd;
}
/**
- * @return Tuple with objects to sleep (or {@code null} if there is nothing to
+ * @return Tuple with batch to send (or {@code null} if there is nothing to
* send for now) and time interval after next check is needed.
*/
@SuppressWarnings("TooBroadScope")
- IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+ IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
assert interval > 0;
- Collection<Object> toSnd = null;
+ GridContinuousBatch toSnd = null;
long diff;
long now = U.currentTimeMillis();
@@ -1302,10 +1381,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
diff = now - lastSndTime;
- if (diff >= interval && !buf.isEmpty()) {
- toSnd = buf;
+ if (diff >= interval && batch.size() > 0) {
+ toSnd = batch;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
lastSndTime = now;
}