You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/16 15:43:50 UTC
[42/50] [abbrv] incubator-ignite git commit: # ignite-901 client reconnect support
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index dd04bf4..daa9494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -153,21 +153,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
- for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
- StartFuture fut = itr.next();
-
- itr.remove();
-
- fut.onDone(new IgniteException("Topology segmented"));
- }
-
- for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
- StopFuture fut = itr.next();
-
- itr.remove();
-
- fut.onDone(new IgniteException("Topology segmented"));
- }
+ cancelFutures(new IgniteCheckedException("Topology segmented"));
}
}, EVT_NODE_SEGMENTED);
@@ -263,6 +249,27 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param e Error.
+ */
+ private void cancelFutures(IgniteCheckedException e) {
+ for (Iterator<StartFuture> itr = startFuts.values().iterator(); itr.hasNext(); ) {
+ StartFuture fut = itr.next();
+
+ itr.remove();
+
+ fut.onDone(e);
+ }
+
+ for (Iterator<StopFuture> itr = stopFuts.values().iterator(); itr.hasNext(); ) {
+ StopFuture fut = itr.next();
+
+ itr.remove();
+
+ fut.onDone(e);
+ }
+ }
+
+ /**
* @return {@code true} if lock successful, {@code false} if processor already stopped.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@@ -318,27 +325,30 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- if (!nodeId.equals(ctx.localNodeId())) {
+ if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
- // Collect listeners information (will be sent to
- // joining node during discovery process).
+ // Collect listeners information (will be sent to joining node during discovery process).
for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
UUID routineId = e.getKey();
LocalRoutineInfo info = e.getValue();
- data.addItem(new DiscoveryDataItem(routineId, info.prjPred,
- info.hnd, info.bufSize, info.interval));
+ data.addItem(new DiscoveryDataItem(routineId,
+ info.prjPred,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe));
}
return data;
}
- else
- return null;
+
+ return null;
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable obj) {
+ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
DiscoveryData data = (DiscoveryData)obj;
if (!ctx.isDaemon() && data != null) {
@@ -377,6 +387,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* Callback invoked when cache is started.
*
* @param ctx Cache context.
+ * @throws IgniteCheckedException If failed.
*/
public void onCacheStart(GridCacheContext ctx) throws IgniteCheckedException {
for (Map.Entry<UUID, RemoteRoutineInfo> entry : rmtInfos.entrySet()) {
@@ -491,7 +502,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
// Register routine locally.
- locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval));
+ locInfos.put(routineId, new LocalRoutineInfo(prjPred, hnd, bufSize, interval, autoUnsubscribe));
StartFuture fut = new StartFuture(ctx, routineId);
@@ -500,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
}
- catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+ catch (IgniteCheckedException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
startFuts.remove(routineId);
locInfos.remove(routineId);
@@ -565,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
- ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ try {
+ ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
if (ctx.isStopping())
fut.onDone();
@@ -580,6 +596,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param obj Notification object.
* @param orderedTopic Topic for ordered notifications. If {@code null}, non-ordered message will be sent.
* @param sync If {@code true} then waits for event acknowledgment.
+ * @param msg If {@code true} then sent data is message.
* @throws IgniteCheckedException In case of error.
*/
public void addNotification(UUID nodeId,
@@ -630,6 +647,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected."));
+
+ for (UUID rmtId : rmtInfos.keySet())
+ unregisterRemote(rmtId);
+
+ rmtInfos.clear();
+
+ clientInfos.clear();
+ }
+
/**
* @param nodeId Node ID.
* @param routineId Routine ID.
@@ -637,6 +666,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param toSnd Notification object to send.
* @param orderedTopic Topic for ordered notifications.
* If {@code null}, non-ordered message will be sent.
+ * @param msg If {@code true} then sent data is collection of messages.
* @throws IgniteCheckedException In case of error.
*/
private void sendNotification(UUID nodeId,
@@ -703,8 +733,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
assert old == null;
}
- clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(), hnd, data.bufferSize(),
- data.interval()));
+ clientRouteMap.put(routineId, new LocalRoutineInfo(data.projectionPredicate(),
+ hnd,
+ data.bufferSize(),
+ data.interval(),
+ data.autoUnsubscribe()));
}
boolean registered = false;
@@ -1022,14 +1055,22 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Time interval. */
private final long interval;
+ /** Automatic unsubscribe flag. */
+ private boolean autoUnsubscribe;
+
/**
* @param prjPred Projection predicate.
* @param hnd Continuous routine handler.
* @param bufSize Buffer size.
* @param interval Interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
*/
- LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred, GridContinuousHandler hnd, int bufSize,
- long interval) {
+ LocalRoutineInfo(@Nullable IgnitePredicate<ClusterNode> prjPred,
+ GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe)
+ {
assert hnd != null;
assert bufSize > 0;
assert interval >= 0;
@@ -1038,6 +1079,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.hnd = hnd;
this.bufSize = bufSize;
this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
}
/**
@@ -1046,6 +1088,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
GridContinuousHandler handler() {
return hnd;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(LocalRoutineInfo.class, this);
+ }
}
/**
@@ -1053,7 +1100,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
*/
private static class RemoteRoutineInfo {
/** Master node ID. */
- private final UUID nodeId;
+ private UUID nodeId;
/** Continuous routine handler. */
private final GridContinuousHandler hnd;
@@ -1205,6 +1252,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
return F.t(toSnd, diff < interval ? interval - diff : interval);
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(RemoteRoutineInfo.class, this);
+ }
}
/**
@@ -1221,6 +1273,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@GridToStringInclude
private Collection<DiscoveryDataItem> items;
+ /** */
private Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
/**
@@ -1232,6 +1285,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
* @param nodeId Node ID.
+ * @param clientInfos Client information.
*/
DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
assert nodeId != null;
@@ -1308,9 +1362,15 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
* @param hnd Handler.
* @param bufSize Buffer size.
* @param interval Time interval.
+ * @param autoUnsubscribe Automatic unsubscribe flag.
*/
- DiscoveryDataItem(UUID routineId, @Nullable IgnitePredicate<ClusterNode> prjPred,
- GridContinuousHandler hnd, int bufSize, long interval) {
+ DiscoveryDataItem(UUID routineId,
+ @Nullable IgnitePredicate<ClusterNode> prjPred,
+ GridContinuousHandler hnd,
+ int bufSize,
+ long interval,
+ boolean autoUnsubscribe)
+ {
assert routineId != null;
assert hnd != null;
assert bufSize > 0;
@@ -1321,6 +1381,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
this.hnd = hnd;
this.bufSize = bufSize;
this.interval = interval;
+ this.autoUnsubscribe = autoUnsubscribe;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 54478f8..4f75e0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.stream.*;
import org.apache.ignite.thread.*;
@@ -63,13 +64,15 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
public DataStreamProcessor(GridKernalContext ctx) {
super(ctx);
- ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof DataStreamerRequest;
+ if (!ctx.clientNode()) {
+ ctx.io().addMessageListener(TOPIC_DATASTREAM, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof DataStreamerRequest;
- processRequest(nodeId, (DataStreamerRequest)msg);
- }
- });
+ processRequest(nodeId, (DataStreamerRequest)msg);
+ }
+ });
+ }
marsh = ctx.config().getMarshaller();
}
@@ -113,7 +116,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
if (ctx.config().isDaemon())
return;
- ctx.io().removeMessageListener(TOPIC_DATASTREAM);
+ if (!ctx.clientNode())
+ ctx.io().removeMessageListener(TOPIC_DATASTREAM);
busyLock.block();
@@ -139,6 +143,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
log.debug("Stopped data streamer processor.");
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ for (DataStreamerImpl<?, ?> ldr : ldrs)
+ ldr.onDisconnected(reconnectFut);
+ }
+
/**
* @param cacheName Cache name ({@code null} for default cache).
* @return Data loader.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 26b0568..605f478 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -145,6 +145,9 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
/** Busy lock. */
private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** */
+ private CacheException disconnectErr;
+
/** Closed flag. */
private final AtomicBoolean closed = new AtomicBoolean();
@@ -245,7 +248,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
fut = new DataStreamerFuture(this);
- publicFut = new IgniteFutureImpl<>(fut);
+ publicFut = new IgniteCacheFutureImpl<>(fut);
}
/**
@@ -284,8 +287,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* Enters busy lock.
*/
private void enterBusy() {
- if (!busyLock.enterBusy())
+ if (!busyLock.enterBusy()) {
+ if (disconnectErr != null)
+ throw disconnectErr;
+
throw new IllegalStateException("Data streamer has been closed.");
+ }
}
/**
@@ -435,7 +442,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
load0(entries0, resFut, keys, 0);
- return new IgniteFutureImpl<>(resFut);
+ return new IgniteCacheFutureImpl<>(resFut);
}
catch (IgniteException e) {
return new IgniteFinishedFutureImpl<>(e);
@@ -487,7 +494,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
load0(entries, resFut, keys, 0);
- return new IgniteFutureImpl<>(resFut);
+ return new IgniteCacheFutureImpl<>(resFut);
}
catch (Throwable e) {
resFut.onDone(e);
@@ -631,6 +638,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
resFut.onDone();
}
}
+ catch (IgniteClientDisconnectedCheckedException e1) {
+ if (log.isDebugEnabled())
+ log.debug("Future finished with disconnect error [nodeId=" + nodeId + ", err=" + e1 + ']');
+
+ resFut.onDone(e1);
+ }
catch (IgniteCheckedException e1) {
if (log.isDebugEnabled())
log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
@@ -757,6 +770,12 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
try {
fut.get();
}
+ catch (IgniteClientDisconnectedCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to flush buffer: " + e);
+
+ throw CU.convertToCacheException(e);
+ }
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to flush buffer: " + e);
@@ -802,7 +821,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
doFlush();
}
catch (IgniteCheckedException e) {
- throw GridCacheUtils.convertToCacheException(e);
+ throw CU.convertToCacheException(e);
}
finally {
leaveBusy();
@@ -843,7 +862,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
closeEx(cancel);
}
catch (IgniteCheckedException e) {
- throw GridCacheUtils.convertToCacheException(e);
+ throw CU.convertToCacheException(e);
}
}
@@ -852,6 +871,15 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
* @throws IgniteCheckedException If failed.
*/
public void closeEx(boolean cancel) throws IgniteCheckedException {
+ closeEx(cancel, null);
+ }
+
+ /**
+ * @param cancel {@code True} to close with cancellation.
+ * @param err Error.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void closeEx(boolean cancel, IgniteCheckedException err) throws IgniteCheckedException {
if (!closed.compareAndSet(false, true))
return;
@@ -868,7 +896,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
cancelled = true;
for (Buffer buf : bufMappings.values())
- buf.cancelAll();
+ buf.cancelAll(err);
}
else
doFlush();
@@ -881,13 +909,29 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
e = e0;
}
- fut.onDone(null, e);
+ fut.onDone(null, e != null ? e : err);
if (e != null)
throw e;
}
/**
+ * @param reconnectFut Reconnect future.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Data streamer has been closed, client node disconnected.");
+
+ disconnectErr = (CacheException)CU.convertToCacheException(err);
+
+ for (Buffer buf : bufMappings.values())
+ buf.cancelAll(err);
+
+ closeEx(true, err);
+ }
+
+ /**
* @return {@code true} If the loader is closed.
*/
boolean isClosed() {
@@ -1027,7 +1071,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
submit(entries0, topVer, curFut0);
if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this));
+ curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
+ DataStreamerImpl.this));
+ else if (ctx.clientDisconnected())
+ curFut0.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Client node disconnected."));
}
return curFut0;
@@ -1227,11 +1275,18 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
}
catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
- ((GridFutureAdapter<Object>)fut).onDone(e);
- else
- ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
- "request (node has left): " + node.id()));
+ GridFutureAdapter<Object> fut0 = ((GridFutureAdapter<Object>)fut);
+
+ try {
+ if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
+ fut0.onDone(e);
+ else
+ fut0.onDone(new ClusterTopologyCheckedException("Failed to send request (node has left): "
+ + node.id()));
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ fut0.onDone(e0);
+ }
}
}
}
@@ -1304,10 +1359,11 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
}
/**
- *
+ * @param err Error.
*/
- void cancelAll() {
- IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
+ void cancelAll(@Nullable IgniteCheckedException err) {
+ if (err == null)
+ err = new IgniteCheckedException("Data streamer has been cancelled: " + DataStreamerImpl.this);
for (IgniteInternalFuture<?> f : locFuts) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 5c171e8..57b16f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -185,6 +185,32 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
}
/**
+ * @param key Key.
+ * @param obj Object.
+ */
+ void onRemoved(GridCacheInternal key, GridCacheRemovable obj) {
+ dsMap.remove(key, obj);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ for (Map.Entry<GridCacheInternal, GridCacheRemovable> e : dsMap.entrySet()) {
+ GridCacheRemovable obj = e.getValue();
+
+ if (clusterRestarted) {
+ obj.onRemoved();
+
+ dsMap.remove(e.getKey(), obj);
+ }
+ else
+ obj.needCheckNotRemoved();
+ }
+
+ for (GridCacheContext cctx : ctx.cache().context().cacheContexts())
+ cctx.dataStructures().onReconnected(clusterRestarted);
+ }
+
+ /**
* Gets a sequence from cache or creates one if it's not cached.
*
* @param name Sequence name.
@@ -1001,8 +1027,11 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsView.put(key, val);
}
- latch = new GridCacheCountDownLatchImpl(name, val.get(), val.initialCount(),
- val.autoDelete(), key, cntDownLatchView, dsCacheCtx);
+ latch = new GridCacheCountDownLatchImpl(name, val.initialCount(),
+ val.autoDelete(),
+ key,
+ cntDownLatchView,
+ dsCacheCtx);
dsMap.put(key, latch);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index 5e9245d..1d6e735 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -57,6 +57,9 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
/** Removed flag.*/
private volatile boolean rmvd;
+ /** Check removed flag. */
+ private boolean rmvCheck;
+
/** Atomic long key. */
private GridCacheInternalKey key;
@@ -336,7 +339,31 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
*/
private void checkRemoved() throws IllegalStateException {
if (rmvd)
- throw new IllegalStateException("Atomic long was removed from cache: " + name);
+ throw removedError();
+
+ if (rmvCheck) {
+ try {
+ rmvd = atomicView.get(key) == null;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ rmvCheck = false;
+
+ if (rmvd) {
+ ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+ throw removedError();
+ }
+ }
+ }
+
+ /**
+ * @return Error.
+ */
+ private IllegalStateException removedError() {
+ return new IllegalStateException("Atomic long was removed from cache: " + name);
}
/** {@inheritDoc} */
@@ -345,8 +372,8 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ext
}
/** {@inheritDoc} */
- @Override public void onInvalid(@Nullable Exception err) {
- // No-op.
+ @Override public void needCheckNotRemoved() {
+ rmvCheck = true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 0c4e5e6..f740c4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -24,7 +24,6 @@ import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.jetbrains.annotations.*;
import java.io.*;
import java.util.concurrent.*;
@@ -56,6 +55,9 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** Status.*/
private volatile boolean rmvd;
+ /** Check removed flag. */
+ private boolean rmvCheck;
+
/** Atomic reference key. */
private GridCacheInternalKey key;
@@ -156,8 +158,8 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
/** {@inheritDoc} */
- @Override public void onInvalid(@Nullable Exception err) {
- // No-op.
+ @Override public void needCheckNotRemoved() {
+ rmvCheck = true;
}
/** {@inheritDoc} */
@@ -293,7 +295,31 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
*/
private void checkRemoved() throws IllegalStateException {
if (rmvd)
- throw new IllegalStateException("Atomic reference was removed from cache: " + name);
+ throw removedError();
+
+ if (rmvCheck) {
+ try {
+ rmvd = atomicView.get(key) == null;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ rmvCheck = false;
+
+ if (rmvd) {
+ ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+ throw removedError();
+ }
+ }
+ }
+
+ /**
+ * @return Error.
+ */
+ private IllegalStateException removedError() {
+ return new IllegalStateException("Atomic reference was removed from cache: " + name);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 2400a7e..31f4f24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -61,6 +61,9 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
/** Removed flag. */
private volatile boolean rmvd;
+ /** Check removed flag. */
+ private boolean rmvCheck;
+
/** Sequence key. */
private GridCacheInternalKey key;
@@ -391,7 +394,31 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
*/
private void checkRemoved() throws IllegalStateException {
if (rmvd)
- throw new IllegalStateException("Sequence was removed from cache: " + name);
+ throw removedError();
+
+ if (rmvCheck) {
+ try {
+ rmvd = seqView.get(key) == null;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ rmvCheck = false;
+
+ if (rmvd) {
+ ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+ throw removedError();
+ }
+ }
+ }
+
+ /**
+ * @return Error.
+ */
+ private IllegalStateException removedError() {
+ return new IllegalStateException("Sequence was removed from cache: " + name);
}
/** {@inheritDoc} */
@@ -400,8 +427,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
}
/** {@inheritDoc} */
- @Override public void onInvalid(@Nullable Exception err) {
- // No-op.
+ @Override public void needCheckNotRemoved() {
+ rmvCheck = true;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 76ea7ca..d2dedeb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -59,6 +59,9 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
/** Removed flag.*/
private volatile boolean rmvd;
+ /** Check removed flag. */
+ private boolean rmvCheck;
+
/** Atomic stamped key. */
private GridCacheInternalKey key;
@@ -206,8 +209,8 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
/** {@inheritDoc} */
- @Override public void onInvalid(@Nullable Exception err) {
- // No-op.
+ @Override public void needCheckNotRemoved() {
+ rmvCheck = true;
}
/** {@inheritDoc} */
@@ -369,7 +372,31 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
*/
private void checkRemoved() throws IllegalStateException {
if (rmvd)
- throw new IllegalStateException("Atomic stamped was removed from cache: " + name);
+ throw removedError();
+
+ if (rmvCheck) {
+ try {
+ rmvd = atomicView.get(key) == null;
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ rmvCheck = false;
+
+ if (rmvd) {
+ ctx.kernalContext().dataStructures().onRemoved(key, this);
+
+ throw removedError();
+ }
+ }
+ }
+
+ /**
+ * @return Error.
+ */
+ private IllegalStateException removedError() {
+ return new IllegalStateException("Atomic stamped was removed from cache: " + name);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index 85b6cfd..95b970a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -67,9 +67,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** Cache context. */
private GridCacheContext ctx;
- /** Current count. */
- private int cnt;
-
/** Initial count. */
private int initCnt;
@@ -96,7 +93,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
* Constructor.
*
* @param name Latch name.
- * @param cnt Current count.
* @param initCnt Initial count.
* @param autoDel Auto delete flag.
* @param key Latch key.
@@ -104,7 +100,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
* @param ctx Cache context.
*/
public GridCacheCountDownLatchImpl(String name,
- int cnt,
int initCnt,
boolean autoDel,
GridCacheInternalKey key,
@@ -112,14 +107,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
GridCacheContext ctx)
{
assert name != null;
- assert cnt >= 0;
assert initCnt >= 0;
assert key != null;
assert latchView != null;
assert ctx != null;
this.name = name;
- this.cnt = cnt;
this.initCnt = initCnt;
this.autoDel = autoDel;
this.key = key;
@@ -136,7 +129,12 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public int count() {
- return cnt;
+ try {
+ return CU.outTx(new GetCountCallable(), ctx);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/** {@inheritDoc} */
@@ -207,13 +205,11 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public boolean onRemoved() {
- assert cnt == 0;
-
return rmvd = true;
}
/** {@inheritDoc} */
- @Override public void onInvalid(@Nullable Exception err) {
+ @Override public void needCheckNotRemoved() {
// No-op.
}
@@ -231,8 +227,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
@Override public void onUpdate(int cnt) {
assert cnt >= 0;
- this.cnt = cnt;
-
while (internalLatch != null && internalLatch.getCount() > cnt)
internalLatch.countDown();
}
@@ -253,9 +247,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
if (log.isDebugEnabled())
log.debug("Failed to find count down latch with given name: " + name);
- assert cnt == 0;
-
- return new CountDownLatch(cnt);
+ return new CountDownLatch(0);
}
tx.commit();
@@ -337,6 +329,29 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/**
*
*/
+ private class GetCountCallable implements Callable<Integer> {
+ /** {@inheritDoc} */
+ @Override public Integer call() throws Exception {
+ Integer val;
+
+ try (IgniteInternalTx tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+ if (latchVal == null)
+ return 0;
+
+ val = latchVal.get();
+
+ tx.rollback();
+ }
+
+ return val;
+ }
+ }
+
+ /**
+ *
+ */
private class CountDownCallable implements Callable<Integer> {
/** Value to count down on (if 0 then latch is counted down to 0). */
private final int val;
@@ -359,9 +374,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
if (log.isDebugEnabled())
log.debug("Failed to find count down latch with given name: " + name);
- assert cnt == 0;
-
- return cnt;
+ return 0;
}
int retVal;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
index 48d8644..dd4f2cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheRemovable.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.datastructures;
-import org.jetbrains.annotations.*;
-
/**
* Provides callback for marking object as removed.
*/
@@ -31,7 +29,7 @@ public interface GridCacheRemovable {
public boolean onRemoved();
/**
- * @param err Error which cause data structure to become invalid.
+ *
*/
- public void onInvalid(@Nullable Exception err);
+ public void needCheckNotRemoved();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
index f74fe95..6d920fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetImpl.java
@@ -101,6 +101,19 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
return rmvd;
}
+ /**
+ * @return {@code True} if set header found in cache.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("unchecked")
+ public boolean checkHeader() throws IgniteCheckedException {
+ IgniteInternalCache<GridCacheSetHeaderKey, GridCacheSetHeader> cache0 = ctx.cache();
+
+ GridCacheSetHeader hdr = cache0.get(new GridCacheSetHeaderKey(name));
+
+ return hdr != null && hdr.id().equals(id);
+ }
+
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public int size() {
@@ -476,7 +489,7 @@ public class GridCacheSetImpl<T> extends AbstractCollection<T> implements Ignite
/**
* @return Set ID.
*/
- IgniteUuid id() {
+ public IgniteUuid id() {
return id;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index ba43da7..90c26f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -57,6 +57,9 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
/** Busy lock. */
private GridSpinBusyLock busyLock;
+ /** Check removed flag. */
+ private boolean rmvCheck;
+
/**
* Required by {@link Externalizable}.
*/
@@ -78,6 +81,13 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
}
/**
+ * @return Set delegate.
+ */
+ public GridCacheSetImpl delegate() {
+ return delegate;
+ }
+
+ /**
* Remove callback.
*/
public void blockOnRemove() {
@@ -510,8 +520,43 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
* Enters busy state.
*/
private void enterBusy() {
+ boolean rmvd;
+
+ if (rmvCheck) {
+ try {
+ rmvd = !delegate().checkHeader();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ rmvCheck = false;
+
+ if (rmvd) {
+ delegate.removed(true);
+
+ cctx.dataStructures().onRemoved(this);
+
+ throw removedError();
+ }
+ }
+
if (!busyLock.enterBusy())
- throw new IllegalStateException("Set has been removed from cache: " + delegate);
+ throw removedError();
+ }
+
+ /**
+ * Requests that the next busy-state entry re-check whether the set header is still present in cache.
+ */
+ public void needCheckNotRemoved() {
+ rmvCheck = true;
+ }
+
+ /**
+ * @return Error.
+ */
+ private IllegalStateException removedError() {
+ return new IllegalStateException("Set has been removed from cache: " + delegate);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 48e9686..350068a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1413,7 +1413,7 @@ public class GridJobProcessor extends GridProcessorAdapter {
* @return {@code true} if node is dead, {@code false} if node is alive.
*/
private boolean isDeadNode(UUID uid) {
- return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+ return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index d1ee5ad..3a309f6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -863,7 +863,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @return {@code true} if node is dead, {@code false} if node is alive.
*/
private boolean isDeadNode(UUID uid) {
- return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+ return ctx.discovery().node(uid) == null || !ctx.discovery().pingNodeNoError(uid);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index 0cbb77a..8639bc8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -246,4 +246,11 @@ public interface GridQueryIndexing {
* @return Backup filter.
*/
public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
+
+ /**
+ * Client disconnected callback.
+ *
+ * @param reconnectFut Reconnect future.
+ */
+ public void onDisconnected(IgniteFuture<?> reconnectFut);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1ba1fae..f3ad4b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -234,6 +234,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
idx.stop();
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ if (idx != null)
+ idx.onDisconnected(reconnectFut);
+ }
+
/**
* @param cctx Cache context.
* @throws IgniteCheckedException If failed.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bb451c7..78b09e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -124,7 +124,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache = ctx.cache().utilityCache();
- ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
+ if (!ctx.clientNode())
+ ctx.event().addLocalEventListener(topLsnr, EVTS_DISCOVERY);
try {
if (ctx.deploy().enabled())
@@ -165,7 +166,8 @@ public class GridServiceProcessor extends GridProcessorAdapter {
busyLock.block();
- ctx.event().removeLocalEventListener(topLsnr);
+ if (!ctx.clientNode())
+ ctx.event().removeLocalEventListener(topLsnr);
if (cfgQryId != null)
cache.context().continuousQueries().cancelInternalQuery(cfgQryId);
@@ -209,6 +211,27 @@ public class GridServiceProcessor extends GridProcessorAdapter {
log.debug("Stopped service processor.");
}
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ for (Map.Entry<String, GridServiceDeploymentFuture> e : depFuts.entrySet()) {
+ GridServiceDeploymentFuture fut = e.getValue();
+
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to deploy service, client node disconnected."));
+
+ depFuts.remove(e.getKey(), fut);
+ }
+
+ for (Map.Entry<String, GridFutureAdapter<?>> e : undepFuts.entrySet()) {
+ GridFutureAdapter fut = e.getValue();
+
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to undeploy service, client node disconnected."));
+
+ undepFuts.remove(e.getKey(), fut);
+ }
+ }
+
/**
* Validates service configuration.
*
@@ -328,6 +351,13 @@ public class GridServiceProcessor extends GridProcessorAdapter {
return old;
}
+ if (ctx.clientDisconnected()) {
+ fut.onDone(new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to deploy service, client node disconnected."));
+
+ depFuts.remove(cfg.getName(), fut);
+ }
+
while (true) {
try {
GridServiceDeploymentKey key = new GridServiceDeploymentKey(cfg.getName());
@@ -646,10 +676,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
else {
- Collection<ClusterNode> nodes =
- assigns.nodeFilter() == null ?
- ctx.discovery().nodes(topVer) :
- F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter());
+ Collection<ClusterNode> nodes = assigns.nodeFilter() == null ?
+ ctx.discovery().nodes(topVer) :
+ F.view(ctx.discovery().nodes(topVer), assigns.nodeFilter());
if (!nodes.isEmpty()) {
int size = nodes.size();
@@ -1019,7 +1048,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
cache.getAndRemove(key);
}
catch (IgniteCheckedException ex) {
- log.error("Failed to remove assignments for undeployed service: " + name, ex);
+ U.error(log, "Failed to remove assignments for undeployed service: " + name, ex);
}
}
}
@@ -1164,7 +1193,7 @@ public class GridServiceProcessor extends GridProcessorAdapter {
}
}
catch (IgniteCheckedException ex) {
- log.error("Failed to clean up zombie assignments for service: " + name, ex);
+ U.error(log, "Failed to clean up zombie assignments for service: " + name, ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 8e13bc4..556beea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -68,9 +68,15 @@ class GridServiceProxy<T> implements Serializable {
* @param name Service name.
* @param svc Service type class.
* @param sticky Whether multi-node request should be done.
+ * @param ctx Context.
*/
- @SuppressWarnings("unchecked") GridServiceProxy(ClusterGroup prj, String name, Class<? super T> svc,
- boolean sticky, GridKernalContext ctx) {
+ @SuppressWarnings("unchecked")
+ GridServiceProxy(ClusterGroup prj,
+ String name,
+ Class<? super T> svc,
+ boolean sticky,
+ GridKernalContext ctx)
+ {
this.prj = prj;
this.ctx = ctx;
hasLocNode = hasLocalNode(prj);
@@ -159,6 +165,9 @@ class GridServiceProxy<T> implements Serializable {
catch (RuntimeException | Error e) {
throw e;
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
catch (Exception e) {
throw new IgniteException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d59a51d..d3caf5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -76,8 +76,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
private final LongAdder8 execTasks = new LongAdder8();
/** */
- private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx =
- new ThreadLocal<>();
+ private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = new ThreadLocal<>();
/** */
private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
@@ -119,6 +118,24 @@ public class GridTaskProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ IgniteClientDisconnectedCheckedException err = disconnectedError(reconnectFut);
+
+ for (GridTaskWorker<?, ?> worker : tasks.values())
+ worker.finishTask(null, err);
+ }
+
+ /**
+ * @param reconnectFut Reconnect future.
+ * @return Client disconnected exception.
+ */
+ private IgniteClientDisconnectedCheckedException disconnectedError(@Nullable IgniteFuture<?> reconnectFut) {
+ return new IgniteClientDisconnectedCheckedException(
+ reconnectFut != null ? reconnectFut : ctx.cluster().clientReconnectFuture(),
+ "Failed to execute task, client node disconnected.");
+ }
+
+ /** {@inheritDoc} */
@SuppressWarnings("TooBroadScope")
@Override public void onKernalStop(boolean cancel) {
lock.writeLock();
@@ -552,7 +569,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
// Creates task session with task name and task version.
GridTaskSessionImpl ses = ctx.session().createTaskSession(
sesId,
- ctx.config().getNodeId(),
+ ctx.localNodeId(),
taskName,
dep,
taskCls == null ? null : taskCls.getName(),
@@ -597,25 +614,29 @@ public class GridTaskProcessor extends GridProcessorAdapter {
assert taskWorker0 == null : "Session ID is not unique: " + sesId;
- if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
- try {
- // Start task execution in another thread.
- if (sys)
- ctx.getSystemExecutorService().execute(taskWorker);
- else
- ctx.getExecutorService().execute(taskWorker);
- }
- catch (RejectedExecutionException e) {
- tasks.remove(sesId);
+ if (!ctx.clientDisconnected()) {
+ if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) {
+ try {
+ // Start task execution in another thread.
+ if (sys)
+ ctx.getSystemExecutorService().execute(taskWorker);
+ else
+ ctx.getExecutorService().execute(taskWorker);
+ }
+ catch (RejectedExecutionException e) {
+ tasks.remove(sesId);
- release(dep);
+ release(dep);
- handleException(new ComputeExecutionRejectedException("Failed to execute task " +
- "due to thread pool execution rejection: " + taskName, e), fut);
+ handleException(new ComputeExecutionRejectedException("Failed to execute task " +
+ "due to thread pool execution rejection: " + taskName, e), fut);
+ }
}
+ else
+ taskWorker.run();
}
else
- taskWorker.run();
+ taskWorker.finishTask(null, disconnectedError(null));
}
}
else {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index eb5fa77..133a31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1070,10 +1070,17 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
- if (!isDeadNode(nodeId))
- U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
- nodeId + ", taskName=" + ses.getTaskName() +
- ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+ try {
+ if (!isDeadNode(nodeId))
+ U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
+ nodeId + ", taskName=" + ses.getTaskName() +
+ ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e);
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send cancel request to node, client disconnected [nodeId=" +
+ nodeId + ", taskName=" + ses.getTaskName() + ']');
+ }
}
}
}
@@ -1169,24 +1176,39 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
}
}
catch (IgniteCheckedException e) {
- boolean deadNode = isDeadNode(res.getNode().id());
+ IgniteException fakeErr = null;
- // Avoid stack trace if node has left grid.
- if (deadNode)
- U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
- "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
- ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
- else
- U.error(log, "Failed to send job request: " + req, e);
+ try {
+ boolean deadNode = isDeadNode(res.getNode().id());
+
+ // Avoid stack trace if node has left grid.
+ if (deadNode) {
+ U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " +
+ "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() +
+ ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']');
+
+ fakeErr = new ClusterTopologyException("Failed to send job due to node failure: " + node, e);
+ }
+ else
+ U.error(log, "Failed to send job request: " + req, e);
+
+ }
+ catch (IgniteClientDisconnectedCheckedException e0) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to send job request, client disconnected [node=" + node +
+ ", taskName=" + ses.getTaskName() + ", taskSesId=" + ses.getId() + ", jobSesId=" +
+ res.getJobContext().getJobId() + ']');
+
+ fakeErr = U.convertException(e0);
+ }
GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(),
res.getJobContext().getJobId(), null, null, null, null, null, null, false);
- if (deadNode)
- fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " +
- node, e));
- else
- fakeRes.setFakeException(U.convertException(e));
+ if (fakeErr == null)
+ fakeErr = U.convertException(e);
+
+ fakeRes.setFakeException(fakeErr);
onResponse(fakeRes);
}
@@ -1345,8 +1367,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
*
* @param uid UID of node to check.
* @return {@code true} if node is dead, {@code false} if node is alive.
+ * @throws IgniteClientDisconnectedCheckedException if ping failed when client disconnected.
*/
- private boolean isDeadNode(UUID uid) {
+ private boolean isDeadNode(UUID uid) throws IgniteClientDisconnectedCheckedException {
return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index f457d6c..66eb596 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -626,6 +626,15 @@ public abstract class IgniteUtils {
}
});
+ m.put(IgniteClientDisconnectedCheckedException.class, new C1<IgniteCheckedException, IgniteException>() {
+ @Override public IgniteException apply(IgniteCheckedException e) {
+ return new IgniteClientDisconnectedException(
+ ((IgniteClientDisconnectedCheckedException)e).reconnectFuture(),
+ e.getMessage(),
+ e);
+ }
+ });
+
return m;
}
@@ -673,6 +682,25 @@ public abstract class IgniteUtils {
* @return Ignite runtime exception.
*/
public static IgniteException convertException(IgniteCheckedException e) {
+ IgniteClientDisconnectedException e0 = e.getCause(IgniteClientDisconnectedException.class);
+
+ if (e0 != null) {
+ assert e0.reconnectFuture() != null : e0;
+
+ throw e0;
+ }
+
+ IgniteClientDisconnectedCheckedException disconnectedErr =
+ e instanceof IgniteClientDisconnectedCheckedException ?
+ (IgniteClientDisconnectedCheckedException)e
+ : e.getCause(IgniteClientDisconnectedCheckedException.class);
+
+ if (disconnectedErr != null) {
+ assert disconnectedErr.reconnectFuture() != null : disconnectedErr;
+
+ e = disconnectedErr;
+ }
+
C1<IgniteCheckedException, IgniteException> converter = exceptionConverters.get(e.getClass());
if (converter != null)
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
index c935c4a..a4f7804 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/IpcSharedMemoryClientEndpoint.java
@@ -59,8 +59,9 @@ public class IpcSharedMemoryClientEndpoint implements IpcEndpoint {
* @param outSpace Out space.
* @param parent Parent logger.
*/
- public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace, IpcSharedMemorySpace outSpace,
- IgniteLogger parent) {
+ public IpcSharedMemoryClientEndpoint(IpcSharedMemorySpace inSpace,
+ IpcSharedMemorySpace outSpace,
+ IgniteLogger parent) {
assert inSpace != null;
assert outSpace != null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
index 6f544e0..f3bcab0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java
@@ -1570,6 +1570,7 @@ public class GridFunc {
* @param <T> Type of the collection.
* @return Light-weight view on given collection with provided predicate.
*/
+ @SafeVarargs
public static <T> Collection<T> view(@Nullable final Collection<T> c,
@Nullable final IgnitePredicate<? super T>... p) {
if (isEmpty(c) || isAlwaysFalse(p))
@@ -2706,6 +2707,7 @@ public class GridFunc {
* @param <T> Type of the free variable, i.e. the element the predicate is called on.
* @return Negated predicate.
*/
+ @SafeVarargs
public static <T> IgnitePredicate<T> not(@Nullable final IgnitePredicate<? super T>... p) {
return isAlwaysFalse(p) ? F.<T>alwaysTrue() : isAlwaysTrue(p) ? F.<T>alwaysFalse() : new P1<T>() {
@Override public boolean apply(T t) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
index 968d88d..0f6ed5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpi.java
@@ -17,6 +17,7 @@
package org.apache.ignite.spi;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -106,4 +107,18 @@ public interface IgniteSpi {
* @throws IgniteSpiException Thrown in case of any error during SPI stop.
*/
public void spiStop() throws IgniteSpiException;
+
+ /**
+ * Client node disconnected callback.
+ *
+ * @param reconnectFut Future that will be completed when client reconnected.
+ */
+ public void onClientDisconnected(IgniteFuture<?> reconnectFut);
+
+ /**
+ * Client node reconnected callback.
+ *
+ * @param clusterRestarted {@code True} if all cluster nodes restarted while client was disconnected.
+ */
+ public void onClientReconnected(boolean clusterRestarted);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/57ac2b3b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index 5e557bd..07b39bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.plugin.security.*;
import org.apache.ignite.resources.*;
@@ -58,9 +59,6 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Ignite instance. */
protected Ignite ignite;
- /** Local node id. */
- protected UUID nodeId;
-
/** Grid instance name. */
protected String gridName;
@@ -73,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Discovery listener. */
private GridLocalEventListener paramsLsnr;
+ /** Local node. */
+ private ClusterNode locNode;
+
/**
* Creates new adapter and initializes it from the current (this) class.
* SPI name will be initialized to the simple name of the class
@@ -111,7 +112,19 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** {@inheritDoc} */
@Override public UUID getLocalNodeId() {
- return nodeId;
+ return ignite.cluster().localNode().id();
+ }
+
+ /**
+ * @return Local node.
+ */
+ protected ClusterNode getLocalNode() {
+ if (locNode != null)
+ return locNode;
+
+ locNode = getSpiContext().localNode();
+
+ return locNode;
}
/** {@inheritDoc} */
@@ -194,17 +207,27 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
}
+ /** {@inheritDoc} */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClientReconnected(boolean clusterRestarted) {
+ // No-op.
+ }
+
/**
* Inject ignite instance.
+ *
+ * @param ignite Ignite instance.
*/
@IgniteInstanceResource
protected void injectResources(Ignite ignite) {
this.ignite = ignite;
- if (ignite != null) {
- nodeId = ignite.configuration().getNodeId();
+ if (ignite != null)
gridName = ignite.name();
- }
}
/**