You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by nt...@apache.org on 2015/10/22 12:34:29 UTC
[3/9] ignite git commit: IGNITE-426 WIP
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 3253dda..ca3579e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -21,13 +21,19 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -53,6 +59,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch;
import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -119,6 +126,9 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
private transient Map<Integer, Long> rcvCntrs;
/** */
+ private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
+
+ /** */
private transient IgnitePredicate<CacheContinuousQueryEntry> dupEvtFilter;
/** */
@@ -183,6 +193,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
else {
rcvCntrs = new ConcurrentHashMap<>();
+ rcvs = new ConcurrentHashMap<>();
+
dupEvtFilter = new DuplicateEventFilter();
}
@@ -258,6 +270,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
GridCacheContext<K, V> cctx = cacheContext(ctx);
+ // Nothing to do if the cache has already been stopped.
+ if (cctx == null)
+ return;
+
// skipPrimaryCheck is set only when listen locally for replicated cache events.
assert !skipPrimaryCheck || (cctx.isReplicated() && ctx.localNodeId().equals(nodeId));
@@ -272,27 +288,78 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
}
- if (notify) {
- try {
- final CacheContinuousQueryEntry entry = evt.entry();
+ try {
+ final CacheContinuousQueryEntry entry = notify ? evt.entry() :
+ new CacheContinuousQueryFilteredEntry(evt.entry());
- if (primary || skipPrimaryCheck) {
- if (loc) {
- if (dupEvtFilter.apply(entry)) {
- locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
+ if (primary || skipPrimaryCheck) {
+ if (loc) {
+ if (dupEvtFilter.apply(entry)) {
+ locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt));
- if (!skipPrimaryCheck)
- sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
- }
+ if (!skipPrimaryCheck)
+ sendBackupAcknowledge(ackBuf.onAcknowledged(entry), routineId, ctx);
}
- else {
- prepareEntry(cctx, nodeId, entry);
+ }
+ else {
+ prepareEntry(cctx, nodeId, entry);
- ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
- }
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
- else
- backupQueue.add(entry);
+ }
+ else
+ backupQueue.add(entry);
+ }
+ catch (ClusterTopologyCheckedException ex) {
+ IgniteLogger log = ctx.log(getClass());
+
+ if (log.isDebugEnabled())
+ log.debug("Failed to send event notification to node, node left cluster " +
+ "[node=" + nodeId + ", err=" + ex + ']');
+ }
+ catch (IgniteCheckedException ex) {
+ U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
+ }
+
+ if (recordIgniteEvt && notify) {
+ ctx.event().record(new CacheQueryReadEvent<>(
+ ctx.discovery().localNode(),
+ "Continuous query executed.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.CONTINUOUS.name(),
+ cacheName,
+ null,
+ null,
+ null,
+ rmtFilter,
+ null,
+ nodeId,
+ taskName(),
+ evt.getKey(),
+ evt.getValue(),
+ evt.getOldValue(),
+ null
+ ));
+ }
+ }
+
+ @Override public void partitionLost(String cacheName0, int partId) {
+ GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+ // Nothing to do if the cache has already been stopped.
+ if (cctx == null)
+ return;
+
+ if ((cacheName == null && cacheName0 == null) || // Check default cache.
+ (cacheName0 != null && cacheName != null && cacheName0.equals(cacheName))) {
+
+ final CacheContinuousQueryEntry entry =
+ new CacheContinuousQueryLostPartition(cctx.cacheId(), partId);
+
+ try {
+ prepareEntry(cctx, nodeId, entry);
+
+ ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true);
}
catch (ClusterTopologyCheckedException ex) {
IgniteLogger log = ctx.log(getClass());
@@ -304,27 +371,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
catch (IgniteCheckedException ex) {
U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, ex);
}
-
- if (recordIgniteEvt) {
- ctx.event().record(new CacheQueryReadEvent<>(
- ctx.discovery().localNode(),
- "Continuous query executed.",
- EVT_CACHE_QUERY_OBJECT_READ,
- CacheQueryType.CONTINUOUS.name(),
- cacheName,
- null,
- null,
- null,
- rmtFilter,
- null,
- nodeId,
- taskName(),
- evt.getKey(),
- evt.getValue(),
- evt.getOldValue(),
- null
- ));
- }
}
}
@@ -476,13 +522,47 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
- Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries,
+ Map<Integer, PartitionRecovery> parts = new HashMap<>();
+
+ for (CacheContinuousQueryEntry e : entries) {
+ PartitionRecovery rec = parts.containsKey(e.partition()) ?
+ parts.get(e.partition()) : rcvs.get(e.partition());
+
+ if (rec == null) {
+ rec = new PartitionRecovery(ctx.log(getClass()));
+
+ PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec);
+
+ if (oldRec != null)
+ rec = oldRec;
+ }
+
+ if (e instanceof CacheContinuousQueryLostPartition)
+ rec.reset();
+ else {
+ rec.add(e);
+
+ if (!parts.containsKey(e.partition()))
+ parts.put(e.partition(), rec);
+ }
+ }
+
+ Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+
+ for (PartitionRecovery rec : parts.values())
+ entries0.addAll(rec.entries());
+
+ Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends V>>() {
@Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry e) {
return new CacheContinuousQueryEvent<>(cache, cctx, e);
}
},
- dupEvtFilter
+ new IgnitePredicate<CacheContinuousQueryEntry>() {
+ @Override public boolean apply(CacheContinuousQueryEntry entry) {
+ return !entry.filtered();
+ }
+ }
);
locLsnr.onUpdated(evts);
@@ -500,12 +580,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
if (cntr != null) {
long cntr0 = cntr;
- if (e.updateIndex() > cntr0) {
- // TODO IGNITE-426: remove assert.
- assert e.updateIndex() == cntr0 + 1 : "Invalid entry [cntr=" + cntr + ", e=" + e + ']';
-
+ if (e.updateIndex() > cntr0)
rcvCntrs.put(part, e.updateIndex());
- }
else
return false;
}
@@ -515,6 +591,119 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
return true;
}
+    /**
+     * Per-partition buffer that orders received continuous query entries by update index
+     * and releases them to the listener in sequence.
+     * <p>
+     * All state is guarded by the monitor of {@link #pendingEnts}.
+     */
+    private static class PartitionRecovery {
+        /** Logger. */
+        private final IgniteLogger log;
+
+        /** Update index of the last released entry, {@code 0} if nothing was released since creation or reset. */
+        private long lastFiredEvt = 0;
+
+        /** Entries waiting to be released, sorted by update index. Every key is greater than {@link #lastFiredEvt}. */
+        private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>();
+
+        /**
+         * @param log Logger.
+         */
+        public PartitionRecovery(IgniteLogger log) {
+            this.log = log;
+        }
+
+        /**
+         * Adds a continuous query entry to the pending set.
+         * <p>
+         * An entry is ignored as a duplicate if its update index was already released
+         * or if an entry with the same update index is already pending.
+         *
+         * @param e Cache continuous query entry.
+         */
+        public void add(CacheContinuousQueryEntry e) {
+            synchronized (pendingEnts) {
+                long idx = e.updateIndex();
+
+                if (idx <= lastFiredEvt || pendingEnts.containsKey(idx)) {
+                    if (log.isDebugEnabled())
+                        log.debug("Skip duplicate continuous query entry. Entry: " + e);
+
+                    return;
+                }
+
+                pendingEnts.put(idx, e);
+            }
+        }
+
+        /**
+         * Removes and returns the pending entries that can be released now, in update index order.
+         * <p>
+         * Entries that directly continue the released sequence are returned first. If the first pending
+         * entry does not continue the sequence (a hole), a run of consecutive pending entries is released
+         * instead: the whole run when it reaches the end of the pending set, otherwise all but its last
+         * entry, which is released by a later call. A single pending entry behind a hole is kept.
+         *
+         * @return Ordered continuous query entries.
+         */
+        public Collection<CacheContinuousQueryEntry> entries() {
+            List<CacheContinuousQueryEntry> entries = new ArrayList<>();
+
+            synchronized (pendingEnts) {
+                if (pendingEnts.isEmpty())
+                    return Collections.emptyList();
+
+                Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator();
+
+                // Key and value of the previously visited entry. They are copied out because
+                // the behavior of a Map.Entry is undefined once its mapping has been removed.
+                Long prevKey = null;
+                CacheContinuousQueryEntry prevVal = null;
+
+                // Keys released through the hole branch; removed after the loop because
+                // the iterator can only remove the entry it returned last.
+                Set<Long> rmvEnts = new HashSet<>();
+
+                while (iter.hasNext()) {
+                    Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
+
+                    long key = e.getKey();
+                    CacheContinuousQueryEntry val = e.getValue();
+
+                    // The entry directly continues the released sequence.
+                    if (key == lastFiredEvt + 1) {
+                        lastFiredEvt = key;
+
+                        entries.add(val);
+
+                        iter.remove();
+                    }
+                    // Handle hole in sequence: the previous entry is followed by its direct successor.
+                    else if (prevKey != null && prevKey + 1 == key) {
+                        entries.add(prevVal);
+
+                        lastFiredEvt = prevKey;
+
+                        rmvEnts.add(prevKey);
+
+                        // The run reaches the end of the pending set: release its last entry as well.
+                        if (!iter.hasNext()) {
+                            entries.add(val);
+
+                            lastFiredEvt = key;
+
+                            rmvEnts.add(key);
+                        }
+                    }
+                    // Gap after the previous entry: stop, the remaining entries stay pending.
+                    else if (prevKey != null)
+                        break;
+
+                    prevKey = key;
+                    prevVal = val;
+                }
+
+                for (Long rmKey : rmvEnts)
+                    pendingEnts.remove(rmKey);
+            }
+
+            return entries;
+        }
+
+        /**
+         * Resets internal state: drops all pending entries and restarts the sequence from zero.
+         */
+        public void reset() {
+            synchronized (pendingEnts) {
+                // Every pending key is greater than lastFiredEvt (see add() and entries()),
+                // so dropping the keys that are >= lastFiredEvt means dropping all of them.
+                pendingEnts.clear();
+
+                lastFiredEvt = 0;
+            }
+        }
+    }
+
/** {@inheritDoc} */
@Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException {
assert ctx != null;
@@ -546,6 +735,23 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
sendBackupAcknowledge(ackBuf.onAcknowledged(batch), routineId, ctx);
}
+    /** {@inheritDoc} */
+    @Override public void partitionLost(String cacheName, int partId) {
+        // Null-safe comparison: a null name on both sides denotes the default cache.
+        boolean sameCache = cacheName == null ? this.cacheName == null : cacheName.equals(this.cacheName);
+
+        if (!sameCache)
+            return;
+
+        // NOTE(review): rcvs is assigned in only one branch visible in this patch; confirm it cannot be null here.
+        PartitionRecovery rcv = rcvs.get(partId);
+
+        // Recovery state exists only for partitions that already delivered entries to this handler.
+        if (rcv != null)
+            rcv.reset();
+    }
+
/**
* @param t Acknowledge information.
* @param routineId Routine ID.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 2f9e111..735e808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -55,6 +55,14 @@ interface CacheContinuousQueryListener<K, V> {
public void cleanupBackupQueue(Map<Integer, Long> updateIdxs);
/**
+ * Notifies the listener that a partition has been lost.
+ *
+ * @param cacheName Cache name.
+ * @param partId Partition ID.
+ */
+ public void partitionLost(String cacheName, int partId);
+
+ /**
* Flushes backup queue.
*
* @param ctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
new file mode 100644
index 0000000..734d072
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartition.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.nio.ByteBuffer;
+import javax.cache.event.EventType;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Marker continuous query entry that reports a lost cache partition.
+ * <p>
+ * Only the cache ID and the partition ID are written by {@link #writeTo} and restored by {@link #readFrom};
+ * marshalling hooks are no-ops.
+ */
+public class CacheContinuousQueryLostPartition extends CacheContinuousQueryEntry {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache ID. */
+ private int cacheId;
+
+ /** Partition. */
+ private int part;
+
+ /**
+ * Required by {@link Message}.
+ */
+ public CacheContinuousQueryLostPartition() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param part Partition ID.
+ */
+ CacheContinuousQueryLostPartition(int cacheId, int part) {
+ this.cacheId = cacheId;
+ this.part = part;
+ }
+
+ /**
+ * @return Cache ID.
+ */
+ int cacheId() {
+ return cacheId;
+ }
+
+ /**
+ * @return Partition.
+ */
+ int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ // NOTE(review): presumably this type code must match the message factory registration - confirm.
+ return 116;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ // Continue from the field recorded in the writer state.
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("cacheId", cacheId))
+ return false;
+
+ writer.incrementState();
+
+ // Intentional fall-through to the next field.
+ case 1:
+ if (!writer.writeInt("part", part))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ // Continue from the field recorded in the reader state; fields are read in the order they are written.
+ switch (reader.state()) {
+ case 0:
+ cacheId = reader.readInt("cacheId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ // Intentional fall-through to the next field.
+ case 1:
+ part = reader.readInt("part");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(CacheContinuousQueryLostPartition.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ // Two fields are written by writeTo(): cacheId and part.
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryLostPartition.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c9fb656..35c6696 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -46,6 +46,10 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.events.CacheRebalancingEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -65,6 +69,7 @@ import static javax.cache.event.EventType.EXPIRED;
import static javax.cache.event.EventType.REMOVED;
import static javax.cache.event.EventType.UPDATED;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
/**
@@ -132,6 +137,20 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
lsnr.acknowledgeBackupOnTimeout(cctx.kernalContext());
}
}, BACKUP_ACK_FREQ, BACKUP_ACK_FREQ);
+
+ cctx.kernalContext().event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ assert evt instanceof CacheRebalancingEvent;
+
+ CacheRebalancingEvent evt0 = (CacheRebalancingEvent)evt;
+
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.partitionLost(evt0.cacheName(), evt0.partition());
+ }
+ }, EVT_CACHE_REBALANCE_PART_DATA_LOST);
}
/** {@inheritDoc} */
@@ -666,7 +685,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create();
if (!(fltr instanceof Serializable))
- throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr);
+ throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
+ + fltr);
}
CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
index 2fef161..67b8c82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -1,7 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.continuous;
+import java.util.Collection;
+
/**
- * Created by Nikolay on 02.09.2015.
+ * Continuous routine batch.
+ * <p>
+ * Elements are appended with {@link #add(Object)}, read back with {@link #collect()}
+ * and counted with {@link #size()}.
*/
public interface GridContinuousBatch {
+ /**
+ * Adds element to this batch.
+ *
+ * @param obj Element to add.
+ */
+ public void add(Object obj);
+
+ /**
+ * Collects elements that are currently in this batch.
+ *
+ * @return Elements in this batch.
+ */
+ public Collection<Object> collect();
+
+ /**
+ * Returns the current size of this batch.
+ *
+ * @return Current batch size.
+ */
+ public int size();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
index 8e29e29..4540de1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -1,7 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.ignite.internal.processors.continuous;
+import java.util.Collection;
+import org.jsr166.ConcurrentLinkedDeque8;
+
/**
- * Created by Nikolay on 02.09.2015.
+ * Continuous routine batch adapter.
+ * <p>
+ * Stores the batch elements in a {@link ConcurrentLinkedDeque8}.
*/
-public class GridContinuousBatchAdapter {
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+ /** Buffer. */
+ private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+
+ /** {@inheritDoc} */
+ @Override public void add(Object obj) {
+ // Null elements are rejected only when assertions are enabled.
+ assert obj != null;
+
+ buf.add(obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Object> collect() {
+ // Returns the backing buffer itself, not a copy.
+ return buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ // NOTE(review): sizex() is presumably the constant-time size of ConcurrentLinkedDeque8 - confirm.
+ return buf.sizex();
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 30e596a..975cd2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -98,6 +98,28 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException;
/**
+ * Creates new batch.
+ *
+ * @return New batch.
+ */
+ public GridContinuousBatch createBatch();
+
+ /**
+ * Called when ack for a batch is received from client.
+ *
+ * @param routineId Routine ID.
+ * @param batch Acknowledged batch.
+ * @param ctx Kernal context.
+ */
+ public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
+
+ /**
+ * Notifies the handler that a cache partition has been lost.
+ *
+ * @param cacheName Cache name.
+ * @param partId Partition ID.
+ */
+ public void partitionLost(String cacheName, int partId);
+
+ /**
* @return Topic for ordered notifications. If {@code null}, notifications
* will be sent in non-ordered messages.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 15c9dd2..c7676d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,7 +35,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
@@ -57,12 +59,14 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
@@ -72,6 +76,7 @@ import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import org.jsr166.ConcurrentLinkedDeque8;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index 1fef4d5..26d1f91 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -507,7 +507,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean conflictResolve,
boolean intercept,
UUID subjId,
- String taskName) throws IgniteCheckedException,
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateIdx) throws IgniteCheckedException,
GridCacheEntryRemovedException {
assert false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index ed856a5..3bba5e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -25,23 +25,28 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.query.ContinuousQuery;
@@ -59,12 +64,17 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.typedef.C1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -254,6 +264,302 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
/**
* @throws Exception If failed.
*/
+    public void testLeftPrimaryAndBackupNodes() throws Exception {
+        // Scenario: a continuous query registered from a client node (with a remote filter) is fed
+        // updates, then every node that owns the updated partition is stopped and more updates are
+        // made through the client. Only non-filtered updates are expected to reach the listener.
+        this.backups = 1;
+
+        final int SRV_NODES = 3;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        final Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        // The same object acts as local listener and as remote filter.
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        IgniteCache<Object, Object> clnCache = qryClient.cache(null);
+
+        QueryCursor<Cache.Entry<Object, Object>> query = clnCache.query(qry);
+
+        Ignite igniteSrv = ignite(0);
+
+        IgniteCache<Object, Object> srvCache = igniteSrv.cache(null);
+
+        Affinity<Object> aff = affinity(srvCache);
+
+        List<Integer> keys = testKeys(srvCache, 1);
+
+        // testKeys() returns cache keys, not partition numbers: resolve the partition of the first
+        // key before asking affinity for its owners, otherwise unrelated nodes may be stopped.
+        Collection<ClusterNode> nodes = aff.mapPartitionToPrimaryAndBackups(aff.partition(keys.get(0)));
+
+        Collection<UUID> ids = F.transform(nodes, new C1<ClusterNode, UUID>() {
+            @Override public UUID apply(ClusterNode node) {
+                return node.id();
+            }
+        });
+
+        int keyIter = 0;
+
+        boolean filtered = false;
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        // First half of the keys: updated through a server node while all nodes are alive.
+        for (; keyIter < keys.size() / 2; keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", part=" + aff.partition(key)
+                + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            // Odd values are rejected by TestLocalListener.evaluate(), even values pass.
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            srvCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        // Give the events time to reach the listener before verifying them.
+        awaitEvents(expEvts, lsnr);
+
+        checkEvents(expEvts, lsnr);
+
+        List<Thread> stopThreads = new ArrayList<>(3);
+
+        // Stop the nodes that own this partition.
+        for (int i = 0; i < SRV_NODES; i++) {
+            Ignite ignite = ignite(i);
+
+            if (ids.contains(ignite.cluster().localNode().id())) {
+                final int i0 = i;
+
+                TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+                // Make the node drop every outgoing message before it is stopped.
+                spi.skipAllMsg = true;
+
+                stopThreads.add(new Thread() {
+                    @Override public void run() {
+                        stopGrid(i0, true);
+                    }
+                });
+            }
+        }
+
+        // Stop and join threads.
+        for (Thread t : stopThreads)
+            t.start();
+
+        for (Thread t : stopThreads)
+            t.join();
+
+        assert GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                // (SRV_NODES + 1 client node) - 1 primary - backup nodes.
+                return qryClient.cluster().nodes().size() == (SRV_NODES + 1 /** client node */)
+                    - 1 /** Primary node */ - backups;
+            }
+        }, 10000L);
+
+        // Second half of the keys: updated through the client after the owners have left.
+        for (; keyIter < keys.size(); keyIter++) {
+            int key = keys.get(keyIter);
+
+            log.info("Put [key=" + key + ", filtered=" + filtered + ']');
+
+            T2<Object, Object> t = updates.get(key);
+
+            Integer val = filtered ?
+                (key % 2 == 0 ? key + 1 : key) :
+                key * 2;
+
+            if (t == null) {
+                updates.put(key, new T2<>((Object)val, null));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, null));
+            }
+            else {
+                updates.put(key, new T2<>((Object)val, (Object)key));
+
+                if (!filtered)
+                    expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+            }
+
+            clnCache.put(key, val);
+
+            filtered = !filtered;
+        }
+
+        awaitEvents(expEvts, lsnr);
+
+        checkEvents(expEvts, lsnr);
+
+        query.close();
+    }
+
+    /**
+     * Waits (bounded) until the listener holds as many events as are expected. The outcome is not
+     * checked here on purpose: the subsequent {@code checkEvents} call performs the assertions.
+     *
+     * @param expEvts Expected events.
+     * @param lsnr Listener.
+     * @throws Exception If waiting failed.
+     */
+    private void awaitEvents(final List<T3<Object, Object, Object>> expEvts, final TestLocalListener lsnr)
+        throws Exception {
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return lsnr.evts.size() == expEvts.size();
+            }
+        }, 5000L);
+    }
+
+    /**
+     * Checks that a continuous query started from a client node with a remote filter receives every
+     * non-filtered update while server nodes are stopped one after another. On each iteration the
+     * keys returned by {@code testKeys} for the current node are updated, the node's communication
+     * SPI is switched to skipping continuous messages after the first put, and the node is stopped.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRemoteFilter() throws Exception {
+        this.backups = 2;
+
+        final int SRV_NODES = 4;
+
+        startGridsMultiThreaded(SRV_NODES);
+
+        client = true;
+
+        Ignite qryClient = startGrid(SRV_NODES);
+
+        client = false;
+
+        IgniteCache<Object, Object> qryClientCache = qryClient.cache(null);
+
+        if (cacheMode() != REPLICATED)
+            assertEquals(backups, qryClientCache.getConfiguration(CacheConfiguration.class).getBackups());
+
+        Affinity<Object> aff = qryClient.affinity(null);
+
+        ContinuousQuery<Object, Object> qry = new ContinuousQuery<>();
+
+        final TestLocalListener lsnr = new TestLocalListener();
+
+        // The same object acts as local listener and as remote filter.
+        qry.setLocalListener(lsnr);
+
+        qry.setRemoteFilter(lsnr);
+
+        int PARTS = 10;
+
+        QueryCursor<?> cur = qryClientCache.query(qry);
+
+        Map<Object, T2<Object, Object>> updates = new HashMap<>();
+
+        final List<T3<Object, Object, Object>> expEvts = new ArrayList<>();
+
+        // The last server node is never stopped.
+        for (int i = 0; i < SRV_NODES - 1; i++) {
+            log.info("Stop iteration: " + i);
+
+            Ignite ignite = ignite(i);
+
+            TestCommunicationSpi spi = (TestCommunicationSpi)ignite.configuration().getCommunicationSpi();
+
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            List<Integer> keys = testKeys(cache, PARTS);
+
+            boolean first = true;
+
+            boolean filtered = false;
+
+            for (Integer key : keys) {
+                log.info("Put [node=" + ignite.name() + ", key=" + key + ", part=" + aff.partition(key)
+                    + ", filtered=" + filtered + ']');
+
+                T2<Object, Object> t = updates.get(key);
+
+                // Odd values are rejected by TestLocalListener.evaluate(), even values pass.
+                Integer val = filtered ?
+                    (key % 2 == 0 ? key + 1 : key) :
+                    key * 2;
+
+                if (t == null) {
+                    updates.put(key, new T2<>((Object)val, null));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, null));
+                }
+                else {
+                    updates.put(key, new T2<>((Object)val, (Object)key));
+
+                    if (!filtered)
+                        expEvts.add(new T3<>((Object)key, (Object)val, (Object)key));
+                }
+
+                cache.put(key, val);
+
+                // Let the first update through, then start skipping continuous messages.
+                if (first) {
+                    spi.skipMsg = true;
+
+                    first = false;
+                }
+
+                filtered = !filtered;
+            }
+
+            stopGrid(i);
+
+            boolean check = GridTestUtils.waitForCondition(new PAX() {
+                @Override public boolean applyx() throws IgniteCheckedException {
+                    return expEvts.size() == lsnr.keys.size();
+                }
+            }, 5000L);
+
+            if (!check) {
+                // Only non-filtered updates are expected to produce events, so the set of missed
+                // keys is derived from the expected events rather than from all updated keys.
+                Set<Object> keys0 = new HashSet<>();
+
+                for (T3<Object, Object, Object> exp : expEvts)
+                    keys0.add(exp.get1());
+
+                keys0.removeAll(lsnr.keys);
+
+                log.info("Missed events for keys: " + keys0);
+
+                fail("Failed to wait for notifications [exp=" + expEvts.size() + ", left=" + keys0.size() + ']');
+            }
+
+            checkEvents(expEvts, lsnr);
+        }
+
+        cur.close();
+    }
+
+    /**
+     * Test helper used both as the local listener and as the remote filter of a continuous query.
+     * Every received event is remembered by its key; the filter accepts only events whose integer
+     * value is even.
+     */
+    public static class TestLocalListener implements CacheEntryUpdatedListener<Object, Object>,
+        CacheEntryEventSerializableFilter<Object, Object> {
+        /** Keys of the events received so far. */
+        GridConcurrentHashSet<Integer> keys = new GridConcurrentHashSet<>();
+
+        /** Last received event per key. */
+        private final ConcurrentHashMap<Object, CacheEntryEvent<?, ?>> evts = new ConcurrentHashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> events) throws CacheEntryListenerException {
+            for (CacheEntryEvent<?, ?> evt : events) {
+                System.err.println("Update entry: " + evt);
+
+                Integer evtKey = (Integer)evt.getKey();
+
+                // Record the key first, then the event itself.
+                keys.add(evtKey);
+
+                evts.put(evtKey, evt);
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> e) throws CacheEntryListenerException {
+            Integer val = (Integer)e.getValue();
+
+            // Pass only even values.
+            return val % 2 == 0;
+        }
+    }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testThreeBackups() throws Exception {
if (cacheMode() == REPLICATED)
return;
@@ -261,6 +567,11 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
checkBackupQueue(3, false);
}
+ /** {@inheritDoc} */
+ @Override public boolean isDebug() {
+ // NOTE(review): unconditionally switches every test of this class into debug mode, which makes
+ // the base test configuration use its debug (effectively unlimited) discovery/network timeouts.
+ // Looks like a work-in-progress debugging aid - confirm it is meant to stay.
+ return true;
+ }
+
/**
* @param backups Number of backups.
* @param updateFromClient If {@code true} executes cache update from client node.
@@ -423,7 +734,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
assertNotNull("No event for key: " + exp.get1(), e);
assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
- assertEquals("Unexpected old value: " + e, exp.get3(), e.getOldValue());
}
expEvts.clear();
@@ -432,6 +742,26 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
}
/**
+ * @param expEvts Expected events.
+ * @param lsnr Listener.
+ */
+    private void checkEvents(List<T3<Object, Object, Object>> expEvts, TestLocalListener lsnr) {
+        // Use a JUnit assertion instead of the 'assert' keyword: it is evaluated even when JVM
+        // assertions are disabled and reports both sizes on failure.
+        assertEquals("Unexpected number of events", expEvts.size(), lsnr.evts.size());
+
+        // Each expected tuple is (key, new value, old value); only key and new value are verified.
+        for (T3<Object, Object, Object> exp : expEvts) {
+            CacheEntryEvent<?, ?> e = lsnr.evts.get(exp.get1());
+
+            assertNotNull("No event for key: " + exp.get1(), e);
+            assertEquals("Unexpected value: " + e, exp.get2(), e.getValue());
+        }
+
+        // Reset the expectations and the listener state so the next round starts from scratch.
+        expEvts.clear();
+
+        lsnr.evts.clear();
+        lsnr.keys.clear();
+    }
+
+ /**
* @param cache Cache.
* @param parts Number of partitions.
* @return Keys.
@@ -447,7 +777,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
int[] nodeParts = aff.primaryPartitions(node);
- final int KEYS_PER_PART = 3;
+ final int KEYS_PER_PART = 50;
for (int i = 0; i < parts; i++) {
int part = nodeParts[i];
@@ -919,7 +1249,6 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
assertEquals(key, rcvdEvt.getKey());
assertEquals(expEvt.get1(), rcvdEvt.getValue());
- assertEquals(expEvt.get2(), rcvdEvt.getOldValue());
}
}
}
@@ -1012,7 +1341,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
private final ConcurrentHashMap<Integer, List<CacheEntryEvent<?, ?>>> evts = new ConcurrentHashMap<>();
/** {@inheritDoc} */
- @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
+ @Override public synchronized void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts)
throws CacheEntryListenerException {
try {
for (CacheEntryEvent<?, ?> evt : evts) {
@@ -1026,18 +1355,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
boolean dup = false;
- if (prevVal != null) {
- if (prevVal.equals(val)) // Can get this event with automatic put retry.
- dup = true;
- else {
- assertEquals("Unexpected event: " + evt, (Integer)(prevVal + 1), val);
- assertEquals("Unexpected event: " + evt, prevVal, evt.getOldValue());
- }
- }
- else {
- assertEquals("Unexpected event: " + evt, (Object)0, val);
- assertNull("Unexpected event: " + evt, evt.getOldValue());
- }
+ if (prevVal != null && prevVal.equals(val))
+ dup = true;
if (!dup) {
vals.put(key, val);
@@ -1074,6 +1393,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
private volatile boolean skipMsg;
/** */
+ private volatile boolean skipAllMsg;
+
+ /** */
private volatile AtomicBoolean sndFirstOnly;
/** {@inheritDoc} */
@@ -1081,6 +1403,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
throws IgniteSpiException {
Object msg0 = ((GridIoMessage)msg).message();
+ if (skipAllMsg)
+ return;
+
if (msg0 instanceof GridContinuousMessage) {
if (skipMsg) {
log.info("Skip continuous message: " + msg0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1a05948f/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d133a84..503b992 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1232,7 +1232,7 @@ public abstract class GridAbstractTest extends TestCase {
if (isDebug()) {
discoSpi.setMaxMissedHeartbeats(Integer.MAX_VALUE);
- cfg.setNetworkTimeout(Long.MAX_VALUE);
+ cfg.setNetworkTimeout(Long.MAX_VALUE / 3);
}
else {
// Set network timeout to 10 sec to avoid unexpected p2p class loading errors.