You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/28 15:21:48 UTC
[27/50] [abbrv] ignite git commit: ignite-1.5 Fixed hang on metadata
update inside put in atomic cache when topology read lock is held. Also fixed
several test issues.
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 5a4ba14..283da80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1009,107 +1009,111 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
*/
private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr,
boolean lostAllow, boolean wait) throws Exception {
- if (wait)
+ if (wait) {
GridTestUtils.waitForCondition(new PA() {
- @Override public boolean apply() {
+ @Override
+ public boolean apply() {
return expEvts.size() == lsnr.size();
}
}, 2000L);
+ }
- Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+ synchronized (lsnr) {
+ Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
- for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
- prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+ for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+ prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
- List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
+ List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
- for (T3<Object, Object, Object> exp : expEvts) {
- List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+ for (T3<Object, Object, Object> exp : expEvts) {
+ List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
- if (F.eq(exp.get2(), exp.get3()))
- continue;
+ if (F.eq(exp.get2(), exp.get3()))
+ continue;
- if (rcvdEvts == null || rcvdEvts.isEmpty()) {
- lostEvts.add(exp);
+ if (rcvdEvts == null || rcvdEvts.isEmpty()) {
+ lostEvts.add(exp);
- continue;
- }
+ continue;
+ }
- Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
+ Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
- boolean found = false;
+ boolean found = false;
- while (iter.hasNext()) {
- CacheEntryEvent<?, ?> e = iter.next();
+ while (iter.hasNext()) {
+ CacheEntryEvent<?, ?> e = iter.next();
- if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
- && equalOldValue(e, exp)) {
- found = true;
+ if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
+ && equalOldValue(e, exp)) {
+ found = true;
- iter.remove();
+ iter.remove();
- break;
+ break;
+ }
}
- }
- // Lost event is acceptable.
- if (!found)
- lostEvts.add(exp);
- }
+ // Lost event is acceptable.
+ if (!found)
+ lostEvts.add(exp);
+ }
- boolean dup = false;
+ boolean dup = false;
- // Check duplicate.
- if (!lsnr.evts.isEmpty()) {
- for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
- if (!evts.isEmpty()) {
- for (CacheEntryEvent<?, ?> e : evts) {
- boolean found = false;
+ // Check duplicate.
+ if (!lsnr.evts.isEmpty()) {
+ for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
+ if (!evts.isEmpty()) {
+ for (CacheEntryEvent<?, ?> e : evts) {
+ boolean found = false;
- for (T3<Object, Object, Object> lostEvt : lostEvts) {
- if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
- found = true;
+ for (T3<Object, Object, Object> lostEvt : lostEvts) {
+ if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2())) {
+ found = true;
- lostEvts.remove(lostEvt);
+ lostEvts.remove(lostEvt);
- break;
+ break;
+ }
}
- }
- if (!found) {
- dup = true;
+ if (!found) {
+ dup = true;
- break;
+ break;
+ }
}
}
}
- }
- if (dup) {
- for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
- if (!e.isEmpty()) {
- for (CacheEntryEvent<?, ?> event : e)
- log.error("Got duplicate event: " + event);
+ if (dup) {
+ for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
+ if (!e.isEmpty()) {
+ for (CacheEntryEvent<?, ?> event : e)
+ log.error("Got duplicate event: " + event);
+ }
}
}
}
- }
- if (!lostAllow && lostEvts.size() > 100) {
- log.error("Lost event cnt: " + lostEvts.size());
+ if (!lostAllow && lostEvts.size() > 100) {
+ log.error("Lost event cnt: " + lostEvts.size());
- for (T3<Object, Object, Object> e : lostEvts)
- log.error("Lost event: " + e);
+ for (T3<Object, Object, Object> e : lostEvts)
+ log.error("Lost event: " + e);
- fail("Lose events, see log for details.");
- }
+ fail("Lose events, see log for details.");
+ }
- log.error("Lost event cnt: " + lostEvts.size());
+ log.error("Lost event cnt: " + lostEvts.size());
- expEvts.clear();
+ expEvts.clear();
- lsnr.evts.clear();
- lsnr.vals.clear();
+ lsnr.evts.clear();
+ lsnr.vals.clear();
+ }
}
/**
@@ -2111,7 +2115,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC
/**
* @return Count events.
*/
- public int size() {
+ public synchronized int size() {
int size = 0;
for (List<CacheEntryEvent<?, ?>> e : evts.values())
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5f5dfd4..db59a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -128,6 +129,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
cfg.setDiscoverySpi(disco);
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
return cfg;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
index b529b6c..49c6968 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
@@ -38,6 +39,7 @@ import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -171,14 +173,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
for (int i = 0 ; i < NODES_CNT; i++) {
log.info("Iteration: " + i);
- Ignite ignite = grid(i);
+ final Ignite ignite = grid(i);
ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService());
- ClusterGroup grp = ignite.cluster();
+ final ClusterGroup grp = ignite.cluster();
assertEquals(NODES_CNT, grp.nodes().size());
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ignite.services(grp).serviceDescriptors().size() == 1;
+ }
+ }, 5000);
+
Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
assertEquals(1, srvDscs.size());
@@ -206,14 +214,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest {
for (int i = 0 ; i < NODES_CNT; i++) {
log.info("Iteration: " + i);
- Ignite ignite = grid(i);
+ final Ignite ignite = grid(i);
ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME, new TestService());
- ClusterGroup grp = ignite.cluster();
+ final ClusterGroup grp = ignite.cluster();
assertEquals(NODES_CNT, grp.nodes().size());
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
+ return ignite.services(grp).serviceDescriptors().size() == 1;
+ }
+ }, 5000);
+
Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
assertEquals(1, srvDscs.size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index dfea37a..92b18ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -17,15 +17,18 @@
package org.apache.ignite.internal.processors.service;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
/**
@@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
final Ignite ignite = startGrid(0);
- Thread t = new Thread(new Runnable() {
- @Override public void run() {
- Thread.currentThread().setName("deploy-thread");
-
+ IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
IgniteServices svcs = ignite.services();
IgniteServices services = svcs.withAsync();
@@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
catch (IgniteException e) {
finishLatch.countDown();
}
- catch (Throwable e) {
- log.error("Service deployment error: ", e);
+ finally {
+ finishLatch.countDown();
}
- }
- });
- t.start();
+ return null;
+ }
+ }, "deploy-thread");
depLatch.await();
@@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
U.dumpThreads(log);
assertTrue("Deploy future isn't completed", wait);
+
+ fut.get();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 731b0c7..7bbf531 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
@@ -55,6 +56,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
import org.apache.ignite.testframework.GridSpiTestContext;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
import org.apache.ignite.testframework.junits.IgniteTestResources;
import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
import org.jsr166.ConcurrentLinkedDeque8;
@@ -90,6 +92,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/** Initialized nodes */
private static final List<ClusterNode> nodes = new ArrayList<>();
+ /** */
+ private static GridTimeoutProcessor timeoutProcessor;
+
/** Flag indicating if listener should reject messages. */
private static boolean reject;
@@ -472,6 +477,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
+ timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+ timeoutProcessor.start();
+
+ timeoutProcessor.onKernalStart();
+
for (int i = 0; i < getSpiCount(); i++) {
CommunicationSpi<Message> spi = newCommunicationSpi();
@@ -485,6 +496,8 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
GridSpiTestContext ctx = initSpiContext();
+ ctx.timeoutProcessor(timeoutProcessor);
+
ctx.setLocalNode(node);
info(">>> Initialized context: nodeId=" + ctx.localNode().id());
@@ -548,6 +561,14 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
+ if (timeoutProcessor != null) {
+ timeoutProcessor.onKernalStop(true);
+
+ timeoutProcessor.stop(true);
+
+ timeoutProcessor = null;
+ }
+
for (CommunicationSpi<Message> spi : spis.values()) {
spi.onContextDestroyed();
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 5af0596..0df7da6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -897,8 +897,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
*/
public void testIpFinderCleaning() throws Exception {
try {
- ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
- new InetSocketAddress("host2", 1024)));
+ ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+ new InetSocketAddress("1.1.1.2", 1024)));
Ignite g1 = startGrid(1);
@@ -912,13 +912,19 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
}, timeout);
+ if (ipFinder.getRegisteredAddresses().size() != 1) {
+ log.error("Failed to wait for IP cleanup, will dump threads.");
+
+ U.dumpThreads(log);
+ }
+
assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses();
// Check that missing addresses are returned back.
ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister valid address.
- ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
- new InetSocketAddress("host2", 1024)));
+ ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+ new InetSocketAddress("1.1.1.2", 1024)));
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index e257a97..0bffe8b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -89,6 +91,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** */
private MessageFactory factory;
+ /** */
+ private GridTimeoutProcessor timeoutProcessor;
+
+ /**
+ * @param timeoutProcessor Timeout processor.
+ */
+ public void timeoutProcessor(GridTimeoutProcessor timeoutProcessor) {
+ this.timeoutProcessor = timeoutProcessor;
+ }
+
/** {@inheritDoc} */
@Override public Collection<ClusterNode> remoteNodes() {
return rmtNodes;
@@ -530,12 +542,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
/** {@inheritDoc} */
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
- // No-op.
+ if (timeoutProcessor != null)
+ timeoutProcessor.addTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
- // No-op.
+ if (timeoutProcessor != null)
+ timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
index 77e2dae..4a84931 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
@@ -38,14 +38,16 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.startup.servlet.ServletContextListenerStartup;
import org.apache.ignite.transactions.Transaction;
@@ -191,6 +193,9 @@ public class WebSessionFilter implements Filter {
/** Transactions enabled flag. */
private boolean txEnabled;
+ /** */
+ private int retries;
+
/** {@inheritDoc} */
@Override public void init(FilterConfig cfg) throws ServletException {
ctx = cfg.getServletContext();
@@ -207,8 +212,6 @@ public class WebSessionFilter implements Filter {
cfg.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM),
ctx.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM));
- int retries;
-
try {
retries = retriesStr != null ? Integer.parseInt(retriesStr) : DFLT_MAX_RETRIES_ON_FAIL;
}
@@ -226,10 +229,6 @@ public class WebSessionFilter implements Filter {
log = webSesIgnite.log();
- if (webSesIgnite == null)
- throw new IgniteException("Grid for web sessions caching is not started (is it configured?): " +
- gridName);
-
cache = webSesIgnite.cache(cacheName);
if (cache == null)
@@ -409,41 +408,62 @@ public class WebSessionFilter implements Filter {
WebSession cached = new WebSession(ses, true);
- try {
- while (true) {
- try {
- IgniteCache<String, WebSession> cache0;
-
- if (cached.getMaxInactiveInterval() > 0) {
- long ttl = cached.getMaxInactiveInterval() * 1000;
+ for (int i = 0; i < retries; i++) {
+ try {
+ IgniteCache<String, WebSession> cache0;
- ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl));
+ if (cached.getMaxInactiveInterval() > 0) {
+ long ttl = cached.getMaxInactiveInterval() * 1000;
- cache0 = cache.withExpiryPolicy(plc);
- }
- else
- cache0 = cache;
+ ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS, ttl));
- WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
+ cache0 = cache.withExpiryPolicy(plc);
+ }
+ else
+ cache0 = cache;
- if (old != null) {
- cached = old;
+ WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
- if (cached.isNew())
- cached = new WebSession(cached, false);
- }
+ if (old != null) {
+ cached = old;
- break;
+ if (cached.isNew())
+ cached = new WebSession(cached, false);
}
- catch (CachePartialUpdateException e) {
+
+ break;
+ }
+ catch (CacheException | IgniteException e) {
+ if (log.isDebugEnabled())
+ log.debug(e.getMessage());
+
+ if (i == retries - 1)
+ throw new IgniteException("Failed to save session: " + sesId, e);
+ else {
if (log.isDebugEnabled())
- log.debug(e.getMessage());
+ log.debug("Failed to save session (will retry): " + sesId);
+
+ IgniteFuture<?> retryFut = null;
+
+ if (X.hasCause(e, ClusterTopologyException.class)) {
+ ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+ assert cause != null : e;
+
+ retryFut = cause.retryReadyFuture();
+ }
+
+ if (retryFut != null) {
+ try {
+ retryFut.get();
+ }
+ catch (IgniteException retryErr) {
+ throw new IgniteException("Failed to save session: " + sesId, retryErr);
+ }
+ }
}
}
}
- catch (CacheException e) {
- throw new IgniteException("Failed to save session: " + sesId, e);
- }
return cached;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
index 82f1633..b826031 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
@@ -30,12 +30,14 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CachePartialUpdateException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -117,7 +119,7 @@ class WebSessionListener {
break;
}
- catch (CachePartialUpdateException ignored) {
+ catch (CacheException | IgniteException e) {
if (i == retries - 1) {
U.warn(log, "Failed to apply updates for session (maximum number of retries exceeded) [sesId=" +
sesId + ", retries=" + retries + ']');
@@ -125,12 +127,25 @@ class WebSessionListener {
else {
U.warn(log, "Failed to apply updates for session (will retry): " + sesId);
- U.sleep(RETRY_DELAY);
+ IgniteFuture<?> retryFut = null;
+
+ if (X.hasCause(e, ClusterTopologyException.class)) {
+ ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+ assert cause != null : e;
+
+ retryFut = cause.retryReadyFuture();
+ }
+
+ if (retryFut != null)
+ retryFut.get();
+ else
+ U.sleep(RETRY_DELAY);
}
}
}
}
- catch (CacheException | IgniteInterruptedCheckedException e) {
+ catch (Exception e) {
U.error(log, "Failed to update session attributes [id=" + sesId + ']', e);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
index 4508edb..7a321d6 100644
--- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
+++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
@@ -142,7 +142,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
assert idx != -1;
- assert srv != null;
stopServer(srv);
@@ -181,7 +180,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
}
assert idx != -1;
- assert srv != null;
int port = TEST_JETTY_PORT + idx;