You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/07/10 06:46:23 UTC
[1/6] incubator-ignite git commit: # ignite-gg-10416 Exclude
lifecycleBeans for daemon node.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-901 fa007b1f5 -> 00d151b92
# ignite-gg-10416 Exclude lifecycleBeans for daemon node.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8cc75fc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8cc75fc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8cc75fc5
Branch: refs/heads/ignite-901
Commit: 8cc75fc562706be8aebb837b708c03a9be264027
Parents: 0f1b31a
Author: Andrey <an...@gridgain.com>
Authored: Wed Jul 8 10:50:16 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Wed Jul 8 10:50:16 2015 +0700
----------------------------------------------------------------------
.../org/apache/ignite/visor/commands/open/VisorOpenCommand.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8cc75fc5/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/open/VisorOpenCommand.scala
----------------------------------------------------------------------
diff --git a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/open/VisorOpenCommand.scala b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/open/VisorOpenCommand.scala
index 6498baf..632a96b 100644
--- a/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/open/VisorOpenCommand.scala
+++ b/modules/visor-console/src/main/scala/org/apache/ignite/visor/commands/open/VisorOpenCommand.scala
@@ -144,7 +144,7 @@ class VisorOpenCommand extends VisorConsoleCommand {
try
// Cache, IGFS, indexing SPI configurations should be excluded from daemon node config.
spring.loadConfigurations(url, "cacheConfiguration", "fileSystemConfiguration",
- "indexingSpi").get1()
+ "lifecycleBeans", "indexingSpi").get1()
finally {
if (log4jTup != null)
U.removeLog4jNoOpLogger(log4jTup)
[6/6] incubator-ignite git commit: # ignite-901
Posted by sb...@apache.org.
# ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/00d151b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/00d151b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/00d151b9
Branch: refs/heads/ignite-901
Commit: 00d151b927e8782de309add955f1740e84c7cdd8
Parents: 4053203
Author: sboikov <se...@inria.fr>
Authored: Thu Jul 9 22:19:49 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Fri Jul 10 00:50:15 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/MarshallerContextImpl.java | 11 +-
.../discovery/GridDiscoveryManager.java | 6 +-
.../affinity/GridAffinityAssignmentCache.java | 13 +-
.../processors/cache/GridCacheAdapter.java | 2 +-
.../cache/GridCacheAffinityManager.java | 17 +-
.../cache/GridCacheDeploymentManager.java | 5 -
.../processors/cache/GridCacheMvccManager.java | 3 -
.../GridCachePartitionExchangeManager.java | 57 ++--
.../processors/cache/GridCachePreloader.java | 5 +
.../cache/GridCachePreloaderAdapter.java | 5 +
.../processors/cache/GridCacheProcessor.java | 77 +++--
.../cache/GridCacheSharedContext.java | 52 ++-
.../cache/GridCacheSharedManager.java | 10 +-
.../cache/GridCacheSharedManagerAdapter.java | 18 +-
.../distributed/dht/GridDhtCacheAdapter.java | 10 +
.../dht/preloader/GridDhtPreloader.java | 9 +-
.../distributed/near/GridNearCacheAdapter.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 5 +-
.../cache/version/GridCacheVersionManager.java | 9 +-
.../communication/tcp/TcpCommunicationSpi.java | 322 +++++++++++++------
.../ignite/spi/discovery/tcp/ClientImpl.java | 16 +-
.../IgniteClientReconnectCacheTest.java | 208 +++++++++---
...niteClientReconnectFailoverAbstractTest.java | 5 +-
...ClientReconnectCacheQueriesFailoverTest.java | 39 ++-
24 files changed, 624 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 948babc..9f7c983 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -32,7 +32,7 @@ import java.util.concurrent.*;
*/
public class MarshallerContextImpl extends MarshallerContextAdapter {
/** */
- private CountDownLatch latch = new CountDownLatch(1);
+ private final CountDownLatch latch = new CountDownLatch(1);
/** */
private final File workDir;
@@ -57,15 +57,6 @@ public class MarshallerContextImpl extends MarshallerContextAdapter {
}
/**
- *
- */
- public void onDisconnected() {
- latch = new CountDownLatch(1);
-
- cache = null;
- }
-
- /**
* @param ctx Kernal context.
* @throws IgniteCheckedException In case of error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 986a995..8e7fc97 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -297,6 +297,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
locJoinEvt = new GridFutureAdapter<>();
+ discoCacheHist.clear();
+
+ topHist.clear();
+
registeredCaches.clear();
}
@@ -1044,7 +1048,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
- * @param serverNodesNum Server nodes number.
+ * @param srvNodesNum Server nodes number.
* @param clientNodesNum Client nodes number.
* @param totalCpus Total cpu number.
* @param heap Heap size.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index 6989385..178226d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -69,7 +69,7 @@ public class GridAffinityAssignmentCache {
private IgniteLogger log;
/** Node stop flag. */
- private volatile boolean stopping;
+ private volatile IgniteCheckedException stopErr;
/**
* Constructs affinity cached calculations.
@@ -131,11 +131,8 @@ public class GridAffinityAssignmentCache {
/**
* Kernal stop callback.
*/
- public void onKernalStop() {
- stopping = true;
-
- IgniteCheckedException err =
- new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
+ public void onKernalStop(IgniteCheckedException err) {
+ stopErr = err;
for (AffinityReadyFuture fut : readyFuts.values())
fut.onDone(err);
@@ -312,8 +309,8 @@ public class GridAffinityAssignmentCache {
fut.onDone(topVer);
}
- else if (stopping)
- fut.onDone(new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."));
+ else if (stopErr != null)
+ fut.onDone(stopErr);
return fut;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8a8e096..e70d8e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4508,7 +4508,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
*
*/
- public void onDisconnected() {
+ public void onReconnected() {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index ea17df1..1d0610e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -25,6 +25,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -54,7 +55,21 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
- aff.onKernalStop();
+ IgniteCheckedException err =
+ new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping.");
+
+ aff.onKernalStop(err);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(IgniteFuture reconnectFut) {
+ IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Failed to wait for topology update, client disconnected.");
+
+ aff.onKernalStop(err);
+
+ aff = new GridAffinityAssignmentCache(cctx, cctx.namex(), cctx.config().getAffinity(),
+ cctx.config().getAffinityMapper(), cctx.config().getBackups());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
index fb850f6..677377e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java
@@ -116,11 +116,6 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap
}
/** {@inheritDoc} */
- @Override public boolean restartOnDisconnect() {
- return true;
- }
-
- /** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
if (discoLsnr != null)
cctx.gridEvents().removeLocalEventListener(discoLsnr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index 9ae2209..36e108f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -208,10 +208,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
exchLog = cctx.logger(getClass().getName() + ".exchange");
pendingExplicit = GridConcurrentFactory.newMap();
- }
- /** {@inheritDoc} */
- @Override public void onKernalStart0() throws IgniteCheckedException {
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 47bb279..2a26ec8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -47,7 +47,6 @@ import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.IgniteSystemProperties.*;
import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridKernalState.*;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.*;
@@ -208,11 +207,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
};
/** {@inheritDoc} */
- @Override public boolean restartOnDisconnect() {
- return true;
- }
-
- /** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
@@ -246,8 +240,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
- super.onKernalStart0();
+ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
+ super.onKernalStart0(reconnect);
ClusterNode loc = cctx.localNode();
@@ -275,10 +269,26 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
// Allow discovery events to get processed.
locExchFut.onDone();
- if (log.isDebugEnabled())
- log.debug("Beginning to wait on local exchange future: " + fut);
+ if (reconnect) {
+ fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ try {
+ fut.get();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts())
+ cacheCtx.preloader().onInitialExchangeComplete(null);
+ }
+ catch (IgniteCheckedException e) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts())
+ cacheCtx.preloader().onInitialExchangeComplete(e);
+ }
+ }
+ });
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Beginning to wait on local exchange future: " + fut);
- try {
boolean first = true;
while (true) {
@@ -287,11 +297,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
break;
}
- catch (IgniteClientDisconnectedCheckedException e) {
- log.info("Disconnected while waiting for initial partition map exchange: " + e);
-
- break;
- }
catch (IgniteFutureTimeoutCheckedException ignored) {
if (first) {
U.warn(log, "Failed to wait for initial partition map exchange. " +
@@ -309,18 +314,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridCacheContext cacheCtx : cctx.cacheContexts())
cacheCtx.preloader().onInitialExchangeComplete(null);
- }
- catch (IgniteFutureTimeoutCheckedException e) {
- IgniteCheckedException err = new IgniteCheckedException("Timed out waiting for exchange future: " + fut, e);
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts())
- cacheCtx.preloader().onInitialExchangeComplete(err);
- throw err;
+ if (log.isDebugEnabled())
+ log.debug("Finished waiting for initial exchange: " + fut.exchangeId());
}
-
- if (log.isDebugEnabled())
- log.debug("Finished waiting on local exchange: " + fut.exchangeId());
}
/** {@inheritDoc} */
@@ -337,8 +334,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
new IgniteInterruptedCheckedException("Node is stopping: " + cctx.gridName());
// Finish all exchange futures.
- for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
- f.onDone(err);
+ ExchangeFutureSet exchFuts0 = exchFuts;
+
+ if (exchFuts0 != null) {
+ for (GridDhtPartitionsExchangeFuture f : exchFuts.values())
+ f.onDone(err);
+ }
for (AffinityReadyFuture f : readyFuts.values())
f.onDone(err);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index e0f6181..b8bb08e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -56,6 +56,11 @@ public interface GridCachePreloader {
public void onKernalStop();
/**
+ * Client reconnected callback.
+ */
+ public void onReconnected();
+
+ /**
* Callback by exchange manager when initial partition exchange is complete.
*
* @param err Error, if any happened on initial exchange.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index b4f386f..0adf510 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -87,6 +87,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
this.preloadPred = preloadPred;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 61f7e58..f58ef6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -707,7 +707,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
});
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
+ mgr.onKernalStart(false);
for (GridCacheAdapter<?, ?> cache : caches.values())
onKernalStart(cache);
@@ -799,7 +799,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
// Must call onKernalStart on shared managers after creation of fetched caches.
for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
+ mgr.onKernalStart(false);
for (Map.Entry<String, GridCacheAdapter<?, ?>> e : caches.entrySet()) {
GridCacheAdapter cache = e.getValue();
@@ -918,9 +918,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
@Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
cachesOnDisconnect = new HashMap<>(registeredCaches);
- registeredCaches.clear();
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(
+ ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
- registeredTemplates.clear();
+ for (IgniteInternalFuture fut : pendingFuts.values())
+ ((GridFutureAdapter)fut).onDone(err);
+
+ for (IgniteInternalFuture fut : pendingTemplateFuts.values())
+ ((GridFutureAdapter)fut).onDone(err);
for (GridCacheAdapter cache : caches.values()) {
GridCacheContext cctx = cache.context();
@@ -938,12 +944,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
sharedCtx.onDisconnected(reconnectFut);
- for (GridCacheAdapter cache : caches.values())
- cache.onDisconnected();
+ registeredCaches.clear();
+
+ registeredTemplates.clear();
}
/** {@inheritDoc} */
@Override public void onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
+ List<GridCacheAdapter> reconnected = new ArrayList<>();
+
for (GridCacheAdapter cache : caches.values()) {
String name = cache.name();
@@ -963,9 +972,9 @@ public class GridCacheProcessor extends GridProcessorAdapter {
else
stopped = false;
- cache.context().gate().reconnected(stopped);
-
if (stopped) {
+ cache.context().gate().reconnected(true);
+
sharedCtx.removeCacheContext(cache.ctx);
caches.remove(maskNull(cache.name()));
@@ -974,16 +983,17 @@ public class GridCacheProcessor extends GridProcessorAdapter {
onKernalStop(cache, true);
stopCache(cache, true);
}
- }
+ else {
+ cache.onReconnected();
- marshallerCache().context().preloader().syncFuture().listen(new CIX1<IgniteInternalFuture<?>>() {
- @Override public void applyx(IgniteInternalFuture<?> f) throws IgniteCheckedException {
- ctx.marshallerContext().onMarshallerCachePreloaded(ctx);
+ reconnected.add(cache);
}
- });
+ }
- for (GridCacheSharedManager<?, ?> mgr : sharedCtx.managers())
- mgr.onKernalStart();
+ sharedCtx.onReconnected();
+
+ for (GridCacheAdapter cache : reconnected)
+ cache.context().gate().reconnected(false);
cachesOnDisconnect = null;
}
@@ -2218,8 +2228,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
}
- if (!sndReqs.isEmpty())
- ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
+ Exception err = null;
+
+ if (!sndReqs.isEmpty()) {
+ try {
+ ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(sndReqs));
+
+ if (ctx.clientDisconnected())
+ err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
+ }
+ catch (IgniteException e) {
+ err = e;
+ }
+ }
+
+ if (err != null) {
+ for (DynamicCacheStartFuture fut : res)
+ fut.onDone(err);
+ }
return res;
}
@@ -2855,7 +2882,21 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (old != null)
fut = old;
- ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
+ Exception err = null;
+
+ try {
+ ctx.discovery().sendCustomEvent(new DynamicCacheChangeBatch(Collections.singleton(req)));
+
+ if (ctx.clientDisconnected())
+ err = new IgniteClientDisconnectedCheckedException(ctx.cluster().clientReconnectFuture(),
+ "Failed to execute dynamic cache change request, client node disconnected.");
+ }
+ catch (IgniteException e) {
+ err = e;
+ }
+
+ if (err != null)
+ fut.onDone(err);
fut.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index d0064f3..91a6042 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -110,7 +110,7 @@ public class GridCacheSharedContext<K, V> {
) {
this.kernalCtx = kernalCtx;
- setManagers(txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
+ setManagers(mgrs, txMgr, jtaMgr, verMgr, mvccMgr, depMgr, exchMgr, ioMgr);
this.storeSesLsnrs = storeSesLsnrs;
@@ -125,25 +125,30 @@ public class GridCacheSharedContext<K, V> {
*/
void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size());
- it.hasPrevious();) {
+ it.hasPrevious();) {
GridCacheSharedManager<?, ?> mgr = it.previous();
mgr.onDisconnected(reconnectFut);
- if (mgr.restartOnDisconnect())
+ if (restartOnDisconnect(mgr))
mgr.onKernalStop(true);
}
for (ListIterator<? extends GridCacheSharedManager<?, ?>> it = mgrs.listIterator(mgrs.size()); it.hasPrevious();) {
GridCacheSharedManager<?, ?> mgr = it.previous();
- if (mgr.restartOnDisconnect())
+ if (restartOnDisconnect(mgr))
mgr.stop(true);
}
+ }
- mgrs = new LinkedList<>();
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ void onReconnected() throws IgniteCheckedException {
+ List<GridCacheSharedManager<K, V>> mgrs = new LinkedList<>();
- setManagers(txMgr,
+ setManagers(mgrs, txMgr,
jtaMgr,
verMgr,
mvccMgr,
@@ -151,10 +156,23 @@ public class GridCacheSharedContext<K, V> {
new GridCachePartitionExchangeManager<K, V>(),
ioMgr);
+ this.mgrs = mgrs;
+
for (GridCacheSharedManager<K, V> mgr : mgrs) {
- if (mgr.restartOnDisconnect())
+ if (restartOnDisconnect(mgr))
mgr.start(this);
}
+
+ for (GridCacheSharedManager<?, ?> mgr : mgrs)
+ mgr.onKernalStart(true);
+ }
+
+ /**
+ * @param mgr Manager.
+ * @return {@code True} if manager is restarted on reconnect.
+ */
+ private boolean restartOnDisconnect(GridCacheSharedManager<?, ?> mgr) {
+ return mgr instanceof GridCacheDeploymentManager || mgr instanceof GridCachePartitionExchangeManager;
}
/**
@@ -166,20 +184,21 @@ public class GridCacheSharedContext<K, V> {
* @param ioMgr IO manager.
* @param jtaMgr JTA manager.
*/
- private void setManagers(IgniteTxManager txMgr,
+ private void setManagers(List<GridCacheSharedManager<K, V>> mgrs,
+ IgniteTxManager txMgr,
CacheJtaManagerAdapter jtaMgr,
GridCacheVersionManager verMgr,
GridCacheMvccManager mvccMgr,
GridCacheDeploymentManager<K, V> depMgr,
GridCachePartitionExchangeManager<K, V> exchMgr,
GridCacheIoManager ioMgr) {
- this.mvccMgr = add(mvccMgr);
- this.verMgr = add(verMgr);
- this.txMgr = add(txMgr);
- this.jtaMgr = add(jtaMgr);
- this.depMgr = add(depMgr);
- this.exchMgr = add(exchMgr);
- this.ioMgr = add(ioMgr);
+ this.mvccMgr = add(mgrs, mvccMgr);
+ this.verMgr = add(mgrs, verMgr);
+ this.txMgr = add(mgrs, txMgr);
+ this.jtaMgr = add(mgrs, jtaMgr);
+ this.depMgr = add(mgrs, depMgr);
+ this.exchMgr = add(mgrs, exchMgr);
+ this.ioMgr = add(mgrs, ioMgr);
}
/**
@@ -620,7 +639,8 @@ public class GridCacheSharedContext<K, V> {
* @param mgr Manager to add.
* @return Added manager.
*/
- @Nullable private <T extends GridCacheSharedManager<K, V>> T add(@Nullable T mgr) {
+ @Nullable private <T extends GridCacheSharedManager<K, V>> T add(List<GridCacheSharedManager<K, V>> mgrs,
+ @Nullable T mgr) {
if (mgr != null)
mgrs.add(mgr);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
index 9739175..668bd00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManager.java
@@ -23,7 +23,7 @@ import org.apache.ignite.lang.*;
/**
* Cache manager shared across all caches.
*/
-public interface GridCacheSharedManager <K, V> {
+public interface GridCacheSharedManager<K, V> {
/**
* Starts manager.
*
@@ -40,9 +40,10 @@ public interface GridCacheSharedManager <K, V> {
public void stop(boolean cancel);
/**
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
- public void onKernalStart() throws IgniteCheckedException;
+ public void onKernalStart(boolean reconnect) throws IgniteCheckedException;
/**
* @param cancel Cancel flag.
@@ -60,9 +61,4 @@ public interface GridCacheSharedManager <K, V> {
* NOTE: this method is for testing and profiling purposes only.
*/
public void printMemoryStats();
-
- /**
- * @return {@code True} if manager is restarted when client disconnects.
- */
- public boolean restartOnDisconnect();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
index 8029d49..3ad0759 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedManagerAdapter.java
@@ -36,6 +36,9 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/** Starting flag. */
private final AtomicBoolean starting = new AtomicBoolean(false);
+ /** */
+ private final AtomicBoolean stop = new AtomicBoolean(false);
+
/** {@inheritDoc} */
@Override public final void start(GridCacheSharedContext<K, V> cctx) throws IgniteCheckedException {
if (!starting.compareAndSet(false, true))
@@ -76,7 +79,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/** {@inheritDoc} */
@Override public final void stop(boolean cancel) {
- if (!starting.get())
+ if (!starting.get() || !stop.compareAndSet(false, true))
// Ignoring attempt to stop manager that has never been started.
return;
@@ -94,10 +97,10 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
- @Override public final void onKernalStart() throws IgniteCheckedException {
- onKernalStart0();
+ @Override public final void onKernalStart(boolean reconnect) throws IgniteCheckedException {
+ onKernalStart0(reconnect);
- if (log != null && log.isDebugEnabled())
+ if (!reconnect && log != null && log.isDebugEnabled())
log.debug(kernalStartInfo());
}
@@ -116,7 +119,7 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
/**
+ * Extension hook called from {@code onKernalStart(boolean)}; this default implementation does nothing.
+ *
+ * @param reconnect {@code True} if manager restarted after client reconnect.
* @throws IgniteCheckedException If failed.
*/
- protected void onKernalStart0() throws IgniteCheckedException {
+ protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
// No-op.
}
@@ -166,11 +169,6 @@ public class GridCacheSharedManagerAdapter<K, V> implements GridCacheSharedManag
}
/** {@inheritDoc} */
- @Override public boolean restartOnDisconnect() {
- return false;
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheSharedManagerAdapter.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index adea9e0..90497b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -134,6 +134,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ super.onReconnected();
+
+ top = new GridDhtPartitionTopologyImpl(ctx);
+
+ if (preldr != null)
+ preldr.onReconnected();
+ }
+
+ /** {@inheritDoc} */
@Override public void onKernalStart() throws IgniteCheckedException {
super.onKernalStart();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index f33f791..5b2a14a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -67,7 +67,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
private GridDhtPartitionDemandPool demandPool;
/** Start future. */
- private final GridFutureAdapter<Object> startFut;
+ private GridFutureAdapter<Object> startFut;
/** Busy lock to prevent activities from accessing exchanger while it's stopping. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
@@ -239,6 +239,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
}
/** {@inheritDoc} */
+ @Override public void onReconnected() {
+ startFut = new GridFutureAdapter<>();
+
+ top = cctx.dht().topology();
+ }
+
+ /** {@inheritDoc} */
@Override public void onExchangeFutureAdded() {
demandPool.onExchangeFutureAdded();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 688299a..79b7c1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -90,7 +90,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda
public abstract GridDhtCacheAdapter<K, V> dht();
/** {@inheritDoc} */
- @Override public void onDisconnected() {
+ @Override public void onReconnected() {
map = new GridCacheConcurrentMap(ctx,
ctx.config().getNearConfiguration().getNearStartSize(),
0.75F,
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 1747de9..caaa22d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -115,7 +115,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
new ConcurrentHashMap8<>(5120);
/** {@inheritDoc} */
- @Override protected void onKernalStart0() {
+ @Override protected void onKernalStart0(boolean reconnect) {
+ if (reconnect)
+ return;
+
cctx.gridEvents().addLocalEventListener(
new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index c776361..90919c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -78,13 +78,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
};
- /**
- * @return Pre-generated UUID.
- */
- private IgniteUuid uuid() {
- return IgniteUuid.randomUuid();
- }
-
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled();
@@ -95,7 +88,7 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
}
/** {@inheritDoc} */
- @Override protected void onKernalStart0() throws IgniteCheckedException {
+ @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
for (ClusterNode n : cctx.discovery().remoteNodes())
onReceived(n.id(), n.metrics().getLastDataVersion());
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 8ea2b82..d99a764 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -228,6 +228,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** */
public static final byte HANDSHAKE_MSG_TYPE = -3;
+ /** Gate entered around handling of a session's first inbound message and around TCP client creation; closed on client disconnect and on stop. */
+ private ConnectGateway connectGate;
+
/** Server listener. */
private final GridNioServerListener<Message> srvLsnr =
new GridNioServerListenerAdapter<Message>() {
@@ -289,136 +292,163 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- @Override public void onMessage(GridNioSession ses, Message msg) {
- UUID sndId = ses.meta(NODE_ID_META);
+ /**
+ * @param ses Session.
+ * @param msg Message.
+ */
+ private void onFirstMessage(GridNioSession ses, Message msg) {
+ UUID sndId;
- if (sndId == null) {
- assert ses.accepted();
+ if (msg instanceof NodeIdMessage)
+ sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+ else {
+ assert msg instanceof HandshakeMessage : msg;
- if (msg instanceof NodeIdMessage)
- sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
- else {
- assert msg instanceof HandshakeMessage : msg;
+ sndId = ((HandshakeMessage)msg).nodeId();
+ }
- sndId = ((HandshakeMessage)msg).nodeId();
- }
+ if (log.isDebugEnabled())
+ log.debug("Remote node ID received: " + sndId);
- if (log.isDebugEnabled())
- log.debug("Remote node ID received: " + sndId);
+ final UUID old = ses.addMeta(NODE_ID_META, sndId);
- final UUID old = ses.addMeta(NODE_ID_META, sndId);
+ assert old == null;
- assert old == null;
+ final ClusterNode rmtNode = getSpiContext().node(sndId);
- final ClusterNode rmtNode = getSpiContext().node(sndId);
+ if (rmtNode == null) {
+ if (log.isDebugEnabled())
+ log.debug("Close incoming connection, unknown node: " + sndId);
- if (rmtNode == null) {
- ses.close();
+ ses.close();
- return;
- }
+ return;
+ }
- ClusterNode locNode = getSpiContext().localNode();
+ ClusterNode locNode = getSpiContext().localNode();
- if (ses.remoteAddress() == null)
- return;
+ if (ses.remoteAddress() == null)
+ return;
- GridCommunicationClient oldClient = clients.get(sndId);
+ GridCommunicationClient oldClient = clients.get(sndId);
- boolean hasShmemClient = false;
+ boolean hasShmemClient = false;
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
- ses.send(new RecoveryLastReceivedMessage(-1));
+ ses.send(new RecoveryLastReceivedMessage(-1));
- return;
- }
- else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
- hasShmemClient = true;
- }
+ hasShmemClient = true;
}
+ }
- GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
+ GridFutureAdapter<GridCommunicationClient> fut = new GridFutureAdapter<>();
- GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
+ GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(sndId, fut);
- assert msg instanceof HandshakeMessage : msg;
+ assert msg instanceof HandshakeMessage : msg;
- HandshakeMessage msg0 = (HandshakeMessage)msg;
+ HandshakeMessage msg0 = (HandshakeMessage)msg;
- final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
+ final GridNioRecoveryDescriptor recoveryDesc = recoveryDescriptor(rmtNode);
- if (oldFut == null) {
- oldClient = clients.get(sndId);
+ if (oldFut == null) {
+ oldClient = clients.get(sndId);
- if (oldClient != null) {
- if (oldClient instanceof GridTcpNioCommunicationClient) {
- if (log.isDebugEnabled())
- log.debug("Received incoming connection when already connected " +
+ if (oldClient != null) {
+ if (oldClient instanceof GridTcpNioCommunicationClient) {
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection when already connected " +
"to this node, rejecting [locNode=" + locNode.id() +
", rmtNode=" + sndId + ']');
- ses.send(new RecoveryLastReceivedMessage(-1));
+ ses.send(new RecoveryLastReceivedMessage(-1));
- return;
- }
- else {
- assert oldClient instanceof GridShmemCommunicationClient;
+ return;
+ }
+ else {
+ assert oldClient instanceof GridShmemCommunicationClient;
- hasShmemClient = true;
- }
+ hasShmemClient = true;
}
+ }
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (log.isDebugEnabled())
- log.debug("Received incoming connection from remote node " +
+ if (log.isDebugEnabled())
+ log.debug("Received incoming connection from remote node " +
"[rmtNode=" + rmtNode.id() + ", reserved=" + reserved + ']');
- if (reserved) {
- try {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ try {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
- finally {
- clientFuts.remove(rmtNode.id(), fut);
- }
+ fut.onDone(client);
+ }
+ finally {
+ clientFuts.remove(rmtNode.id(), fut);
}
}
- else {
- if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
- if (log.isDebugEnabled()) {
- log.debug("Received incoming connection from remote node while " +
+ }
+ else {
+ if (oldFut instanceof ConnectFuture && locNode.order() < rmtNode.order()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Received incoming connection from remote node while " +
"connecting to this node, rejecting [locNode=" + locNode.id() +
", locNodeOrder=" + locNode.order() + ", rmtNode=" + rmtNode.id() +
", rmtNodeOrder=" + rmtNode.order() + ']');
- }
-
- ses.send(new RecoveryLastReceivedMessage(-1));
}
- else {
- boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
+
+ ses.send(new RecoveryLastReceivedMessage(-1));
+ }
+ else {
+ boolean reserved = recoveryDesc.tryReserve(msg0.connectCount(),
new ConnectClosure(ses, recoveryDesc, rmtNode, msg0, !hasShmemClient, fut));
- if (reserved) {
- GridTcpNioCommunicationClient client =
+ if (reserved) {
+ GridTcpNioCommunicationClient client =
connected(recoveryDesc, ses, rmtNode, msg0.received(), true, !hasShmemClient);
- fut.onDone(client);
- }
+ fut.onDone(client);
}
}
}
+ }
+
+ @Override public void onMessage(GridNioSession ses, Message msg) {
+ UUID sndId = ses.meta(NODE_ID_META);
+
+ if (sndId == null) {
+ assert ses.accepted() : ses;
+
+ if (!connectGate.tryEnter()) {
+ if (log.isDebugEnabled())
+ log.debug("Close incoming connection, failed to enter gateway.");
+
+ ses.close();
+
+ return;
+ }
+
+ try {
+ onFirstMessage(ses, msg);
+ }
+ finally {
+ connectGate.leave();
+ }
+ }
else {
rcvdMsgsCnt.increment();
@@ -737,24 +767,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
assert evt instanceof DiscoveryEvent : evt;
- assert evt.type() == EVT_NODE_LEFT ||
- evt.type() == EVT_NODE_FAILED ||
- evt.type() == EVT_CLIENT_NODE_DISCONNECTED : evt;
-
- if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
- for (GridCommunicationClient client : clients.values())
- client.forceClose();
-
- IgniteCheckedException err = new IgniteCheckedException("Failed to connect to node, " +
- "local node node disconnected.");
-
- for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
- clientFut.onDone(err);
+ assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED ;
- recoveryDescs.clear();
- }
- else
- onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
+ onNodeLeft(((DiscoveryEvent)evt).eventNode().id());
}
};
@@ -1356,6 +1371,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
registerMBean(gridName, this, TcpCommunicationSpiMBean.class);
+ connectGate = new ConnectGateway();
+
if (shmemSrv != null) {
shmemAcceptWorker = new ShmemAcceptWorker(shmemSrv);
@@ -1381,7 +1398,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (boundTcpShmemPort > 0)
spiCtx.registerPort(boundTcpShmemPort, IgnitePortProtocol.TCP);
- spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_CLIENT_NODE_DISCONNECTED);
+ spiCtx.addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
ctxInitLatch.countDown();
}
@@ -1618,6 +1635,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
// Safety.
ctxInitLatch.countDown();
+ if (connectGate != null)
+ connectGate.stopped();
+
// Force closing.
for (GridCommunicationClient client : clients.values())
client.forceClose();
@@ -1627,6 +1647,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
getSpiContext().removeLocalEventListener(discoLsnr);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Closes the connect gateway first, then force-closes every communication client, completes all
+ * pending client futures with an {@link IgniteClientDisconnectedCheckedException} that carries
+ * {@code reconnectFut}, and finally clears the recovery descriptors.
+ */
+ @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
+ connectGate.disconnected(reconnectFut);
+
+ for (GridCommunicationClient client : clients.values())
+ client.forceClose();
+
+ IgniteClientDisconnectedCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut,
+ "Failed to connect client node disconnected.");
+
+ for (GridFutureAdapter<GridCommunicationClient> clientFut : clientFuts.values())
+ clientFut.onDone(err);
+
+ recoveryDescs.clear();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reopens the connect gateway that was closed on disconnect. The {@code clusterRestarted} flag is
+ * not used by this implementation.
+ */
+ @Override public void onClientReconnected(boolean clusterRestarted) {
+ connectGate.reconnected();
+ }
+
/**
* @param nodeId Left node ID.
*/
@@ -1842,7 +1883,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
}
- return createTcpClient(node);
+ connectGate.enter();
+
+ try {
+ return createTcpClient(node);
+ }
+ finally {
+ connectGate.leave();
+ }
}
/**
@@ -3159,4 +3207,86 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
return S.toString(NodeIdMessage.class, this);
}
}
+
+ /**
+ * Gate that connection attempts pass through. Callers hold the read lock while inside the gate;
+ * while {@link #err} is set the gate is closed and entry is refused.
+ */
+ private class ConnectGateway {
+ /** Lock: read side is held by callers inside the gate, write side is taken to change {@link #err}. */
+ private GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+
+ /** Error reported to callers trying to enter; {@code null} while entry is allowed. */
+ private IgniteException err;
+
+ /**
+ * Enters the gate. On success the read lock stays held until {@link #leave()} is called.
+ *
+ * @throws IgniteException The stored error if the gate is closed; the read lock is released
+ * before throwing, so {@link #leave()} must not be called in that case.
+ */
+ void enter() {
+ lock.readLock();
+
+ if (err != null) {
+ lock.readUnlock();
+
+ throw err;
+ }
+ }
+
+ /**
+ * Non-throwing variant of {@link #enter()}. When {@code false} is returned the read lock has
+ * already been released and {@link #leave()} must not be called.
+ *
+ * @return {@code True} if entered gateway.
+ */
+ boolean tryEnter() {
+ lock.readLock();
+
+ boolean res = err == null;
+
+ if (!res)
+ lock.readUnlock();
+
+ return res;
+ }
+
+ /**
+ * Leaves the gate by releasing the read lock taken by a successful {@link #enter()} or
+ * {@link #tryEnter()}.
+ */
+ void leave() {
+ lock.readUnlock();
+ }
+
+ /**
+ * Closes the gate under the write lock with an {@link IgniteClientDisconnectedException}
+ * that carries the given future.
+ *
+ * @param reconnectFut Reconnect future.
+ */
+ void disconnected(IgniteFuture<?> reconnectFut) {
+ lock.writeLock();
+
+ err = new IgniteClientDisconnectedException(reconnectFut, "Failed to connect, client node disconnected.");
+
+ lock.writeUnlock();
+ }
+
+ /**
+ * Reopens the gate under the write lock. Only a disconnect error is cleared, so an error
+ * set by {@link #stopped()} is kept.
+ */
+ void reconnected() {
+ lock.writeLock();
+
+ try {
+ if (err instanceof IgniteClientDisconnectedException)
+ err = null;
+ }
+ finally {
+ lock.writeUnlock();
+ }
+ }
+
+ /**
+ * Closes the gate with a "node stopped" error.
+ * <p>
+ * NOTE(review): unlike {@link #disconnected(IgniteFuture)} and {@link #reconnected()}, the
+ * error is assigned while holding the read lock, not the write lock. Presumably this is so
+ * that stopping does not wait for callers currently inside the gate - confirm that this is
+ * intentional and that concurrent readers are guaranteed to observe the new value.
+ */
+ void stopped() {
+ lock.readLock();
+
+ err = new IgniteException("Failed to connect, node stopped.");
+
+ lock.readUnlock();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 8041a63..404c71d 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -94,6 +95,10 @@ class ClientImpl extends TcpDiscoveryImpl {
/** */
protected MessageWorker msgWorker;
+ /**
+ * Join attempt counter, incremented before each join attempt. The join-timeout timer task captures
+ * the value at scheduling time and enqueues the timeout only if the counter has not changed since.
+ * NOTE(review): the timer task reads this plain {@code int} with no synchronization visible in
+ * this change - confirm cross-thread visibility.
+ */
+ @GridToStringExclude
+ private int joinCnt;
+
/**
* @param adapter Adapter.
*/
@@ -1215,13 +1220,14 @@ class ClientImpl extends TcpDiscoveryImpl {
else {
log.info("Reconnected failed, will try join.");
- if (state != ClientImpl.State.DISCONNECTED) {
+ if (state == ClientImpl.State.STARTING || state == ClientImpl.State.CONNECTED) {
state = ClientImpl.State.DISCONNECTED;
nodeAdded = false;
IgniteClientDisconnectedCheckedException err =
- new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, client node disconnected.");
+ new IgniteClientDisconnectedCheckedException(null, "Failed to ping node, " +
+ "client node disconnected.");
for (Map.Entry<UUID, GridFutureAdapter<Boolean>> e : pingFuts.entrySet()) {
GridFutureAdapter<Boolean> fut = e.getValue();
@@ -1298,6 +1304,8 @@ class ClientImpl extends TcpDiscoveryImpl {
log.info("Try join topology with timeout: " + spi.joinTimeout);
+ joinCnt++;
+
final Socket sock = joinTopology(false, spi.joinTimeout);
if (sock == null) {
@@ -1319,9 +1327,11 @@ class ClientImpl extends TcpDiscoveryImpl {
sockWriter.setSocket(sock);
if (spi.joinTimeout > 0) {
+ final int joinCnt0 = joinCnt;
+
timer.schedule(new TimerTask() {
@Override public void run() {
- if (joining())
+ if (joinCnt == joinCnt0 && joining())
queue.add(JOIN_TIMEOUT);
}
}, spi.joinTimeout);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 802277c..36ea63f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -35,10 +35,12 @@ import org.apache.ignite.plugin.extensions.communication.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.spi.*;
import org.apache.ignite.spi.communication.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.testframework.*;
import org.apache.ignite.transactions.*;
import javax.cache.*;
+import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
@@ -64,9 +66,6 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
/** */
private UUID nodeId;
- /** */
- private Map<IgnitePredicate<? extends Event>, int[]> lsnrs;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -75,18 +74,14 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
cfg.setPeerClassLoadingEnabled(false);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setNetworkTimeout(5000);
+
if (nodeId != null) {
cfg.setNodeId(nodeId);
nodeId = null;
}
- if (lsnrs != null) {
- cfg.setLocalEventListeners(lsnrs);
-
- lsnrs = null;
- }
-
CacheConfiguration ccfg = new CacheConfiguration();
ccfg.setName(STATIC_CACHE);
@@ -631,7 +626,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
/**
* @throws Exception If failed.
*/
- public void _testReconnectInitialExchangeInProgress() throws Exception {
+ public void testReconnectInitialExchangeInProgress() throws Exception {
final UUID clientId = UUID.randomUUID();
Ignite srv = grid(0);
@@ -658,26 +653,28 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
nodeId = clientId;
- lsnrs = new HashMap<>();
-
- final CountDownLatch reconnectLatch = new CountDownLatch(1);
+ IgniteInternalFuture<Boolean> fut = GridTestUtils.runAsync(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ try {
+ Ignition.start(getConfiguration(getTestGridName(SRV_CNT)));
- lsnrs.put(new IgnitePredicate<Event>() {
- @Override public boolean apply(Event evt) {
- if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
- info("Reconnected: " + evt);
+ fail();
- reconnectLatch.countDown();
+ return false;
}
+ catch (IgniteClientDisconnectedException e) {
+ log.info("Expected start error: " + e);
- return true;
- }
- }, new int[]{EVT_CLIENT_NODE_RECONNECTED});
+ try {
+ e.reconnectFuture().get();
- IgniteInternalFuture<Ignite> fut = GridTestUtils.runAsync(new Callable<Ignite>() {
- @Override public Ignite call() throws Exception {
- try {
- return startGrid(SRV_CNT);
+ fail();
+ }
+ catch (IgniteException e0) {
+ log.info("Expected future error: " + e0);
+ }
+
+ return true;
}
catch (Throwable e) {
log.error("Unexpected error: " + e, e);
@@ -697,34 +694,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
srvSpi.failNode(clientId, null);
- log.info("Wait reconnect.");
-
- assertTrue(reconnectLatch.await(10 * 60_000, MILLISECONDS));
-
- try {
- srvCommSpi.stopBlock(true);
-
- fail();
- }
- catch (IgniteException e) {
- log.info("Expected error: " + e);
- }
-
- Ignite client = fut.get();
-
- CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>();
-
- ccfg.setName("newCache");
-
- ccfg.setCacheMode(REPLICATED);
+ srvCommSpi.stopBlock(false);
- log.info("Start new cache.");
-
- IgniteCache<Object, Object> cache = client.getOrCreateCache(ccfg);
-
- cache.put(1, 1);
-
- assertEquals(1, cache.get(1));
+ assertTrue(fut.get());
}
/**
@@ -887,6 +859,138 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
}
/**
+ * Stores instances of distinct test classes through the client and server caches before a client
+ * reconnect, from the callback passed to {@code reconnectClientNode} (presumably run while the
+ * client is disconnected - confirm against the helper) and after the reconnect, then checks that
+ * keys 1-5 are readable through the client cache and key 5 through the server cache.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectMarshallerCache() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final Ignite srv = clientRouter(client);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+ final IgniteCache<Object, Object> srvCache = srv.cache(null);
+
+ assertNotNull(srvCache);
+
+ clientCache.put(1, new TestClass1());
+ srvCache.put(2, new TestClass2());
+
+ reconnectClientNode(client, srv, new Runnable() {
+ @Override public void run() {
+ assertNotNull(srvCache.get(1));
+ assertNotNull(srvCache.get(2));
+
+ srvCache.put(3, new TestClass3());
+ }
+ });
+
+ srvCache.put(4, new TestClass4());
+
+ assertNotNull(clientCache.get(1));
+ assertNotNull(clientCache.get(2));
+ assertNotNull(clientCache.get(3));
+ assertNotNull(clientCache.get(4));
+
+ clientCache.put(5, new TestClass5());
+
+ assertNotNull(srvCache.get(5));
+ assertNotNull(clientCache.get(5));
+ }
+
+ /**
+ * Stops every server node, waits for the client's disconnect event, starts a new server and waits
+ * for the reconnect event. The cache proxy obtained before the restart is then expected to throw
+ * {@link IllegalStateException}, while a proxy obtained again from the client must read the
+ * values put through the new server.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReconnectClusterRestart() throws Exception {
+ clientMode = true;
+
+ final Ignite client = startGrid(SRV_CNT);
+
+ assertTrue(client.cluster().localNode().isClient());
+
+ final CountDownLatch disconnectLatch = new CountDownLatch(1);
+ final CountDownLatch reconnectLatch = new CountDownLatch(1);
+
+ final IgniteCache<Object, Object> clientCache = client.getOrCreateCache(new CacheConfiguration<>());
+
+ clientCache.put(1, new TestClass1());
+
+ client.events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+ info("Disconnected: " + evt);
+
+ disconnectLatch.countDown();
+ } else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+ info("Reconnected: " + evt);
+
+ reconnectLatch.countDown();
+ }
+
+ return true;
+ }
+ }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+ for (int i = 0; i < SRV_CNT; i++)
+ stopGrid(i);
+
+ assertTrue(disconnectLatch.await(10_000, MILLISECONDS));
+
+ clientMode = false;
+
+ Ignite srv = startGrid(0);
+
+ assertTrue(reconnectLatch.await(10_000, MILLISECONDS));
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ return clientCache.get(1);
+ }
+ }, IllegalStateException.class, null);
+
+ IgniteCache<Object, Object> srvCache = srv.getOrCreateCache(new CacheConfiguration<>());
+
+ srvCache.put(1, new TestClass1());
+ srvCache.put(2, new TestClass2());
+
+ IgniteCache<Object, Object> clientCache2 = client.cache(null);
+
+ assertNotNull(clientCache2);
+
+ assertNotNull(clientCache2.get(1));
+ assertNotNull(clientCache2.get(2));
+ }
+
+ /**
+ * Serializable cache value type with a single {@code int} field.
+ */
+ static class TestClass1 implements Serializable {
+ int val;
+ }
+
+ /**
+ * Empty serializable cache value type.
+ */
+ static class TestClass2 implements Serializable {}
+
+ /**
+ * Empty serializable cache value type.
+ */
+ static class TestClass3 implements Serializable {}
+
+ /**
+ * Empty serializable cache value type.
+ */
+ static class TestClass4 implements Serializable {}
+
+ /**
+ * Empty serializable cache value type.
+ */
+ static class TestClass5 implements Serializable {}
+
+ /**
* @param client Client.
* @param ccfg Cache configuration.
* @param msgToBlock Message to block.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
index 551cb1a..ea8f7af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverAbstractTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.testframework.*;
import javax.cache.*;
@@ -51,6 +52,8 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl
cfg.setPeerClassLoadingEnabled(false);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setJoinTimeout(30_000);
+
return cfg;
}
@@ -156,7 +159,7 @@ public abstract class IgniteClientReconnectFailoverAbstractTest extends IgniteCl
String err = null;
while (System.currentTimeMillis() < stopTime && !fut.isDone()) {
- U.sleep(100);
+ U.sleep(500);
CountDownLatch disconnectLatch = new CountDownLatch(1);
CountDownLatch reconnectLatch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/00d151b9/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
index b55ac57..127745b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClientReconnectCacheQueriesFailoverTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import javax.cache.*;
import java.util.*;
import java.util.concurrent.*;
@@ -65,17 +66,33 @@ public class IgniteClientReconnectCacheQueriesFailoverTest extends IgniteClientR
@Override public Void call() throws Exception {
SqlQuery<Integer, Person> sqlQry = new SqlQuery<>(Person.class, "where id > 1");
- assertEquals(9999, cache.query(sqlQry).getAll().size());
-
- SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select avg(p.id) from Person p");
-
- List<List<?>> res = cache.query(fieldsQry).getAll();
-
- assertEquals(1, res.size());
-
- Double avg = (Double)res.get(0).get(0);
-
- assertEquals(5_000, avg.intValue());
+ try {
+ assertEquals(9999, cache.query(sqlQry).getAll().size());
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof IgniteClientDisconnectedException)
+ throw e;
+ else
+ log.info("Ignore error: " + e);
+ }
+
+ try {
+ SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select avg(p.id) from Person p");
+
+ List<List<?>> res = cache.query(fieldsQry).getAll();
+
+ assertEquals(1, res.size());
+
+ Double avg = (Double)res.get(0).get(0);
+
+ assertEquals(5_000, avg.intValue());
+ }
+ catch (CacheException e) {
+ if (e.getCause() instanceof IgniteClientDisconnectedException)
+ throw e;
+ else
+ log.info("Ignore error: " + e);
+ }
return null;
}
[2/6] incubator-ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg-10416
Posted by sb...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-gg-10416
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d04c1042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d04c1042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d04c1042
Branch: refs/heads/ignite-901
Commit: d04c1042ec120027c1ddc84d672726db7b7135b0
Parents: 8cc75fc c134dcf
Author: Andrey <an...@gridgain.com>
Authored: Thu Jul 9 17:04:20 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Thu Jul 9 17:04:20 2015 +0700
----------------------------------------------------------------------
RELEASE_NOTES.txt | 12 ++
.../src/main/java/org/apache/ignite/Ignite.java | 2 +-
.../configuration/CacheConfiguration.java | 4 +
.../configuration/TransactionConfiguration.java | 23 +++
.../apache/ignite/internal/IgniteKernal.java | 32 +--
.../processors/cache/GridCacheAttributes.java | 3 +
.../processors/cache/GridCacheContext.java | 8 +-
.../processors/cache/GridCacheIoManager.java | 8 +-
.../processors/cache/GridCacheProcessor.java | 118 ++++-------
.../cache/GridCacheSharedContext.java | 15 +-
.../distributed/near/GridNearGetFuture.java | 4 +-
.../cache/jta/CacheJtaManagerAdapter.java | 17 +-
.../cache/jta/CacheNoopJtaManager.java | 2 +-
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../datastructures/DataStructuresProcessor.java | 39 +++-
.../GridCacheCountDownLatchImpl.java | 15 +-
.../visor/cache/VisorCacheConfiguration.java | 11 -
.../ignite/spi/discovery/tcp/ServerImpl.java | 20 ++
.../tcp/internal/TcpDiscoveryNode.java | 2 +-
.../tcp/internal/TcpDiscoveryNodesRing.java | 8 +-
.../tcp/internal/TcpDiscoveryStatistics.java | 10 +-
...cheStoreSessionListenerAbstractSelfTest.java | 1 -
.../cache/CacheFutureExceptionSelfTest.java | 161 +++++++--------
.../IgniteCacheConfigurationTemplateTest.java | 26 +--
.../cache/IgniteDynamicCacheStartSelfTest.java | 16 +-
.../IgniteDynamicClientCacheStartSelfTest.java | 5 +-
.../IgniteClientDataStructuresAbstractTest.java | 109 +++++++---
.../IgniteCountDownLatchAbstractSelfTest.java | 12 +-
...acheAtomicReplicatedNodeRestartSelfTest.java | 8 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../tcp/TcpDiscoveryMultiThreadedTest.java | 38 ++++
.../IgniteSpiDiscoverySelfTestSuite.java | 3 +
.../HibernateTransactionalDataRegion.java | 12 +-
.../hibernate/HibernateL2CacheSelfTest.java | 7 +-
.../HibernateL2CacheTransactionalSelfTest.java | 5 -
.../apache/ignite/cache/jta/CacheTmLookup.java | 3 +-
.../processors/cache/jta/CacheJtaManager.java | 72 ++++++-
.../cache/jta/GridCacheXAResource.java | 16 +-
.../processors/cache/GridCacheJtaSelfTest.java | 52 +++--
.../GridTmLookupLifecycleAwareSelfTest.java | 29 ++-
modules/kafka/licenses/apache-2.0.txt | 202 +++++++++++++++++++
modules/kafka/pom.xml | 11 -
.../commands/cache/VisorCacheCommand.scala | 2 -
.../config/benchmark-put-indexed-val.properties | 64 ++++++
modules/yardstick/config/ignite-base-config.xml | 23 +++
.../cache/IgnitePutIndexedValue1Benchmark.java | 42 ++++
.../cache/IgnitePutIndexedValue2Benchmark.java | 42 ++++
.../cache/IgnitePutIndexedValue8Benchmark.java | 42 ++++
.../ignite/yardstick/cache/model/Person1.java | 55 +++++
.../ignite/yardstick/cache/model/Person2.java | 67 ++++++
.../ignite/yardstick/cache/model/Person8.java | 109 ++++++++++
51 files changed, 1219 insertions(+), 376 deletions(-)
----------------------------------------------------------------------
[3/6] incubator-ignite git commit: # ignite-gg-10416 Fixed tests.
Posted by sb...@apache.org.
# ignite-gg-10416 Fixed tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/546d5955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/546d5955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/546d5955
Branch: refs/heads/ignite-901
Commit: 546d5955a1fdb4a16c186242945d4a27ba13c52c
Parents: d04c104
Author: Andrey <an...@gridgain.com>
Authored: Thu Jul 9 17:14:10 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Thu Jul 9 17:14:10 2015 +0700
----------------------------------------------------------------------
.../util/spring/IgniteSpringHelperImpl.java | 10 +++++-----
.../spring/IgniteExcludeInConfigurationTest.java | 5 ++++-
.../org/apache/ignite/spring/sprint-exclude.xml | 19 +++++++++++++++++++
3 files changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546d5955/modules/spring/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelperImpl.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelperImpl.java b/modules/spring/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelperImpl.java
index 6cfca36..435f522 100644
--- a/modules/spring/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelperImpl.java
+++ b/modules/spring/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelperImpl.java
@@ -422,6 +422,8 @@ public class IgniteSpringHelperImpl implements IgniteSpringHelper {
GenericApplicationContext springCtx = new GenericApplicationContext();
if (excludedProps.length > 0) {
+ final List<String> excludedPropsList = Arrays.asList(excludedProps);
+
BeanFactoryPostProcessor postProc = new BeanFactoryPostProcessor() {
/**
* @param def Registered BeanDefinition.
@@ -433,12 +435,10 @@ public class IgniteSpringHelperImpl implements IgniteSpringHelper {
while (iterVals.hasNext()) {
PropertyValue val = iterVals.next();
- for (String excludedProp : excludedProps) {
- if (val.getName().equals(excludedProp)) {
- iterVals.remove();
+ if (excludedPropsList.contains(val.getName())) {
+ iterVals.remove();
- return;
- }
+ continue;
}
if (val.getValue() instanceof Iterable) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546d5955/modules/spring/src/test/java/org/apache/ignite/spring/IgniteExcludeInConfigurationTest.java
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/spring/IgniteExcludeInConfigurationTest.java b/modules/spring/src/test/java/org/apache/ignite/spring/IgniteExcludeInConfigurationTest.java
index 1edca77..b708f21 100644
--- a/modules/spring/src/test/java/org/apache/ignite/spring/IgniteExcludeInConfigurationTest.java
+++ b/modules/spring/src/test/java/org/apache/ignite/spring/IgniteExcludeInConfigurationTest.java
@@ -40,7 +40,8 @@ public class IgniteExcludeInConfigurationTest extends GridCommonAbstractTest {
public void testExclude() throws Exception {
IgniteSpringHelper spring = SPRING.create(false);
- Collection<IgniteConfiguration> cfgs = spring.loadConfigurations(cfgLocation, "typeMetadata").get1();
+ Collection<IgniteConfiguration> cfgs = spring.loadConfigurations(cfgLocation, "fileSystemConfiguration",
+ "typeMetadata").get1();
assertNotNull(cfgs);
assertEquals(1, cfgs.size());
@@ -50,6 +51,8 @@ public class IgniteExcludeInConfigurationTest extends GridCommonAbstractTest {
assertEquals(1, cfg.getCacheConfiguration().length);
assertNull(cfg.getCacheConfiguration()[0].getTypeMetadata());
+ assertNull(cfg.getFileSystemConfiguration());
+
cfgs = spring.loadConfigurations(cfgLocation, "keyType").get1();
assertNotNull(cfgs);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546d5955/modules/spring/src/test/java/org/apache/ignite/spring/sprint-exclude.xml
----------------------------------------------------------------------
diff --git a/modules/spring/src/test/java/org/apache/ignite/spring/sprint-exclude.xml b/modules/spring/src/test/java/org/apache/ignite/spring/sprint-exclude.xml
index 494f786..e6bf426 100644
--- a/modules/spring/src/test/java/org/apache/ignite/spring/sprint-exclude.xml
+++ b/modules/spring/src/test/java/org/apache/ignite/spring/sprint-exclude.xml
@@ -29,6 +29,25 @@
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+ <property name="fileSystemConfiguration">
+ <list>
+ <bean class="org.apache.ignite.configuration.FileSystemConfiguration">
+ <property name="name" value="test"/>
+ <property name="metaCacheName" value="meta"/>
+ <property name="dataCacheName" value="data"/>
+
+ <property name="maxSpaceSize" value="#{100L * 1024 * 1024}"/>
+
+ <!-- Loopback endpoint. -->
+ <property name="ipcEndpointConfiguration">
+ <bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration">
+ <property name="type" value="TCP" />
+ </bean>
+ </property>
+ </bean>
+ </list>
+ </property>
+
<!-- Cache configurations (all properties are optional). -->
<property name="cacheConfiguration">
<list>
[4/6] incubator-ignite git commit: # Hive version changed in tests: 1.2.0 -> 1.2.1
Posted by sb...@apache.org.
# Hive version changed in tests: 1.2.0 -> 1.2.1
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bee6f688
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bee6f688
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bee6f688
Branch: refs/heads/ignite-901
Commit: bee6f6884ba0da9b9418842f7926d1dbd3ddd624
Parents: 546d595
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Jul 9 15:05:01 2015 +0300
Committer: iveselovskiy <iv...@gridgain.com>
Committed: Thu Jul 9 15:05:01 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bee6f688/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 2ab3e8c..7393f69 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -130,7 +130,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
* @throws Exception If failed.
*/
public static void downloadHive() throws Exception {
- String ver = IgniteSystemProperties.getString("hive.version", "1.2.0");
+ String ver = IgniteSystemProperties.getString("hive.version", "1.2.1");
X.println("Will use Hive version: " + ver);
[5/6] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-901
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-901
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/40532032
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/40532032
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/40532032
Branch: refs/heads/ignite-901
Commit: 40532032ba07e04c032ccf0cfb09788889a2c792
Parents: fa007b1 bee6f68
Author: sboikov <se...@inria.fr>
Authored: Thu Jul 9 19:07:01 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Thu Jul 9 19:07:01 2015 +0300
----------------------------------------------------------------------
.../ignite/testsuites/IgniteHadoopTestSuite.java | 2 +-
.../util/spring/IgniteSpringHelperImpl.java | 10 +++++-----
.../spring/IgniteExcludeInConfigurationTest.java | 5 ++++-
.../org/apache/ignite/spring/sprint-exclude.xml | 19 +++++++++++++++++++
.../visor/commands/open/VisorOpenCommand.scala | 2 +-
5 files changed, 30 insertions(+), 8 deletions(-)
----------------------------------------------------------------------