You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/10 15:56:37 UTC
[8/8] incubator-ignite git commit: # ignite-901
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ce2caffd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ce2caffd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ce2caffd
Branch: refs/heads/ignite-901
Commit: ce2caffdde641b6722dfc53877fe5b4633aef2a1
Parents: 782c235
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jul 10 12:15:01 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jul 10 16:56:14 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 4 +
.../ignite/internal/GridKernalContextImpl.java | 13 +-
.../ignite/internal/GridKernalGateway.java | 3 +-
.../apache/ignite/internal/IgniteKernal.java | 3 +-
.../internal/cluster/IgniteClusterImpl.java | 2 +-
.../discovery/GridDiscoveryManager.java | 119 +++++----
.../affinity/GridAffinityAssignmentCache.java | 2 +
.../processors/cache/GridCacheAdapter.java | 267 +++++++------------
.../processors/cache/GridCacheIoManager.java | 1 +
.../processors/cache/GridCacheProcessor.java | 4 +-
.../cache/GridCacheSharedContext.java | 7 +-
.../cache/GridCacheSharedManagerAdapter.java | 1 +
.../processors/cache/IgniteCacheProxy.java | 2 +-
.../CacheDataStructuresManager.java | 1 +
.../continuous/CacheContinuousQueryHandler.java | 10 +-
.../cache/transactions/IgniteTxManager.java | 4 +-
.../continuous/GridContinuousProcessor.java | 9 +-
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 15 ++
.../communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/ClientImpl.java | 59 ++--
.../GridDeploymentManagerStopSelfTest.java | 8 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 148 ++++++++--
.../IgniteCacheQuerySelfTestSuite.java | 1 -
23 files changed, 378 insertions(+), 307 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index fb0a157..65e0644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -120,12 +120,16 @@ public interface GridComponent {
@Nullable public DiscoveryDataExchangeType discoveryDataType();
/**
+ * Client disconnected callback.
+ *
* @param reconnectFut Reconnect future.
* @throws IgniteCheckedException If failed.
*/
public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException;
/**
+ * Client reconnected callback.
+ *
* @param clusterRestarted Cluster restarted flag.
* @throws IgniteCheckedException If failed.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 4a60e28..fd8b50c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -70,7 +70,6 @@ import java.util.*;
import java.util.concurrent.*;
import static org.apache.ignite.IgniteSystemProperties.*;
-import static org.apache.ignite.internal.GridKernalState.*;
import static org.apache.ignite.internal.IgniteComponentType.*;
/**
@@ -306,6 +305,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
private MarshallerContextImpl marshCtx;
/** */
+ private ClusterNode locNode;
+
+ /** */
private volatile boolean disconnected;
/**
@@ -330,6 +332,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
* @param restExecSvc REST executor service.
+ * @param plugins Plugin providers.
* @throws IgniteCheckedException In case of error.
*/
@SuppressWarnings("TypeMayBeWeakened")
@@ -506,9 +509,6 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
return ((IgniteKernal)grid).isStopping();
}
- /** */
- private ClusterNode locNode;
-
/** {@inheritDoc} */
@Override public UUID localNodeId() {
if (locNode != null)
@@ -918,7 +918,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** {@inheritDoc} */
@Override public boolean clientDisconnected() {
- return locNode.isClient() && disconnected;
+ if (locNode == null)
+ locNode = discoMgr != null ? discoMgr.localNode() : null;
+
+ return locNode != null ? (locNode.isClient() && disconnected) : false;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
index 957174a..1d50aa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalGateway.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal;
-import org.apache.ignite.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.jetbrains.annotations.*;
@@ -116,7 +115,7 @@ public interface GridKernalGateway {
/**
* Disconnected callback.
*
- * @return Reconnect future.
+ * @return Reconnect future.
*/
@Nullable public GridFutureAdapter<?> onDisconnected();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 22338cc..4718d75 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -439,8 +439,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
assert cfg != null;
return F.transform(cfg.getUserAttributes().entrySet(), new C1<Map.Entry<String, ?>, String>() {
- @Override
- public String apply(Map.Entry<String, ?> e) {
+ @Override public String apply(Map.Entry<String, ?> e) {
return e.getKey() + ", " + e.getValue().toString();
}
});
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index 246eab5..0287ca7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -52,7 +52,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus
@GridToStringExclude
private ConcurrentMap nodeLoc;
- /** */
+ /** Client reconnect future. */
private IgniteFuture<?> reconnecFut;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 3e8557d..044dc71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -326,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_PHY_RAM, totSysMemory);
- final DiscoverySpi spi = getSpi();
+ DiscoverySpi spi = getSpi();
discoOrdered = discoOrdered();
@@ -477,7 +477,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
// If this is a local join event, just save it and do not notify listeners.
if (type == EVT_NODE_JOINED && node.id().equals(locNode.id())) {
if (gridStartTime == 0)
- gridStartTime = spi.getGridStartTime();
+ gridStartTime = getSpi().getGridStartTime();
DiscoveryEvent discoEvt = new DiscoveryEvent();
@@ -515,9 +515,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
assert locNode.isClient() : locNode;
assert node.isClient() : node;
- boolean clusterRestarted = gridStartTime != spi.getGridStartTime();
+ boolean clusterRestarted = gridStartTime != getSpi().getGridStartTime();
- gridStartTime = spi.getGridStartTime();
+ gridStartTime = getSpi().getGridStartTime();
((IgniteKernal)ctx.grid()).onReconnected(clusterRestarted);
@@ -1198,6 +1198,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
try {
return getSpi().pingNode(nodeId);
}
+ catch (IgniteException e) {
+ return false;
+ }
finally {
busyLock.leaveBusy();
}
@@ -1580,9 +1583,15 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
* @param msg Custom message.
+ * @throws IgniteCheckedException If failed.
*/
- public void sendCustomEvent(DiscoveryCustomMessage msg) {
- getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ public void sendCustomEvent(DiscoveryCustomMessage msg) throws IgniteCheckedException {
+ try {
+ getSpi().sendCustomEvent(new CustomMessageWrapper(msg));
+ }
+ catch (IgniteException e) {
+ throw new IgniteCheckedException(e);
+ }
}
/**
@@ -1679,55 +1688,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
).start();
}
- /**
- * Method is called when any discovery event occurs.
- *
- * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
- * @param topVer Topology version.
- * @param node Remote node this event is connected with.
- * @param topSnapshot Topology snapshot.
- */
- @SuppressWarnings("RedundantTypeArguments")
- private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
- assert node != null;
-
- if (ctx.event().isRecordable(type)) {
- DiscoveryEvent evt = new DiscoveryEvent();
-
- evt.node(ctx.discovery().localNode());
- evt.eventNode(node);
- evt.type(type);
-
- evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter));
-
- if (type == EVT_NODE_METRICS_UPDATED)
- evt.message("Metrics were updated: " + node);
-
- else if (type == EVT_NODE_JOINED)
- evt.message("Node joined: " + node);
-
- else if (type == EVT_NODE_LEFT)
- evt.message("Node left: " + node);
-
- else if (type == EVT_NODE_FAILED)
- evt.message("Node failed: " + node);
-
- else if (type == EVT_NODE_SEGMENTED)
- evt.message("Node segmented: " + node);
-
- else if (type == EVT_CLIENT_NODE_DISCONNECTED)
- evt.message("Client node disconnected: " + node);
-
- else if (type == EVT_CLIENT_NODE_RECONNECTED)
- evt.message("Client node reconnected: " + node);
-
- else
- assert false;
-
- ctx.event().record(evt);
- }
- }
-
/** Worker for network segment checks. */
private class SegmentCheckWorker extends GridWorker {
/** */
@@ -1818,6 +1778,55 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * Method is called when any discovery event occurs.
+ *
+ * @param type Discovery event type. See {@link DiscoveryEvent} for more details.
+ * @param topVer Topology version.
+ * @param node Remote node this event is connected with.
+ * @param topSnapshot Topology snapshot.
+ */
+ @SuppressWarnings("RedundantTypeArguments")
+ private void recordEvent(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot) {
+ assert node != null;
+
+ if (ctx.event().isRecordable(type)) {
+ DiscoveryEvent evt = new DiscoveryEvent();
+
+ evt.node(ctx.discovery().localNode());
+ evt.eventNode(node);
+ evt.type(type);
+
+ evt.topologySnapshot(topVer, U.<ClusterNode, ClusterNode>arrayList(topSnapshot, daemonFilter));
+
+ if (type == EVT_NODE_METRICS_UPDATED)
+ evt.message("Metrics were updated: " + node);
+
+ else if (type == EVT_NODE_JOINED)
+ evt.message("Node joined: " + node);
+
+ else if (type == EVT_NODE_LEFT)
+ evt.message("Node left: " + node);
+
+ else if (type == EVT_NODE_FAILED)
+ evt.message("Node failed: " + node);
+
+ else if (type == EVT_NODE_SEGMENTED)
+ evt.message("Node segmented: " + node);
+
+ else if (type == EVT_CLIENT_NODE_DISCONNECTED)
+ evt.message("Client node disconnected: " + node);
+
+ else if (type == EVT_CLIENT_NODE_RECONNECTED)
+ evt.message("Client node reconnected: " + node);
+
+ else
+ assert false;
+
+ ctx.event().record(evt);
+ }
+ }
+
+ /**
* @param type Event type.
* @param topVer Topology version.
* @param node Node.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 178226d..d5c2b1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -130,6 +130,8 @@ public class GridAffinityAssignmentCache {
/**
* Kernal stop callback.
+ *
+ * @param err Error.
*/
public void onKernalStop(IgniteCheckedException err) {
stopErr = err;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index e70d8e8..d2a730a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -909,12 +909,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[]) null);
+ return keySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Set<K> keySetx() {
- return keySetx((CacheEntryPredicate[]) null);
+ return keySetx((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -1220,8 +1220,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
@@ -1263,12 +1262,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
String taskName,
final IgniteBiInClosure<KeyCacheObject, Object> vis) {
return ctx.closures().callLocalSafe(new GPC<Object>() {
- @Nullable
- @Override
- public Object call() {
+ @Nullable @Override public Object call() {
try {
ctx.store().loadAll(tx, keys, vis);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw new GridClosureException(e);
}
@@ -1470,9 +1468,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (ctx.config().getInterceptor() != null)
fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
- return (V) ctx.config().getInterceptor().onGet(key, f.get());
+ @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException {
+ return (V)ctx.config().getInterceptor().onGet(key, f.get());
}
});
@@ -1984,14 +1981,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, filter)
- .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
+ .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2049,13 +2044,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(drMap.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllDrAsync(ctx, drMap).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAllConflict [drMap=" + drMap + ']';
}
});
@@ -2070,13 +2063,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.putAllDrAsync(ctx, drMap);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAllConflictAsync [drMap=" + drMap + ']';
}
});
@@ -2093,9 +2084,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable
- @Override
- public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap =
Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor);
@@ -2127,14 +2116,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable
- @Override
- public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
- @Override
- public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2175,8 +2161,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() {
- @Override
- public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2206,8 +2191,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<?> fut = asyncOp(new AsyncInOp(keys) {
@Override public IgniteInternalFuture<GridCacheReturn> inOp(IgniteTxLocalAdapter tx) {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() {
- @Override
- public EntryProcessor apply(K k) {
+ @Override public EntryProcessor apply(K k) {
return entryProcessor;
}
});
@@ -2224,8 +2208,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
(IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override
- public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2258,8 +2241,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut;
return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() {
- @Override
- public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
+ @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut)
throws IgniteCheckedException {
GridCacheReturn ret = fut.get();
@@ -2280,12 +2262,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(map.keySet());
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) {
- @Nullable
- @Override
- public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
+ @Nullable @Override public Map<K, EntryProcessorResult<T>> op(IgniteTxLocalAdapter tx)
throws IgniteCheckedException {
IgniteInternalFuture<GridCacheReturn> fut =
- tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>) map, args);
+ tx.invokeAsync(ctx, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args);
return fut.get().value();
}
@@ -2333,14 +2313,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, filter).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxAsync [key=" + key + ", val=" + val + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2370,13 +2348,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<V>(true) {
- @Override
- public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
- return (V) tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
+ @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ return (V)tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray()).get().value();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2396,14 +2372,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), true, null, -1, ctx.noValArray())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2428,13 +2402,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
Boolean stored = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxIfAbsent [key=" + key + ", val=" + val + ']';
}
});
@@ -2459,14 +2431,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.noValArray()).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putxIfAbsentAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2537,13 +2507,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replacex [key=" + key + ", val=" + val + ']';
}
});
@@ -2559,14 +2527,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
return asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.putAllAsync(ctx, F.t(key, val), false, null, -1, ctx.hasValArray()).chain(
(IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replacexAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -2584,8 +2550,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(newVal);
return syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
@@ -2594,8 +2559,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -2658,13 +2622,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValues(m.values());
syncOp(new SyncInOp(m.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.putAllAsync(ctx, m, false, null, -1, CU.empty0()).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "putAll [map=" + m + ']';
}
});
@@ -2706,18 +2668,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
V prevVal = syncOp(new SyncOp<V>(true) {
- @Override
- public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
V ret = tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0()).get().value();
if (ctx.config().getInterceptor() != null)
- return (V) ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
+ return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2();
return ret;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ']';
}
});
@@ -2740,15 +2700,13 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() {
- @Override
- public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) {
// TODO should we invoke interceptor here?
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, true, CU.empty0())
.chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ']';
}
});
@@ -2790,13 +2748,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
syncOp(new SyncInOp(keys.size() == 1) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAll [keys=" + keys + ']';
}
});
@@ -2818,13 +2774,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
IgniteInternalFuture<Object> fut = asyncOp(new AsyncInOp(keys) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, keys, null, false, CU.empty0()).chain(RET2NULL);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllAsync [keys=" + keys + ']';
}
});
@@ -2847,13 +2801,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, CU.empty0()).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removex [key=" + key + ']';
}
});
@@ -2887,14 +2839,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false, filter).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", filter=" + Arrays.toString(filter) + ']';
}
});
@@ -2913,21 +2863,19 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override
- public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- return (GridCacheReturn) tx.removeAllAsync(ctx,
+ return tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
ctx.equalsValArray(val)).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -2942,13 +2890,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
syncOp(new SyncInOp(false) {
- @Override
- public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public void inOp(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
tx.removeAllDrAsync(ctx, drMap).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllConflict [drMap=" + drMap + ']';
}
});
@@ -2963,13 +2909,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.dr().onReceiveCacheEntriesReceived(drMap.size());
return asyncOp(new AsyncInOp(drMap.keySet()) {
- @Override
- public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<?> inOp(IgniteTxLocalAdapter tx) {
return tx.removeAllDrAsync(ctx, drMap);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAllDrASync [drMap=" + drMap + ']';
}
});
@@ -2985,22 +2929,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<GridCacheReturn>(true) {
- @Override
- public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public GridCacheReturn op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- return (GridCacheReturn) tx.putAllAsync(ctx,
- F.t(key, newVal),
- true,
- null,
- -1,
- ctx.equalsValArray(oldVal)).get();
+ return tx.putAllAsync(ctx,
+ F.t(key, newVal),
+ true,
+ null,
+ -1,
+ ctx.equalsValArray(oldVal)).get();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replace [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3014,17 +2956,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override
- public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.removeAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.removeAllAsync(ctx,
Collections.singletonList(key),
null,
true,
@@ -3033,8 +2975,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3049,17 +2990,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return asyncOp(new AsyncOp<GridCacheReturn>() {
- @Override
- public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<GridCacheReturn> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
try {
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(oldVal);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
- IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture) tx.putAllAsync(ctx,
+ IgniteInternalFuture<GridCacheReturn> fut = (IgniteInternalFuture)tx.putAllAsync(ctx,
F.t(key, newVal),
true,
null,
@@ -3069,8 +3010,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return fut;
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "replaceAsync [key=" + key + ", oldVal=" + oldVal + ", newVal=" + newVal + ']';
}
});
@@ -3090,8 +3030,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
boolean rmv = syncOp(new SyncOp<Boolean>(true) {
- @Override
- public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
+ @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException {
// Register before hiding in the filter.
if (ctx.deploymentEnabled())
ctx.deploy().registerClass(val);
@@ -3100,8 +3039,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.equalsValArray(val)).get().success();
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "remove [key=" + key + ", val=" + val + ']';
}
});
@@ -3126,24 +3064,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheValue(val);
IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() {
- @Override
- public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
+ @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) {
// Register before hiding in the filter.
if (ctx.deploymentEnabled()) {
try {
ctx.deploy().registerClass(val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
return new GridFinishedFuture<>(e);
}
}
return tx.removeAllAsync(ctx, Collections.singletonList(key), null, false,
ctx.equalsValArray(val)).chain(
- (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG);
+ (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG);
}
- @Override
- public String toString() {
+ @Override public String toString() {
return "removeAsync [key=" + key + ", val=" + val + ']';
}
});
@@ -3754,16 +3691,17 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private IgniteCacheExpiryPolicy expiryPlc =
ctx.cache().expiryPolicy(opCtx != null ? opCtx.expiry() : null);
- @Override
- public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
+ @Override public Cache.Entry<K, V> apply(Cache.Entry<K, V> lazyEntry) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
V val = localPeek(lazyEntry.getKey(), CachePeekModes.ONHEAP_ONLY, expiryPlc);
return new CacheEntryImpl<>(lazyEntry.getKey(), val);
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- } finally {
+ }
+ finally {
ctx.gate().leave(prev);
}
}
@@ -3787,20 +3725,20 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
.execute();
return ctx.itHolder().iterator(fut, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() {
- @Override
- protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
+ @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) {
return new CacheEntryImpl<>(e.getKey(), e.getValue());
}
- @Override
- protected void remove(Cache.Entry<K, V> item) {
+ @Override protected void remove(Cache.Entry<K, V> item) {
CacheOperationContext prev = ctx.gate().enter(opCtx);
try {
GridCacheAdapter.this.remove(item.getKey());
- } catch (IgniteCheckedException e) {
+ }
+ catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
- } finally {
+ }
+ finally {
ctx.gate().leave(prev);
}
}
@@ -4457,8 +4395,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
return getAllAsync(Collections.singletonList(key), deserializePortable).chain(
new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override
- public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
Map<K, V> map = e.get();
assert map.isEmpty() || map.size() == 1 : map.size();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 7e6b906..84e4dc2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -894,6 +894,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
/**
* @param cacheId Cache ID to remove handlers for.
+ * @param type Message type.
*/
public void removeHandler(int cacheId, Class<? extends GridCacheMessage> type) {
clsHandlers.remove(new ListenerKey(cacheId, type));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 46f9206..bf0f63b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2297,7 +2297,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
- catch (IgniteException e) {
+ catch (IgniteCheckedException e) {
err = e;
}
}
@@ -2957,7 +2957,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
"Failed to execute dynamic cache change request, client node disconnected.");
}
- catch (IgniteException e) {
+ catch (IgniteCheckedException e) {
err = e;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index 91a6042..4075d79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -176,6 +176,7 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param mgrs Managers list.
* @param txMgr Transaction manager.
* @param verMgr Version manager.
* @param mvccMgr MVCC manager.
@@ -221,6 +222,7 @@ public class GridCacheSharedContext<K, V> {
* Adds cache context to shared cache context.
*
* @param cacheCtx Cache context to add.
+ * @throws IgniteCheckedException If cache ID conflict detected.
*/
@SuppressWarnings("unchecked")
public void addCacheContext(GridCacheContext cacheCtx) throws IgniteCheckedException {
@@ -315,7 +317,7 @@ public class GridCacheSharedContext<K, V> {
*/
public byte dataCenterId() {
// Data center ID is same for all caches, so grab the first one.
- GridCacheContext<K, V> cacheCtx = F.first(cacheContexts());
+ GridCacheContext<?, ?> cacheCtx = F.first(cacheContexts());
return cacheCtx.dataCenterId();
}
@@ -327,7 +329,7 @@ public class GridCacheSharedContext<K, V> {
if (preloadersStartFut == null) {
GridCompoundFuture<Object, Object> compound = null;
- for (GridCacheContext<K, V> cacheCtx : cacheContexts()) {
+ for (GridCacheContext<?, ?> cacheCtx : cacheContexts()) {
IgniteInternalFuture<Object> startFut = cacheCtx.preloader().startFuture();
if (!startFut.isDone()) {
@@ -636,6 +638,7 @@ public class GridCacheSharedContext<K, V> {
}
/**
+ * @param mgrs Managers list.
* @param mgr Manager to add.
* @return Added manager.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 3ad0759..6ad76ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -117,6 +117,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/**
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 9767f49..6f2eed9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -1774,7 +1774,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
assert false;
}
- @Override public void block() {
+ @Override public void stopped() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
index 3691ee6..f710105 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/datastructures/CacheDataStructuresManager.java
@@ -107,6 +107,7 @@ public class CacheDataStructuresManager extends GridCacheManagerAdapter {
/**
* Client reconnect callback.
+ *
* @throws IgniteCheckedException If failed.
*/
public void onReconnected() throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 2ed4341..879c30c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -396,6 +396,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
}
/** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheContinuousQueryHandler.class, this);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeString(out, cacheName);
out.writeObject(topic);
@@ -438,11 +443,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
taskHash = in.readInt();
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheContinuousQueryHandler.class, this);
- }
-
/**
* @param ctx Kernal context.
* @return Cache context.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index caaa22d..82543c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -154,10 +154,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
/** {@inheritDoc} */
@Override public void onDisconnected(IgniteFuture reconnectFut) {
+ txFinishSync.onDisconnected(reconnectFut);
+
for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
rollbackTx(e.getValue());
-
- txFinishSync.onDisconnected(reconnectFut);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index defcd3f..daa9494 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -511,7 +511,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
try {
ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
}
- catch (IgniteException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
+ catch (IgniteCheckedException e) { // Marshaller exception may occurs if user pass unmarshallable filter.
startFuts.remove(routineId);
locInfos.remove(routineId);
@@ -576,7 +576,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
// Unregister handler locally.
unregisterHandler(routineId, routine.hnd, true);
- ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ try {
+ ctx.discovery().sendCustomEvent(new StopRoutineDiscoveryMessage(routineId));
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(e);
+ }
if (ctx.isStopping())
fut.onDone();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index a49d85a..07b39bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -71,6 +71,9 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** Discovery listener. */
private GridLocalEventListener paramsLsnr;
+ /** Local node. */
+ private ClusterNode locNode;
+
/**
* Creates new adapter and initializes it from the current (this) class.
* SPI name will be initialized to the simple name of the class
@@ -112,6 +115,18 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
return ignite.cluster().localNode().id();
}
+ /**
+ * @return Local node.
+ */
+ protected ClusterNode getLocalNode() {
+ if (locNode != null)
+ return locNode;
+
+ locNode = getSpiContext().localNode();
+
+ return locNode;
+ }
+
/** {@inheritDoc} */
@Override public final String getIgniteHome() {
return ignite.configuration().getIgniteHome();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index d99a764..4fce6f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1717,7 +1717,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (log.isTraceEnabled())
log.trace("Sending message to node [node=" + node + ", msg=" + msg + ']');
- if (node.isLocal())
+ if (node.equals(getLocalNode()))
notifyListener(node.id(), msg, NOOP);
else {
GridCommunicationClient client = null;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 404c71d..cad5435 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*;
+import static org.apache.ignite.spi.discovery.tcp.ClientImpl.State.*;
/**
*
@@ -265,13 +266,13 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
State state = this.state;
- if (spi.getSpiContext().isStopping() || state == State.STOPPED || state == State.SEGMENTED) {
+ if (spi.getSpiContext().isStopping() || state == STOPPED || state == SEGMENTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(false);
return false;
}
- else if (state == State.DISCONNECTED) {
+ else if (state == DISCONNECTED) {
if (pingFuts.remove(nodeId, fut))
fut.onDone(new IgniteClientDisconnectedCheckedException(null,
"Failed to ping node, client node disconnected."));
@@ -282,7 +283,7 @@ class ClientImpl extends TcpDiscoveryImpl {
timer.schedule(new TimerTask() {
@Override public void run() {
if (pingFuts.remove(nodeId, finalFut)) {
- if (ClientImpl.this.state == State.DISCONNECTED)
+ if (ClientImpl.this.state == DISCONNECTED)
finalFut.onDone(new IgniteClientDisconnectedCheckedException(null,
"Failed to ping node, client node disconnected."));
else
@@ -345,10 +346,10 @@ class ClientImpl extends TcpDiscoveryImpl {
@Override public void sendCustomEvent(DiscoverySpiCustomMessage evt) {
State state = this.state;
- if (state == State.SEGMENTED)
+ if (state == SEGMENTED)
throw new IgniteException("Failed to send custom message: client is segmented.");
- if (state == State.DISCONNECTED)
+ if (state == DISCONNECTED)
throw new IgniteException("Failed to send custom message: client is disconnected.");
try {
@@ -981,10 +982,6 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- assert state == ClientImpl.State.DISCONNECTED
- || state == ClientImpl.State.CONNECTED
- || state == ClientImpl.State.STARTING : state;
-
boolean success = false;
Exception err = null;
@@ -1135,7 +1132,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@SuppressWarnings("InfiniteLoopStatement")
@Override protected void body() throws InterruptedException {
- state = ClientImpl.State.STARTING;
+ state = STARTING;
spi.stats.onJoinStarted();
@@ -1146,23 +1143,23 @@ class ClientImpl extends TcpDiscoveryImpl {
Object msg = queue.take();
if (msg == JOIN_TIMEOUT) {
- if (state == ClientImpl.State.STARTING) {
+ if (state == STARTING) {
joinError(new IgniteSpiException("Join process timed out, did not receive response for " +
"join request (consider increasing 'joinTimeout' configuration property) " +
"[joinTimeout=" + spi.joinTimeout + ", sock=" + currSock + ']'));
break;
}
- else if (state == ClientImpl.State.DISCONNECTED) {
+ else if (state == DISCONNECTED) {
log.info("Rejoin timeout, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
}
else if (msg == SPI_STOP) {
- state = ClientImpl.State.STOPPED;
+ state = STOPPED;
assert spi.getSpiContext().isStopping();
@@ -1182,7 +1179,7 @@ class ClientImpl extends TcpDiscoveryImpl {
boolean join = joinLatch.getCount() > 0;
- if (spi.getSpiContext().isStopping() || (state == ClientImpl.State.SEGMENTED)) {
+ if (spi.getSpiContext().isStopping() || state == SEGMENTED) {
leaveLatch.countDown();
if (join) {
@@ -1209,10 +1206,10 @@ class ClientImpl extends TcpDiscoveryImpl {
reconnector = null;
if (spi.isClientReconnectDisabled()) {
- if (state != ClientImpl.State.SEGMENTED && state != ClientImpl.State.STOPPED) {
+ if (state != SEGMENTED && state != STOPPED) {
log.info("Reconnected failed, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1220,8 +1217,8 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
log.info("Reconnected failed, will try join.");
- if (state == ClientImpl.State.STARTING || state == ClientImpl.State.CONNECTED) {
- state = ClientImpl.State.DISCONNECTED;
+ if (state == STARTING || state == CONNECTED) {
+ state = DISCONNECTED;
nodeAdded = false;
@@ -1264,8 +1261,8 @@ class ClientImpl extends TcpDiscoveryImpl {
err = spi.checkFailedError((TcpDiscoveryCheckFailedMessage)msg);
if (err != null) {
- if (state == ClientImpl.State.DISCONNECTED) {
- state = ClientImpl.State.SEGMENTED;
+ if (state == DISCONNECTED) {
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1298,9 +1295,9 @@ class ClientImpl extends TcpDiscoveryImpl {
* @throws InterruptedException If interrupted.
*/
private void tryJoin() throws InterruptedException {
- assert state == ClientImpl.State.DISCONNECTED || state == ClientImpl.State.STARTING : state;
+ assert state == DISCONNECTED || state == STARTING : state;
- boolean join = state == ClientImpl.State.STARTING;
+ boolean join = state == STARTING;
log.info("Try join topology with timeout: " + spi.joinTimeout);
@@ -1314,7 +1311,7 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
log.info("Send join request on rejoin failed, will segment.");
- state = ClientImpl.State.SEGMENTED;
+ state = SEGMENTED;
notifyDiscovery(EVT_NODE_SEGMENTED, topVer, locNode, allVisibleNodes());
}
@@ -1377,14 +1374,14 @@ class ClientImpl extends TcpDiscoveryImpl {
private boolean joining() {
ClientImpl.State state = ClientImpl.this.state;
- return state == ClientImpl.State.STARTING || state == ClientImpl.State.DISCONNECTED;
+ return state == STARTING || state == DISCONNECTED;
}
/**
* @return {@code True} if client disconnected.
*/
private boolean disconnected() {
- return state == ClientImpl.State.DISCONNECTED;
+ return state == DISCONNECTED;
}
/**
@@ -1477,14 +1474,14 @@ class ClientImpl extends TcpDiscoveryImpl {
if (disconnected())
notifyDiscovery(EVT_CLIENT_NODE_RECONNECTED, topVer, locNode, nodes);
+ else
+ spi.stats.onJoinFinished();
- state = ClientImpl.State.CONNECTED;
+ state = CONNECTED;
joinErr.set(null);;
joinLatch.countDown();
-
- spi.stats.onJoinFinished();
}
else if (log.isDebugEnabled())
log.debug("Discarding node add finished message (this message has already been processed) " +
@@ -1737,7 +1734,7 @@ class ClientImpl extends TcpDiscoveryImpl {
* @param msg Message.
*/
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
- if (msg.verified() && state == ClientImpl.State.CONNECTED) {
+ if (msg.verified() && state == CONNECTED) {
DiscoverySpiListener lsnr = spi.lsnr;
if (lsnr != null) {
@@ -1877,7 +1874,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/**
*
*/
- private enum State {
+ enum State {
/** */
STARTING,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
index b8f9ce1..62f5d41 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/deployment/GridDeploymentManagerStopSelfTest.java
@@ -98,13 +98,9 @@ public class GridDeploymentManagerStopSelfTest extends GridCommonAbstractTest {
@Override public boolean unregister(String rsrcName) { return false; }
/** {@inheritDoc} */
- @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
- // No-op.
- }
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) { /* No-op. */ }
/** {@inheritDoc} */
- @Override public void onClientReconnected(boolean clusterRestarted) {
- // No-op.
- }
+ @Override public void onClientReconnected(boolean clusterRestarted) { /* No-op. */ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ce2caffd/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 5838481..4d19f3e 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -391,11 +391,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
final CountDownLatch latch = new CountDownLatch(1);
((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
- @Override
- public void apply(Socket sock) {
+ @Override public void apply(Socket sock) {
try {
latch.await();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -753,11 +753,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
attachListeners(1, 1);
((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
- @Override
- public void apply(TcpDiscoveryAbstractMessage msg) {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
try {
Thread.sleep(1000000);
- } catch (InterruptedException ignored) {
+ }
+ catch (InterruptedException ignored) {
Thread.interrupted();
}
}
@@ -787,8 +787,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("client-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return checkMetrics(3, 3, 1);
}
}, 10000));
@@ -798,8 +797,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
G.ignite("server-0").compute().broadcast(F.noop());
assertTrue(GridTestUtils.waitForCondition(new PA() {
- @Override
- public boolean apply() {
+ @Override public boolean apply() {
return checkMetrics(3, 3, 2);
}
}, 10000));
@@ -1204,8 +1202,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
clientsPerSrv = CLIENTS;
GridTestUtils.runMultiThreaded(new Callable<Void>() {
- @Override
- public Void call() throws Exception {
+ @Override public Void call() throws Exception {
Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
clientNodeIds.add(g.cluster().localNode().id());
@@ -1297,7 +1294,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
if (changeTop)
clientSpi.pauseAll();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
assertEquals(0, disconnectLatch.getCount());
reconnectLatch.countDown();
@@ -1406,7 +1404,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
startClientNodes(1);
- Ignite srv = G.ignite("server-0");
+ final Ignite srv = G.ignite("server-0");
Ignite client = G.ignite("client-0");
TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
@@ -1461,7 +1459,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
log.info("Fail client connection.");
srvSpi.failClientReconnect.set(1_000_000);
- srvSpi.failNodeAdded.set(1_000_000);
+ srvSpi.skipNodeAdded = true;
clientSpi.brakeConnection();
}
@@ -1474,8 +1472,17 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertFalse(err.get());
- if (!failSrv)
+ if (!failSrv) {
await(srvFailedLatch);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return srv.cluster().nodes().size() == 1;
+ }
+ }, 10_000);
+
+ checkNodes(1, 0);
+ }
}
/**
@@ -1485,10 +1492,6 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
netTimeout = 3000;
joinTimeout = 60_000;
- clientIpFinder = new TcpDiscoveryVmIpFinder();
-
- clientIpFinder.setAddresses(Collections.singleton("localhost:47500..47509"));
-
final CountDownLatch disconnectLatch = new CountDownLatch(1);
final CountDownLatch reconnectLatch = new CountDownLatch(1);
final AtomicBoolean err = new AtomicBoolean(false);
@@ -1501,8 +1504,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
Ignite client = G.ignite("client-0");
client.events().localListen(new IgnitePredicate<Event>() {
- @Override
- public boolean apply(Event evt) {
+ @Override public boolean apply(Event evt) {
if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
log.info("Disconnected event.");
@@ -1510,7 +1512,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertEquals(1, disconnectLatch.getCount());
disconnectLatch.countDown();
- } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
log.info("Reconnected event.");
assertEquals(1, reconnectLatch.getCount());
@@ -1518,7 +1521,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertFalse(err.get());
reconnectLatch.countDown();
- } else {
+ }
+ else {
log.error("Unexpected event: " + evt);
err.set(true);
@@ -1545,6 +1549,11 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+ clientNodeIds.clear();
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ checkNodes(1, 1);
+
assertFalse(err.get());
}
@@ -1552,6 +1561,87 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDisconnectAfterNetworkTimeout() throws Exception {
+ netTimeout = 5000;
+ joinTimeout = 60_000;
+ maxMissedClientHbs = 2;
+
+ startServerNodes(1);
+
+ startClientNodes(1);
+
+ final Ignite srv = G.ignite("server-0");
+ Ignite client = G.ignite("client-0");
+
+ TestTcpDiscoverySpi srvSpi = ((TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi());
+ TestTcpDiscoverySpi clientSpi = ((TestTcpDiscoverySpi)client.configuration().getDiscoverySpi());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ final AtomicBoolean err = new AtomicBoolean(false);
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override
+ public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ log.info("Disconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(1, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ disconnectLatch.countDown();
+ }
+ else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ log.info("Reconnected event.");
+
+ assertEquals(1, reconnectLatch.getCount());
+ assertEquals(0, disconnectLatch.getCount());
+ assertFalse(err.get());
+
+ reconnectLatch.countDown();
+ }
+ else {
+ log.error("Unexpected event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED, EVT_NODE_SEGMENTED);
+
+ log.info("Fail client connection1.");
+
+ srvSpi.failClientReconnect.set(1_000_000);
+ srvSpi.skipNodeAdded = true;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ log.info("Fail client connection2.");
+
+ srvSpi.failClientReconnect.set(0);
+ srvSpi.skipNodeAdded = false;
+
+ clientSpi.brakeConnection();
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ clientNodeIds.clear();
+
+ clientNodeIds.add(client.cluster().localNode().id());
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override
+ public boolean apply() {
+ return srv.cluster().nodes().size() == 2;
+ }
+ }, 10_000);
+
+ checkNodes(1, 1);
+
+ assertFalse(err.get());
}
/**
@@ -1834,6 +1924,9 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/** */
private volatile String delayJoinAckFor;
+ /** */
+ private volatile boolean skipNodeAdded;
+
/**
* @param lock Lock.
*/
@@ -1906,6 +1999,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
boolean fail = false;
+ if (skipNodeAdded &&
+ (msg instanceof TcpDiscoveryNodeAddedMessage || msg instanceof TcpDiscoveryNodeAddFinishedMessage)) {
+ log.info("Skip message: " + msg);
+
+ return;
+ }
+
if (msg instanceof TcpDiscoveryNodeAddedMessage)
fail = failNodeAdded.getAndDecrement() > 0;
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage)