You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/11/20 02:49:28 UTC
[18/22] ignite git commit: IGNITE-426 Implemented failover for
Continuous query.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index c7bf091..b2e7490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -23,6 +23,7 @@ import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Map;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -46,12 +47,14 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
@@ -82,6 +85,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/** */
private static final byte EXPIRED_FLAG = 0b1000;
+ /** */
+ private static final long BACKUP_ACK_FREQ = 5000;
+
/** Listeners. */
private final ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrs = new ConcurrentHashMap8<>();
@@ -108,6 +114,18 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
@Override protected void start0() throws IgniteCheckedException {
// Append cache name to the topic.
topicPrefix = "CONTINUOUS_QUERY" + (cctx.name() == null ? "" : "_" + cctx.name());
+
+ cctx.io().addHandler(cctx.cacheId(), CacheContinuousQueryBatchAck.class, // Handle batch acks addressed to this cache.
+ new CI2<UUID, CacheContinuousQueryBatchAck>() {
+ @Override public void apply(UUID uuid, CacheContinuousQueryBatchAck msg) {
+ CacheContinuousQueryListener lsnr = lsnrs.get(msg.routineId());
+
+ if (lsnr != null) // An ack for a routine that is not in lsnrs is ignored.
+ lsnr.cleanupBackupQueue(msg.updateCntrs());
+ }
+ });
+
+ cctx.time().schedule(new BackupCleaner(lsnrs, cctx.kernalContext()), BACKUP_ACK_FREQ, BACKUP_ACK_FREQ); // NOTE(review): presumably (task, delay, period) - confirm against schedule() signature.
}
/** {@inheritDoc} */
@@ -137,25 +155,55 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
- * @param e Cache entry.
+ * @param partId Partition id.
+ * @param updCntr Partition update counter.
+ * @param topVer Topology version.
+ */
+ public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
+ if (lsnrCnt.get() > 0) {
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ UPDATED,
+ key,
+ null, // New value is not carried by a skipped update.
+ null, // Old value is not carried by a skipped update.
+ partId,
+ updCntr,
+ topVer);
+
+ CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+ cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.skipUpdateEvent(evt, topVer);
+ }
+ }
+
+ /**
* @param key Key.
* @param newVal New value.
* @param oldVal Old value.
+ * @param internal Internal entry (internal key or not user cache).
+ * @param primary {@code True} if called on primary node.
* @param preload Whether update happened during preloading.
+ * @param updateCntr Update counter.
+ * @param topVer Topology version.
* @throws IgniteCheckedException In case of error.
*/
- public void onEntryUpdated(GridCacheEntryEx e,
+ public void onEntryUpdated(
KeyCacheObject key,
CacheObject newVal,
CacheObject oldVal,
- boolean preload)
+ boolean internal,
+ int partId,
+ boolean primary,
+ boolean preload,
+ long updateCntr,
+ AffinityTopologyVersion topVer)
throws IgniteCheckedException
{
- assert e != null;
assert key != null;
- boolean internal = e.isInternal() || !e.context().userCache();
-
if (preload && !internal)
return;
@@ -179,8 +227,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
boolean initialized = false;
- boolean primary = cctx.affinity().primary(cctx.localNode(), key, AffinityTopologyVersion.NONE);
- boolean recordIgniteEvt = !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
+ boolean recordIgniteEvt = primary && !internal && cctx.gridEvents().isRecordable(EVT_CACHE_QUERY_OBJECT_READ);
for (CacheContinuousQueryListener lsnr : lsnrCol.values()) {
if (preload && !lsnr.notifyExisting())
@@ -205,7 +252,10 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
evtType,
key,
newVal,
- lsnr.oldValueRequired() ? oldVal : null);
+ lsnr.oldValueRequired() ? oldVal : null,
+ partId,
+ updateCntr,
+ topVer);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -250,12 +300,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
initialized = true;
}
- CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
- cctx.cacheId(),
- EXPIRED,
- key,
- null,
- lsnr.oldValueRequired() ? oldVal : null);
+ CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ EXPIRED,
+ key,
+ null,
+ lsnr.oldValueRequired() ? oldVal : null,
+ e.partition(),
+ -1,
+ null);
CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent(
cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
@@ -373,6 +426,27 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
/**
+ * @param topVer Topology version passed to each listener's backup queue flush.
+ */
+ public void beforeExchange(AffinityTopologyVersion topVer) {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values()) // NOTE(review): only lsnrs is flushed here, intLsnrs is not - confirm this is intended.
+ lsnr.flushBackupQueue(cctx.kernalContext(), topVer);
+ }
+
+ /**
+ * Partition evicted callback: forwards the eviction to every listener in {@code lsnrs} and {@code intLsnrs}.
+ *
+ * @param part Partition number.
+ */
+ public void onPartitionEvicted(int part) {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.onPartitionEvicted(part);
+
+ for (CacheContinuousQueryListener lsnr : intLsnrs.values())
+ lsnr.onPartitionEvicted(part);
+ }
+
+ /**
* @param locLsnr Local listener.
* @param rmtFilter Remote filter.
* @param bufSize Buffer size.
@@ -417,7 +491,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
sync,
ignoreExpired,
taskNameHash,
- skipPrimaryCheck);
+ skipPrimaryCheck,
+ cctx.isLocal());
IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -469,10 +544,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
GridCacheEntryEx e = it.next();
+ CacheContinuousQueryEntry entry = new CacheContinuousQueryEntry(
+ cctx.cacheId(),
+ CREATED,
+ e.key(),
+ e.rawGet(),
+ null,
+ 0,
+ -1,
+ null);
+
next = new CacheContinuousQueryEvent<>(
cctx.kernalContext().cache().jcache(cctx.name()),
- cctx,
- new CacheContinuousQueryEntry(cctx.cacheId(), CREATED, e.key(), e.rawGet(), null));
+ cctx, entry);
if (rmtFilter != null && !rmtFilter.evaluate(next))
next = null;
@@ -590,10 +674,11 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
CacheEntryEventFilter fltr = null;
if (cfg.getCacheEntryEventFilterFactory() != null) {
- fltr = (CacheEntryEventFilter) cfg.getCacheEntryEventFilterFactory().create();
+ fltr = (CacheEntryEventFilter)cfg.getCacheEntryEventFilterFactory().create();
if (!(fltr instanceof Serializable))
- throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: " + fltr);
+ throw new IgniteCheckedException("Cache entry event filter must implement java.io.Serializable: "
+ + fltr);
}
CacheEntryEventSerializableFilter rmtFilter = new JCacheQueryRemoteFilter(fltr, types);
@@ -637,6 +722,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
/**
* @param impl Listener.
+ * @param log Logger.
*/
JCacheQueryLocalListener(CacheEntryListener<K, V> impl, IgniteLogger log) {
assert impl != null;
@@ -789,4 +875,29 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
}
}
}
+
+ /**
+ * Task that asks every listener in the supplied map to acknowledge its backup queue on timeout.
+ */
+ private static final class BackupCleaner implements Runnable {
+ /** Listeners. */
+ private final Map<UUID, CacheContinuousQueryListener> lsnrs;
+
+ /** Context. */
+ private final GridKernalContext ctx;
+
+ /** @param lsnrs Listeners to notify on each run.
+ * @param ctx Kernal context handed to each listener.
+ */
+ public BackupCleaner(Map<UUID, CacheContinuousQueryListener> lsnrs, GridKernalContext ctx) {
+ this.lsnrs = lsnrs;
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ lsnr.acknowledgeBackupOnTimeout(ctx);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 23f83be..ff413d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -181,6 +181,10 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
*/
private byte flags;
+ /** Partition update counter. */
+ @GridDirectTransient
+ private long partUpdateCntr;
+
/** */
private GridCacheVersion serReadVer;
@@ -373,6 +377,22 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
}
/**
+ * Sets partition update counter.
+ *
+ * @param partCntr Partition update counter.
+ */
+ public void updateCounter(long partCntr) {
+ this.partUpdateCntr = partCntr;
+ }
+
+ /**
+ * @return Partition update counter.
+ */
+ public long updateCounter() {
+ return partUpdateCntr;
+ }
+
+ /**
* @param val Value to set.
*/
void setAndMarkValid(CacheObject val) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
index a9846ef..63a4cbe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
@@ -964,6 +964,9 @@ public class IgniteTxHandler {
// Complete remote candidates.
tx.doneRemote(req.baseVersion(), null, null, null);
+ tx.setPartitionUpdateCounters(
+ req.partUpdateCounters() != null ? req.partUpdateCounters().array() : null);
+
tx.commit();
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ecb0595..cff62d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -1012,7 +1012,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
+
+ if (updRes.success())
+ txEntry.updateCounter(updRes.updatePartitionCounter());
if (nearCached != null && updRes.success()) {
nearCached.innerSet(
@@ -1032,7 +1036,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
}
}
else if (op == DELETE) {
@@ -1049,7 +1054,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
cached.isNear() ? null : explicitVer,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
+
+ if (updRes.success())
+ txEntry.updateCounter(updRes.updatePartitionCounter());
if (nearCached != null && updRes.success()) {
nearCached.innerRemove(
@@ -1065,7 +1074,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
null,
CU.subjectId(this, cctx),
resolveTaskName(),
- dhtVer);
+ dhtVer,
+ null);
}
}
else if (op == RELOAD) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
index b80909f..8ceca3f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java
@@ -34,4 +34,9 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx {
Collection<GridCacheVersion> committedVers,
Collection<GridCacheVersion> rolledbackVers,
Collection<GridCacheVersion> pendingVers);
-}
\ No newline at end of file
+
+ /**
+ * @param cntrs Partition update counters (the caller in IgniteTxHandler passes {@code null} when the request has none).
+ */
+ public void setPartitionUpdateCounters(long[] cntrs);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
new file mode 100644
index 0000000..67b8c82
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatch.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+
+/**
+ * Continuous routine batch: a container of notification objects that are collected and sent together.
+ */
+public interface GridContinuousBatch {
+ /**
+ * Adds element to this batch.
+ *
+ * @param obj Element to add.
+ */
+ public void add(Object obj);
+
+ /**
+ * Collects elements that are currently in this batch.
+ *
+ * @return Elements in this batch.
+ */
+ public Collection<Object> collect();
+
+ /**
+ * @return Number of elements currently in this batch.
+ */
+ public int size();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
new file mode 100644
index 0000000..4540de1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousBatchAdapter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import java.util.Collection;
+import org.jsr166.ConcurrentLinkedDeque8;
+
+/**
+ * Continuous routine batch adapter: a {@link GridContinuousBatch} implementation backed by a concurrent deque.
+ */
+public class GridContinuousBatchAdapter implements GridContinuousBatch {
+ /** Buffer holding the batch elements. */
+ private final ConcurrentLinkedDeque8<Object> buf = new ConcurrentLinkedDeque8<>();
+
+ /** {@inheritDoc} Element must be non-null (checked by assertion). */
+ @Override public void add(Object obj) {
+ assert obj != null;
+
+ buf.add(obj);
+ }
+
+ /** {@inheritDoc} Returns the internal buffer itself, not a copy. */
+ @Override public Collection<Object> collect() {
+ return buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return buf.sizex(); // NOTE(review): sizex() rather than size() - presumably a cheaper maintained count; confirm in ConcurrentLinkedDeque8.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index 30e596a..d8698b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.continuous;
import java.io.Externalizable;
import java.util.Collection;
+import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridKernalContext;
@@ -98,6 +99,22 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
public void p2pUnmarshal(UUID nodeId, GridKernalContext ctx) throws IgniteCheckedException;
/**
+ * Creates new batch.
+ *
+ * @return New batch.
+ */
+ public GridContinuousBatch createBatch();
+
+ /**
+ * Called when ack for a batch is received from client.
+ *
+ * @param routineId Routine ID.
+ * @param batch Acknowledged batch.
+ * @param ctx Kernal context.
+ */
+ public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx);
+
+ /**
* @return Topic for ordered notifications. If {@code null}, notifications
* will be sent in non-ordered messages.
*/
@@ -129,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable {
* @return Cache name if this is a continuous query handler.
*/
public String cacheName();
+
+ /**
+ * @param cntrs Init state for partition counters.
+ */
+ public void updateCounters(Map<Integer, Long> cntrs);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d1cb3a9..c07cc13 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -52,17 +53,21 @@ import org.apache.ignite.internal.managers.discovery.CustomEventListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
@@ -70,7 +75,6 @@ import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import org.jsr166.ConcurrentLinkedDeque8;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -203,8 +207,38 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
StartFuture fut = startFuts.remove(msg.routineId());
if (fut != null) {
- if (msg.errs().isEmpty())
+ if (msg.errs().isEmpty()) {
+ LocalRoutineInfo routine = locInfos.get(msg.routineId());
+
+ if (routine != null) {
+ try {
+ Map<Integer, Long> cntrs = msg.updateCounters();
+
+ GridCacheAdapter<Object, Object> interCache =
+ ctx.cache().internalCache(routine.handler().cacheName());
+
+ if (interCache != null && cntrs != null && interCache.context() != null
+ && !interCache.isLocal() && !CU.clientNode(ctx.grid().localNode())) {
+ Map<Integer, Long> map = interCache.context().topology().updateCounters();
+
+ for (Map.Entry<Integer, Long> e : map.entrySet()) {
+ Long cntr0 = cntrs.get(e.getKey());
+ Long cntr1 = e.getValue();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ cntrs.put(e.getKey(), cntr1);
+ }
+ }
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to load update counters.", e);
+ }
+
+ routine.handler().updateCounters(msg.updateCounters());
+ }
+
fut.onRemoteRegistered();
+ }
else {
IgniteCheckedException firstEx = F.first(msg.errs().values());
@@ -651,6 +685,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
* @param nodeId ID of the node that started routine.
* @param routineId Routine ID.
+ * @param objs Notification objects; they are added to the routine's current batch, which is then sent immediately.
+ * @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void addBackupNotification(UUID nodeId,
+ final UUID routineId,
+ Collection<?> objs,
+ @Nullable Object orderedTopic)
+ throws IgniteCheckedException {
+ if (processorStopped)
+ return;
+
+ final RemoteRoutineInfo info = rmtInfos.get(routineId);
+
+ if (info != null) { // Nothing is sent for a routine that is not in rmtInfos.
+ final GridContinuousBatch batch = info.addAll(objs);
+
+ sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, true, null); // No future ID, data flagged as messages, no ack closure.
+ }
+ }
+
+ /**
+ * @param nodeId ID of the node that started routine.
+ * @param routineId Routine ID.
* @param obj Notification object.
* @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
* @param sync If {@code true} then waits for event acknowledgment.
@@ -658,7 +716,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException In case of error.
*/
public void addNotification(UUID nodeId,
- UUID routineId,
+ final UUID routineId,
@Nullable Object obj,
@Nullable Object orderedTopic,
boolean sync,
@@ -673,7 +731,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (processorStopped)
return;
- RemoteRoutineInfo info = rmtInfos.get(routineId);
+ final RemoteRoutineInfo info = rmtInfos.get(routineId);
if (info != null) {
assert info.interval == 0 || !sync;
@@ -686,7 +744,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
syncMsgFuts.put(futId, fut);
try {
- sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg);
+ sendNotification(nodeId, routineId, futId, F.asList(obj), orderedTopic, msg, null);
}
catch (IgniteCheckedException e) {
syncMsgFuts.remove(futId);
@@ -697,10 +755,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
fut.get();
}
else {
- Collection<Object> toSnd = info.add(obj);
+ final GridContinuousBatch batch = info.add(obj);
+
+ if (batch != null) {
+ CI1<IgniteException> ackC = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ if (e == null)
+ info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+ }
+ };
- if (toSnd != null)
- sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+ sendNotification(nodeId, routineId, null, batch.collect(), orderedTopic, msg, ackC);
+ }
}
}
}
@@ -725,6 +791,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
* @param msg If {@code true} then sent data is collection of messages.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
private void sendNotification(UUID nodeId,
@@ -732,7 +799,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Nullable IgniteUuid futId,
Collection<Object> toSnd,
@Nullable Object orderedTopic,
- boolean msg) throws IgniteCheckedException {
+ boolean msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert nodeId != null;
assert routineId != null;
assert toSnd != null;
@@ -740,7 +808,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
sendWithRetries(nodeId,
new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
- orderedTopic);
+ orderedTopic,
+ ackC);
}
/**
@@ -819,6 +888,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
+ try {
+ if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) {
+ Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName())
+ .context().topology().updateCounters();
+
+ req.addUpdateCounters(cntrs);
+ }
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to load partition counters.");
+ }
+
if (err != null)
req.addError(ctx.localNodeId(), err);
@@ -859,6 +940,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
sendWithRetries(nodeId,
new GridContinuousMessage(MSG_EVT_ACK, null, msg.futureId(), null, false),
+ null,
null);
}
catch (IgniteCheckedException e) {
@@ -922,15 +1004,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
break;
}
- IgniteBiTuple<Collection<Object>, Long> t = info.checkInterval();
+ IgniteBiTuple<GridContinuousBatch, Long> t = info.checkInterval();
- Collection<Object> toSnd = t.get1();
+ final GridContinuousBatch batch = t.get1();
- if (toSnd != null && !toSnd.isEmpty()) {
+ if (batch != null && batch.size() > 0) {
try {
+ Collection<Object> toSnd = batch.collect();
+
boolean msg = toSnd.iterator().next() instanceof Message;
- sendNotification(nodeId, routineId, null, toSnd, hnd.orderedTopic(), msg);
+ CI1<IgniteException> ackC = new CI1<IgniteException>() {
+ @Override public void apply(IgniteException e) {
+ if (e == null)
+ info.hnd.onBatchAcknowledged(routineId, batch, ctx);
+ }
+ };
+
+ sendNotification(nodeId,
+ routineId,
+ null,
+ toSnd,
+ hnd.orderedTopic(),
+ msg,
+ ackC);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
@@ -1013,9 +1110,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
- private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic)
+ private void sendWithRetries(UUID nodeId, GridContinuousMessage msg, @Nullable Object orderedTopic,
+ IgniteInClosure<IgniteException> ackC)
throws IgniteCheckedException {
assert nodeId != null;
assert msg != null;
@@ -1023,7 +1122,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null)
- sendWithRetries(node, msg, orderedTopic);
+ sendWithRetries(node, msg, orderedTopic, ackC);
else
throw new ClusterTopologyCheckedException("Node for provided ID doesn't exist (did it leave the grid?): " + nodeId);
}
@@ -1033,14 +1132,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
- private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic)
- throws IgniteCheckedException {
+ private void sendWithRetries(ClusterNode node, GridContinuousMessage msg, @Nullable Object orderedTopic,
+ IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert node != null;
assert msg != null;
- sendWithRetries(F.asList(node), msg, orderedTopic);
+ sendWithRetries(F.asList(node), msg, orderedTopic, ackC);
}
/**
@@ -1048,10 +1148,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param msg Message.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param ackC Ack closure.
* @throws IgniteCheckedException In case of error.
*/
private void sendWithRetries(Collection<? extends ClusterNode> nodes, GridContinuousMessage msg,
- @Nullable Object orderedTopic) throws IgniteCheckedException {
+ @Nullable Object orderedTopic, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException {
assert !F.isEmpty(nodes);
assert msg != null;
@@ -1074,10 +1175,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
msg,
SYSTEM_POOL,
0,
- true);
+ true,
+ ackC);
}
else
- ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL);
+ ctx.io().send(node, TOPIC_CONTINUOUS, msg, SYSTEM_POOL, ackC);
break;
}
@@ -1178,8 +1280,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Lock. */
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- /** Buffer. */
- private ConcurrentLinkedDeque8<Object> buf;
+ /** Batch. */
+ private GridContinuousBatch batch;
/** Last send time. */
private long lastSndTime = U.currentTimeMillis();
@@ -1210,7 +1312,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.interval = interval;
this.autoUnsubscribe = autoUnsubscribe;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
}
/**
@@ -1238,21 +1340,53 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param objs Objects to add.
+ * @return Batch to send.
+ */
+ GridContinuousBatch addAll(Collection<?> objs) {
+ assert objs != null;
+ assert objs.size() > 0;
+
+ GridContinuousBatch toSnd = null;
+
+ lock.writeLock().lock();
+
+ try {
+ for (Object obj : objs)
+ batch.add(obj);
+
+ toSnd = batch;
+
+ batch = hnd.createBatch();
+
+ if (interval > 0)
+ lastSndTime = U.currentTimeMillis();
+ }
+ finally {
+ lock.writeLock().unlock();
+ }
+
+ return toSnd;
+ }
+
+ /**
* @param obj Object to add.
- * @return Object to send or {@code null} if there is nothing to send for now.
+ * @return Batch to send or {@code null} if there is nothing to send for now.
*/
- @Nullable Collection<Object> add(@Nullable Object obj) {
- ConcurrentLinkedDeque8 buf0 = null;
+ @Nullable GridContinuousBatch add(Object obj) {
+ assert obj != null;
+
+ GridContinuousBatch toSnd = null;
- if (buf.sizex() >= bufSize - 1) {
+ if (batch.size() >= bufSize - 1) {
lock.writeLock().lock();
try {
- buf.add(obj);
+ batch.add(obj);
- buf0 = buf;
+ toSnd = batch;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
if (interval > 0)
lastSndTime = U.currentTimeMillis();
@@ -1265,34 +1399,25 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
lock.readLock().lock();
try {
- buf.add(obj);
+ batch.add(obj);
}
finally {
lock.readLock().unlock();
}
}
- Collection<Object> toSnd = null;
-
- if (buf0 != null) {
- toSnd = new ArrayList<>(buf0.sizex());
-
- for (Object o : buf0)
- toSnd.add(o);
- }
-
return toSnd;
}
/**
- * @return Tuple with objects to sleep (or {@code null} if there is nothing to
+ * @return Tuple with batch to send (or {@code null} if there is nothing to
* send for now) and time interval after next check is needed.
*/
@SuppressWarnings("TooBroadScope")
- IgniteBiTuple<Collection<Object>, Long> checkInterval() {
+ IgniteBiTuple<GridContinuousBatch, Long> checkInterval() {
assert interval > 0;
- Collection<Object> toSnd = null;
+ GridContinuousBatch toSnd = null;
long diff;
long now = U.currentTimeMillis();
@@ -1302,10 +1427,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
diff = now - lastSndTime;
- if (diff >= interval && !buf.isEmpty()) {
- toSnd = buf;
+ if (diff >= interval && batch.size() > 0) {
+ toSnd = batch;
- buf = new ConcurrentLinkedDeque8<>();
+ batch = hnd.createBatch();
lastSndTime = now;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
index bd4aae3..9644372 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
/** */
private final Map<UUID, IgniteCheckedException> errs;
+ /** Update counters for partitions (stored as passed to the constructor, may be {@code null}). */
+ private final Map<Integer, Long> updateCntrs;
+
/**
* @param routineId Routine id.
* @param errs Errs.
*/
- public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+ public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs,
+ Map<Integer, Long> cntrs) {
super(routineId);
this.errs = new HashMap<>(errs);
+ this.updateCntrs = cntrs;
}
/** {@inheritDoc} */
@@ -51,6 +56,13 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage {
}
/**
+ * @return Update counters for partitions.
+ */
+ public Map<Integer, Long> updateCounters() {
+ return updateCntrs;
+ }
+
+ /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 892adac..82c0377 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** */
private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
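+ /** Update counters (largest value added per key); {@code null} until {@code addUpdateCounters} is first called. */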
+ private Map<Integer, Long> updateCntrs;
+
/**
* @param routineId Routine id.
* @param startReqData Start request data.
@@ -63,6 +66,22 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
}
/**
+ * @param cntrs Update counters.
+ */
+ public void addUpdateCounters(Map<Integer, Long> cntrs) {
+ if (updateCntrs == null)
+ updateCntrs = new HashMap<>();
+
+ for (Map.Entry<Integer, Long> e : cntrs.entrySet()) {
+ Long cntr0 = updateCntrs.get(e.getKey());
+ Long cntr1 = e.getValue();
+
+ if (cntr0 == null || cntr1 > cntr0)
+ updateCntrs.put(e.getKey(), cntr1);
+ }
+ }
+
+ /**
* @return Errs.
*/
public Map<UUID, IgniteCheckedException> errs() {
@@ -76,7 +95,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage {
/** {@inheritDoc} */
@Override public DiscoveryCustomMessage ackMessage() {
- return new StartRoutineAckDiscoveryMessage(routineId, errs);
+ return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ce636372/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index b93acf5..97696bb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -479,7 +479,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable GridCacheVersion drVer,
UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer)
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr)
throws IgniteCheckedException, GridCacheEntryRemovedException {
return new GridCacheUpdateTxResult(true, rawPut(val, ttl));
}
@@ -529,7 +530,9 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
boolean conflictResolve,
boolean intercept,
UUID subjId,
- String taskName) throws IgniteCheckedException,
+ String taskName,
+ @Nullable CacheObject prevVal,
+ @Nullable Long updateCntr) throws IgniteCheckedException,
GridCacheEntryRemovedException {
assert false;
@@ -550,7 +553,8 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
@Nullable GridCacheVersion drVer,
UUID subjId,
String taskName,
- @Nullable GridCacheVersion dhtVer
+ @Nullable GridCacheVersion dhtVer,
+ @Nullable Long updateCntr
) throws IgniteCheckedException, GridCacheEntryRemovedException {
obsoleteVer = ver;