You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 16:02:10 UTC
[08/47] incubator-ignite git commit: # IGNITE-831 Done.
# IGNITE-831 Done.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e028de86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e028de86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e028de86
Branch: refs/heads/ignite-709_3
Commit: e028de86ee375ad7c96a8eeac333487258534cff
Parents: ac7597e
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 5 19:14:43 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 5 19:14:43 2015 +0300
----------------------------------------------------------------------
.../continuous/GridContinuousMessageType.java | 12 -
.../continuous/GridContinuousProcessor.java | 544 +++----------------
.../StartRoutineAckDiscoveryMessage.java | 62 +++
.../StartRoutineDiscoveryMessage.java | 89 +++
.../StopRoutineAckDiscoveryMessage.java | 49 ++
.../continuous/StopRoutineDiscoveryMessage.java | 56 ++
.../ignite/spi/discovery/DiscoverySpi.java | 4 +-
...ridCacheContinuousQueryAbstractSelfTest.java | 6 +-
.../tcp/TcpClientDiscoverySelfTest.java | 4 +-
9 files changed, 338 insertions(+), 488 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
index eb33613..1b79430 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessageType.java
@@ -23,18 +23,6 @@ import org.jetbrains.annotations.*;
* Continuous processor message types.
*/
enum GridContinuousMessageType {
- /** Consume start request. */
- MSG_START_REQ,
-
- /** Consume start acknowledgement. */
- MSG_START_ACK,
-
- /** Consume stop request. */
- MSG_STOP_REQ,
-
- /** Consume stop acknowledgement. */
- MSG_STOP_ACK,
-
/** Remote event notification. */
MSG_EVT_NOTIFICATION,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 41f5940..d71609b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.managers.deployment.*;
+import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.*;
@@ -64,21 +65,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Start futures. */
private final ConcurrentMap<UUID, StartFuture> startFuts = new ConcurrentHashMap8<>();
- /** Start ack wait lists. */
- private final ConcurrentMap<UUID, Collection<UUID>> waitForStartAck = new ConcurrentHashMap8<>();
-
/** Stop futures. */
private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>();
- /** Stop ack wait lists. */
- private final ConcurrentMap<UUID, Collection<UUID>> waitForStopAck = new ConcurrentHashMap8<>();
-
/** Threads started by this processor. */
private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>();
- /** Pending start requests. */
- private final Map<UUID, Collection<GridContinuousMessage>> pending = new HashMap<>();
-
/** */
private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
@@ -91,18 +83,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Lock for stop process. */
private final Lock stopLock = new ReentrantLock();
+ /** Marshaller. */
+ private Marshaller marsh;
+
/** Delay in milliseconds between retries. */
private long retryDelay = 1000;
/** Number of retries using to send messages. */
private int retryCnt = 3;
- /** Acknowledgement timeout. */
- private long ackTimeout;
-
- /** Marshaller. */
- private Marshaller marsh;
-
/**
* @param ctx Kernal context.
*/
@@ -117,15 +106,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
retryDelay = ctx.config().getNetworkSendRetryDelay();
retryCnt = ctx.config().getNetworkSendRetryCount();
- ackTimeout = ctx.config().getNetworkTimeout();
-
- if (ackTimeout < retryDelay * retryCnt) {
- U.warn(log, "Acknowledgement timeout for continuous operations is less than message send " +
- "retry delay multiplied by retries count (will increase timeout value) [ackTimeout=" +
- ackTimeout + ", retryDelay=" + retryDelay + ", retryCnt=" + retryCnt + ']');
-
- ackTimeout = retryDelay * retryCnt;
- }
marsh = ctx.config().getMarshaller();
@@ -136,111 +116,82 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
- Collection<GridContinuousMessage> reqs;
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
- pendingLock.lock();
+ // Unregister handlers created by left node.
+ for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
+ UUID routineId = e.getKey();
+ RemoteRoutineInfo info = e.getValue();
- try {
- // Remove pending requests to send to joined node
- // (if node is left or failed, they are dropped).
- reqs = pending.remove(nodeId);
- }
- finally {
- pendingLock.unlock();
+ if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
+ unregisterRemote(routineId);
}
- switch (evt.type()) {
- case EVT_NODE_JOINED:
- if (reqs != null) {
- UUID routineId = null;
+ for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
+ SyncMessageAckFuture fut = e.getValue();
- // Send pending requests.
- try {
- for (GridContinuousMessage req : reqs) {
- routineId = req.routineId();
+ if (fut.nodeId().equals(nodeId)) {
+ SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
- sendWithRetries(nodeId, req, null);
- }
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send pending start request to node (is node alive?): " +
- nodeId);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send pending start request to node: " + nodeId, e);
+ if (fut0 != null) {
+ ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
+ "Node left grid while sending message to: " + nodeId);
- completeStartFuture(routineId);
- }
- }
-
- break;
-
- case EVT_NODE_LEFT:
- case EVT_NODE_FAILED:
- // Do not wait for start acknowledgements from left node.
- for (Map.Entry<UUID, Collection<UUID>> e : waitForStartAck.entrySet()) {
- Collection<UUID> nodeIds = e.getValue();
-
- for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
- if (nodeId.equals(it.next())) {
- it.remove();
-
- break;
- }
- }
-
- if (nodeIds.isEmpty())
- completeStartFuture(e.getKey());
+ fut0.onDone(err);
}
+ }
+ }
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
- // Do not wait for stop acknowledgements from left node.
- for (Map.Entry<UUID, Collection<UUID>> e : waitForStopAck.entrySet()) {
- Collection<UUID> nodeIds = e.getValue();
+ ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessage.class,
+ new CustomEventListener<StartRoutineDiscoveryMessage>() {
+ @Override public void onCustomEvent(ClusterNode snd, StartRoutineDiscoveryMessage msg) {
+ if (!snd.id().equals(ctx.localNodeId()))
+ processStartRequest(snd.id(), msg);
+ }
+ });
- for (Iterator<UUID> it = nodeIds.iterator(); it.hasNext();) {
- if (nodeId.equals(it.next())) {
- it.remove();
+ ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class,
+ new CustomEventListener<StartRoutineAckDiscoveryMessage>() {
+ @Override public void onCustomEvent(ClusterNode snd, StartRoutineAckDiscoveryMessage msg) {
+ StartFuture fut = startFuts.remove(msg.routineId());
- break;
- }
- }
+ if (fut != null) {
+ if (msg.errs().isEmpty())
+ fut.onRemoteRegistered();
+ else {
+ IgniteCheckedException firstEx = F.first(msg.errs().values());
- if (nodeIds.isEmpty())
- completeStopFuture(e.getKey());
- }
-
- // Unregister handlers created by left node.
- for (Map.Entry<UUID, RemoteRoutineInfo> e : rmtInfos.entrySet()) {
- UUID routineId = e.getKey();
- RemoteRoutineInfo info = e.getValue();
+ fut.onDone(firstEx);
- if (info.autoUnsubscribe && nodeId.equals(info.nodeId))
- unregisterRemote(routineId);
+ stopRoutine(msg.routineId());
}
- for (Map.Entry<IgniteUuid, SyncMessageAckFuture> e : syncMsgFuts.entrySet()) {
- SyncMessageAckFuture fut = e.getValue();
-
- if (fut.nodeId().equals(nodeId)) {
- SyncMessageAckFuture fut0 = syncMsgFuts.remove(e.getKey());
+ }
+ }
+ });
- if (fut0 != null) {
- ClusterTopologyCheckedException err = new ClusterTopologyCheckedException(
- "Node left grid while sending message to: " + nodeId);
+ ctx.discovery().setCustomEventListener(StopRoutineDiscoveryMessage.class,
+ new CustomEventListener<StopRoutineDiscoveryMessage>() {
+ @Override public void onCustomEvent(ClusterNode snd, StopRoutineDiscoveryMessage msg) {
+ if (!snd.id().equals(ctx.localNodeId())) {
+ UUID routineId = msg.routineId();
- fut0.onDone(err);
- }
- }
- }
+ unregisterRemote(routineId);
+ }
+ }
+ });
- break;
+ ctx.discovery().setCustomEventListener(StopRoutineAckDiscoveryMessage.class,
+ new CustomEventListener<StopRoutineAckDiscoveryMessage>() {
+ @Override public void onCustomEvent(ClusterNode snd, StopRoutineAckDiscoveryMessage msg) {
+ StopFuture fut = stopFuts.remove(msg.routineId());
- default:
- assert false : "Unexpected event received: " + evt.shortDisplay();
+ if (fut != null)
+ fut.onDone();
}
- }
- }, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ });
ctx.io().addMessageListener(TOPIC_CONTINUOUS, new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object obj) {
@@ -258,26 +209,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
switch (msg.type()) {
- case MSG_START_REQ:
- processStartRequest(nodeId, msg);
-
- break;
-
- case MSG_START_ACK:
- processStartAck(nodeId, msg);
-
- break;
-
- case MSG_STOP_REQ:
- processStopRequest(nodeId, msg);
-
- break;
-
- case MSG_STOP_ACK:
- processStopAck(nodeId, msg);
-
- break;
-
case MSG_EVT_NOTIFICATION:
processNotification(nodeId, msg);
@@ -323,9 +254,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
pendingLock.lock();
try {
- // Create empty pending set.
- pending.put(nodeId, new HashSet<GridContinuousMessage>());
-
DiscoveryData data = new DiscoveryData(ctx.localNodeId());
// Collect listeners information (will be sent to
@@ -486,32 +414,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
});
}
- Collection<? extends ClusterNode> nodes;
- Collection<UUID> nodeIds;
-
pendingLock.lock();
try {
- // Nodes that participate in routine (request will be sent to these nodes directly).
- nodes = F.view(ctx.discovery().allNodes(), F.and(prjPred, F.remoteNodes(ctx.localNodeId())));
-
- // Stop with exception if projection is empty.
- if (nodes.isEmpty() && !locIncluded) {
- return new GridFinishedFuture<>(
- new ClusterTopologyCheckedException("Failed to register remote continuous listener (projection is empty)."));
- }
-
- // IDs of nodes where request will be sent.
- nodeIds = new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id()));
-
- // If there are currently joining nodes, add request to their pending lists.
- // Node IDs set is updated to make sure that we wait for acknowledgement from
- // these nodes.
- for (Map.Entry<UUID, Collection<GridContinuousMessage>> e : pending.entrySet()) {
- if (nodeIds.add(e.getKey()))
- e.getValue().add(new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false));
- }
-
// Register routine locally.
locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
}
@@ -521,68 +426,19 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
StartFuture fut = new StartFuture(ctx, routineId);
- if (!nodeIds.isEmpty()) {
- // Wait for acknowledgements.
- waitForStartAck.put(routineId, nodeIds);
-
- startFuts.put(routineId, fut);
-
- // Register acknowledge timeout (timeout object will be removed when
- // future is completed).
- fut.addTimeoutObject(new GridTimeoutObjectAdapter(ackTimeout) {
- @Override public void onTimeout() {
- // Stop waiting for acknowledgements.
- Collection<UUID> ids = waitForStartAck.remove(routineId);
-
- if (ids != null) {
- StartFuture f = startFuts.remove(routineId);
+ startFuts.put(routineId, fut);
- assert f != null;
-
- // If there are still nodes without acknowledgements,
- // Stop routine with exception. Continue and complete
- // future otherwise.
- if (!ids.isEmpty()) {
- f.onDone(new IgniteCheckedException("Failed to get start acknowledgement from nodes (timeout " +
- "expired): " + ids + ". Will unregister all continuous listeners."));
-
- stopRoutine(routineId);
- }
- else
- f.onRemoteRegistered();
- }
- }
- });
+ try {
+ ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
}
+ catch (IgniteException e) { // Marshaller exception may occur if the user passes an unmarshallable filter.
+ startFuts.remove(routineId);
- if (!nodes.isEmpty()) {
- // Do not send projection predicate (nodes already filtered).
- reqData.projectionPredicate(null);
- reqData.projectionPredicateBytes(null);
-
- // Send start requests.
- try {
- GridContinuousMessage req = new GridContinuousMessage(MSG_START_REQ, routineId, null, reqData, false);
-
- sendWithRetries(nodes, req, null);
- }
- catch (IgniteCheckedException e) {
- startFuts.remove(routineId);
- waitForStartAck.remove(routineId);
-
- fut.onDone(e);
-
- stopRoutine(routineId);
+ locInfos.remove(routineId);
- locIncluded = false;
- }
- }
- else {
- // There are no remote nodes, but we didn't throw topology exception.
- assert locIncluded;
+ fut.onDone(e);
- // Do not wait anything from remote nodes.
- fut.onRemoteRegistered();
+ return fut;
}
// Register local handler if needed.
@@ -640,61 +496,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
- pendingLock.lock();
-
- try {
- // Remove pending requests for this routine.
- for (Collection<GridContinuousMessage> msgs : pending.values()) {
- Iterator<GridContinuousMessage> it = msgs.iterator();
-
- while (it.hasNext()) {
- if (it.next().routineId().equals(routineId))
- it.remove();
- }
- }
- }
- finally {
- pendingLock.unlock();
- }
-
- // Nodes where to send stop requests.
- Collection<? extends ClusterNode> nodes = F.view(ctx.discovery().allNodes(),
- F.and(routine.prjPred, F.remoteNodes(ctx.localNodeId())));
-
- if (!nodes.isEmpty()) {
- // Wait for acknowledgements.
- waitForStopAck.put(routineId, new GridConcurrentHashSet<>(F.viewReadOnly(nodes, F.node2id())));
-
- // Register acknowledge timeout (timeout object will be removed when
- // future is completed).
- fut.addTimeoutObject(new StopTimeoutObject(ackTimeout, routineId,
- new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false)));
-
- // Send stop requests.
- try {
- for (ClusterNode node : nodes) {
- try {
- sendWithRetries(node.id(),
- new GridContinuousMessage(MSG_STOP_REQ, routineId, null, null, false),
- null);
- }
- catch (ClusterTopologyCheckedException ignored) {
- U.warn(log, "Failed to send stop request (node left topology): " + node.id());
- }
- }
- }
- catch (IgniteCheckedException e) {
- stopFuts.remove(routineId);
- waitForStopAck.remove(routineId);
-
- fut.onDone(e);
- }
- }
- else {
- stopFuts.remove(routineId);
-
- fut.onDone();
- }
+ ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
}
return fut;
@@ -727,7 +529,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
assert info.interval == 0 || !sync;
if (sync) {
- SyncMessageAckFuture fut = new SyncMessageAckFuture(ctx, nodeId);
+ SyncMessageAckFuture fut = new SyncMessageAckFuture(nodeId);
IgniteUuid futId = IgniteUuid.randomUuid();
@@ -782,12 +584,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param nodeId Sender ID.
* @param req Start request.
*/
- private void processStartRequest(UUID nodeId, GridContinuousMessage req) {
+ private void processStartRequest(UUID nodeId, StartRoutineDiscoveryMessage req) {
assert nodeId != null;
assert req != null;
UUID routineId = req.routineId();
- StartRequestData data = req.data();
+ StartRequestData data = req.startRequestData();
GridContinuousHandler hnd = data.handler();
@@ -836,100 +638,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
- try {
- sendWithRetries(nodeId, new GridContinuousMessage(MSG_START_ACK, routineId, null, err, false), null);
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send start acknowledgement to node (is node alive?): " + nodeId);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send start acknowledgement to node: " + nodeId, e);
- }
+ if (err != null)
+ req.addError(ctx.localNodeId(), err);
if (registered)
hnd.onListenerRegistered(routineId, ctx);
}
/**
- * @param nodeId Sender ID.
- * @param ack Start acknowledgement.
- */
- private void processStartAck(UUID nodeId, GridContinuousMessage ack) {
- assert nodeId != null;
- assert ack != null;
-
- UUID routineId = ack.routineId();
-
- final IgniteCheckedException err = ack.data();
-
- if (err != null) {
- if (waitForStartAck.remove(routineId) != null) {
- final StartFuture fut = startFuts.remove(routineId);
-
- if (fut != null) {
- fut.onDone(err);
-
- stopRoutine(routineId);
- }
- }
- }
-
- Collection<UUID> nodeIds = waitForStartAck.get(routineId);
-
- if (nodeIds != null) {
- nodeIds.remove(nodeId);
-
- if (nodeIds.isEmpty())
- completeStartFuture(routineId);
- }
- }
-
- /**
- * @param nodeId Sender ID.
- * @param req Stop request.
- */
- private void processStopRequest(UUID nodeId, GridContinuousMessage req) {
- assert nodeId != null;
- assert req != null;
-
- UUID routineId = req.routineId();
-
- unregisterRemote(routineId);
-
- try {
- sendWithRetries(nodeId, new GridContinuousMessage(MSG_STOP_ACK, routineId, null, null, false), null);
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to send stop acknowledgement to node (is node alive?): " + nodeId);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to send stop acknowledgement to node: " + nodeId, e);
- }
- }
-
- /**
- * @param nodeId Sender ID.
- * @param ack Stop acknowledgement.
- */
- private void processStopAck(UUID nodeId, GridContinuousMessage ack) {
- assert nodeId != null;
- assert ack != null;
-
- UUID routineId = ack.routineId();
-
- Collection<UUID> nodeIds = waitForStopAck.get(routineId);
-
- if (nodeIds != null) {
- nodeIds.remove(nodeId);
-
- if (nodeIds.isEmpty())
- completeStopFuture(routineId);
- }
- }
-
- /**
* @param msg Message.
*/
private void processMessageAck(GridContinuousMessage msg) {
@@ -972,36 +688,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
- * @param routineId Consume ID.
- */
- private void completeStartFuture(UUID routineId) {
- assert routineId != null;
-
- if (waitForStartAck.remove(routineId) != null) {
- StartFuture fut = startFuts.remove(routineId);
-
- assert fut != null;
-
- fut.onRemoteRegistered();
- }
- }
-
- /**
- * @param routineId Consume ID.
- */
- private void completeStopFuture(UUID routineId) {
- assert routineId != null;
-
- if (waitForStopAck.remove(routineId) != null) {
- GridFutureAdapter <?> fut = stopFuts.remove(routineId);
-
- assert fut != null;
-
- fut.onDone();
- }
- }
-
- /**
* @param nodeId Node ID.
* @param routineId Consume ID.
* @param hnd Handler.
@@ -1589,13 +1275,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private volatile GridTimeoutObject timeoutObj;
/**
- * Required by {@link Externalizable}.
- */
- public StartFuture() {
- // No-op.
- }
-
- /**
* @param ctx Kernal context.
* @param routineId Consume ID.
*/
@@ -1706,10 +1385,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private UUID nodeId;
/**
- * @param ctx Kernal context.
* @param nodeId Master node ID.
*/
- SyncMessageAckFuture(GridKernalContext ctx, UUID nodeId) {
+ SyncMessageAckFuture(UUID nodeId) {
this.nodeId = nodeId;
}
@@ -1725,76 +1403,4 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
return S.toString(SyncMessageAckFuture.class, this);
}
}
-
- /**
- * Timeout object for stop process.
- */
- private class StopTimeoutObject extends GridTimeoutObjectAdapter {
- /** Timeout. */
- private final long timeout;
-
- /** Routine ID. */
- private final UUID routineId;
-
- /** Request. */
- private final GridContinuousMessage req;
-
- /**
- * @param timeout Timeout.
- * @param routineId Routine ID.
- * @param req Request.
- */
- protected StopTimeoutObject(long timeout, UUID routineId, GridContinuousMessage req) {
- super(timeout);
-
- assert routineId != null;
- assert req != null;
-
- this.timeout = timeout;
- this.routineId = routineId;
- this.req = req;
- }
-
- /** {@inheritDoc} */
- @Override public void onTimeout() {
- Collection<UUID> ids = waitForStopAck.remove(routineId);
-
- if (ids != null) {
- U.warn(log, "Failed to get stop acknowledgement from nodes (timeout expired): " + ids +
- ". Will retry.");
-
- StopFuture f = stopFuts.get(routineId);
-
- if (f != null) {
- if (!ids.isEmpty()) {
- waitForStopAck.put(routineId, ids);
-
- // Resend requests.
- for (UUID id : ids) {
- try {
- sendWithRetries(id, req, null);
- }
- catch (ClusterTopologyCheckedException ignored) {
- if (log.isDebugEnabled())
- log.debug("Failed to resend stop request to node (is node alive?): " + id);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to resend stop request to node: " + id, e);
-
- ids.remove(id);
-
- if (ids.isEmpty())
- f.onDone(e);
- }
- }
-
- // Reschedule timeout.
- ctx.timeout().addTimeoutObject(new StopTimeoutObject(timeout, routineId, req));
- }
- else if (stopFuts.remove(routineId) != null)
- f.onDone();
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..4e5bb9c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ * Custom discovery message carrying a routine ID together with the errors collected for that routine's start request.
+ */
+public class StartRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+ /** Routine ID. */
+ private final UUID routineId;
+
+ /** Errors keyed by node ID (a copy of the map passed to the constructor). */
+ private final Map<UUID, IgniteCheckedException> errs;
+
+ /**
+ * @param routineId Routine ID.
+ * @param errs Errors by node ID; the map is copied, so later changes to the argument are not reflected here.
+ */
+ public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) {
+ this.routineId = routineId;
+ this.errs = new HashMap<>(errs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean forwardMinorVersion() {
+ return false;
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+
+ /**
+ * @return Errors by node ID (the internal map itself, not a copy); empty if no error was recorded.
+ */
+ public Map<UUID, IgniteCheckedException> errs() {
+ return errs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..7fa78b6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.spi.*;
+import org.apache.ignite.spi.discovery.tcp.messages.*;
+
+import java.util.*;
+
+/**
+ * Custom discovery message carrying a routine's start request data and accumulating per-node errors; see {@link #newMessageOnRingEnd}.
+ */
+public class StartRoutineDiscoveryMessage implements RingEndAwareCustomMessage {
+ /** Routine ID. */
+ private final UUID routineId;
+
+ /** Start request data. */
+ private final StartRequestData startReqData;
+
+ /** Errors recorded via {@link #addError}, keyed by node ID. */
+ private final Map<UUID, IgniteCheckedException> errs = new HashMap<>();
+
+ /**
+ * @param routineId Routine ID.
+ * @param startReqData Start request data.
+ */
+ public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
+ this.routineId = routineId;
+ this.startReqData = startReqData;
+ }
+
+
+
+ /** {@inheritDoc} */
+ @Override public boolean forwardMinorVersion() {
+ return false;
+ }
+
+ /**
+ * @return Start request data.
+ */
+ public StartRequestData startRequestData() {
+ return startReqData;
+ }
+
+ /**
+ * @param nodeId ID of the node the error is recorded for.
+ * @param e Error to record; replaces any error previously recorded for the same node ID.
+ */
+ public void addError(UUID nodeId, IgniteCheckedException e) {
+ errs.put(nodeId, e);
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+
+ /**
+ * @return Errors by node ID (the internal mutable map, not a copy).
+ */
+ public Map<UUID, IgniteCheckedException> errs() {
+ return errs;
+ }
+
+ /** {@inheritDoc} Here: a {@link StartRoutineAckDiscoveryMessage} built from this message's routine ID and errors. */
+ @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) {
+ return new StartRoutineAckDiscoveryMessage(routineId, errs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
new file mode 100644
index 0000000..755552b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+
+import java.util.*;
+
+/**
+ * Custom discovery message carrying a routine ID; created by {@link StopRoutineDiscoveryMessage#newMessageOnRingEnd}.
+ */
+public class StopRoutineAckDiscoveryMessage implements DiscoveryCustomMessage {
+ /** Routine ID. */
+ private final UUID routineId;
+
+ /**
+ * @param routineId Routine ID.
+ */
+ public StopRoutineAckDiscoveryMessage(UUID routineId) {
+ this.routineId = routineId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean forwardMinorVersion() {
+ return false;
+ }
+
+ /**
+ * @return Routine ID.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
new file mode 100644
index 0000000..9c480a0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineDiscoveryMessage.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.continuous;
+
+import org.apache.ignite.internal.managers.discovery.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Custom discovery message that holds a routine ID and, via {@link #newMessageOnRingEnd}, produces a {@link StopRoutineAckDiscoveryMessage} for the same routine.
+ */
+public class StopRoutineDiscoveryMessage implements RingEndAwareCustomMessage {
+ /** Routine ID. */
+ private final UUID routineId;
+
+ /**
+ * @param routineId Routine ID to store in this message.
+ */
+ public StopRoutineDiscoveryMessage(UUID routineId) {
+ this.routineId = routineId;
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code false}. */
+ @Override public boolean forwardMinorVersion() {
+ return false;
+ }
+
+ /**
+ * @return Routine ID passed to the constructor.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+
+ /** {@inheritDoc} This implementation never returns {@code null}: it always creates a new ack message carrying this routine ID. */
+ @Nullable @Override public DiscoveryCustomMessage newMessageOnRingEnd(IgniteSpiContext ctx) {
+ return new StopRoutineAckDiscoveryMessage(routineId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
index 247ff67..84a5f41 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpi.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi.discovery;
+import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.lang.*;
@@ -142,8 +143,9 @@ public interface DiscoverySpi extends IgniteSpi {
/**
* Sends custom message across the ring.
* @param evt Event.
+ * @throws IgniteException If the event could not be marshalled.
*/
- public void sendCustomEvent(DiscoveryCustomMessage evt);
+ public void sendCustomEvent(DiscoveryCustomMessage evt) throws IgniteException;
/**
* Initiates failure of provided node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5a78f9f..378d5a3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -42,6 +42,7 @@ import javax.cache.*;
import javax.cache.configuration.*;
import javax.cache.event.*;
import javax.cache.integration.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -57,7 +58,7 @@ import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.*
/**
* Continuous queries tests.
*/
-public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest {
+public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommonAbstractTest implements Serializable {
/** IP finder. */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
@@ -177,10 +178,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
assertEquals(String.valueOf(i), 3, ((Map)U.field(proc, "locInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
- assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStartAck")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
- assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "waitForStopAck")).size());
- assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "pending")).size());
CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e028de86/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index 05fb52b..d1b25df 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -539,7 +539,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
try {
id = msg.remoteListen(null, new MessageListener());
- msgLatch = new CountDownLatch(4);
+ msgLatch = new CountDownLatch(2);
msg.send(null, "Message 1");
@@ -550,7 +550,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
checkNodes(3, 3);
- msgLatch = new CountDownLatch(6);
+ msgLatch = new CountDownLatch(3);
msg.send(null, "Message 2");