You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/18 12:14:33 UTC
[3/9] ignite git commit: ignite-1.5 Properly handle duplicated job
responses in GridTaskWorker.onResponse. Use correct 'initialRebalanceFuture'
for client nodes.
ignite-1.5 Properly handle duplicated job responses in GridTaskWorker.onResponse. Use correct 'initialRebalanceFuture' for client nodes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1f90655
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1f90655
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1f90655
Branch: refs/heads/ignite-1537
Commit: b1f906555cea8990dd39e8050ca4348f09da7f7f
Parents: 301e7a1
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 18 12:08:20 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 18 12:08:20 2015 +0300
----------------------------------------------------------------------
.../ClientAbstractConnectivitySelfTest.java | 2 +-
.../internal/cluster/ClusterGroupAdapter.java | 10 +-
.../dht/preloader/GridDhtPreloader.java | 2 +-
.../processors/task/GridTaskWorker.java | 8 +-
.../internal/util/lang/GridNodePredicate.java | 13 +-
.../ignite/internal/util/nio/GridNioServer.java | 11 +-
.../util/nio/GridSelectorNioSessionImpl.java | 7 +
.../TcpDiscoveryMulticastIpFinder.java | 12 +-
.../ignite/internal/ClusterGroupSelfTest.java | 32 ++-
.../IgniteClientReconnectCacheTest.java | 7 +-
.../ignite/internal/TaskNodeRestartTest.java | 230 +++++++++++++++++++
.../IgniteCacheSizeFailoverTest.java | 4 +-
.../random/RandomEvictionPolicySelfTest.java | 4 +-
.../GridServiceProcessorStopSelfTest.java | 18 +-
.../IgniteMessagingWithClientTest.java | 2 -
.../GridSessionCheckpointAbstractSelfTest.java | 3 +-
.../spi/discovery/tcp/TcpDiscoverySelfTest.java | 2 +-
.../testsuites/IgniteComputeGridTestSuite.java | 2 +
18 files changed, 340 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
index ef18a29..8207ccf 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/client/integration/ClientAbstractConnectivitySelfTest.java
@@ -134,7 +134,7 @@ public abstract class ClientAbstractConnectivitySelfTest extends GridCommonAbstr
/**
* Simple test of address list filtering.
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testResolveReachableOneAddress() throws Exception {
InetAddress addr = InetAddress.getByAddress(new byte[] {127, 0, 0, 1} );
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 9039ed8..75168a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -626,7 +626,15 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
/** {@inheritDoc} */
@Override public final ClusterGroup forRandom() {
- return ids != null ? forNodeId(F.rand(ids)) : forNode(F.rand(nodes()));
+ if (!F.isEmpty(ids))
+ return forNodeId(F.rand(ids));
+
+ Collection<ClusterNode> nodes = nodes();
+
+ if (nodes.isEmpty())
+ return new ClusterGroupAdapter(ctx, null, Collections.<UUID>emptySet());
+
+ return forNode(F.rand(nodes));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 9a6246f..c46a66c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -482,7 +482,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> initialRebalanceFuture() {
- return cctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : initRebalanceFut;
+ return cctx.kernalContext().clientNode() ? startFut : initRebalanceFut;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 9315d7c..59d3f90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -696,8 +696,9 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res);
- selfOccupied = true;
+ res = delayedRess.poll();
+ // We cannot return here because there can be more delayed messages in the queue.
continue;
}
@@ -708,7 +709,10 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
if (log.isDebugEnabled())
log.debug("Received redundant response for a job (will ignore): " + res);
- return;
+ res = delayedRess.poll();
+
+ // We cannot return here because there can be more delayed messages in the queue.
+ continue;
}
if (!jobRes.getNode().id().equals(res.getNodeId())) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java
index 4ce0b35..edec862 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridNodePredicate.java
@@ -100,13 +100,18 @@ public class GridNodePredicate implements IgnitePredicate<ClusterNode>, Iterable
public GridNodePredicate(@Nullable ClusterNode... nodes) {
if (F.isEmpty(nodes))
ids = Collections.emptySet();
- else if (nodes.length == 1)
- ids = Collections.singleton(nodes[0].id());
+ else if (nodes.length == 1) {
+ ClusterNode node = nodes[0];
+
+ ids = node != null ? Collections.singleton(node.id()) : Collections.<UUID>emptySet();
+ }
else {
ids = U.newHashSet(nodes.length);
- for (ClusterNode n : nodes)
- ids.add(n.id());
+ for (ClusterNode n : nodes) {
+ if (n != null)
+ ids.add(n.id());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 53cec84..be28c30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -425,10 +425,10 @@ public class GridNioServer<T> {
int msgCnt = sys ? ses.offerSystemFuture(fut) : ses.offerFuture(fut);
- IgniteInClosure<IgniteException> ackClosure;
+ IgniteInClosure<IgniteException> ackC;
- if (!sys && (ackClosure = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
- fut.ackClosure(ackClosure);
+ if (!sys && (ackC = ses.removeMeta(ACK_CLOSURE.ordinal())) != null)
+ fut.ackClosure(ackC);
if (ses.closed()) {
if (ses.removeFuture(fut))
@@ -1609,15 +1609,14 @@ public class GridNioServer<T> {
sessions.remove(ses);
- if (closed)
- ses.onServerStopped();
-
SelectionKey key = ses.key();
// Shutdown input and output so that remote client will see correct socket close.
Socket sock = ((SocketChannel)key.channel()).socket();
if (ses.setClosed()) {
+ ses.onClosed();
+
if (directBuf) {
if (ses.writeBuffer() != null)
((DirectBuffer)ses.writeBuffer()).cleaner().clean();
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 6b1f6a7..deb7d2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -294,6 +294,13 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
*
*/
void onServerStopped() {
+ onClosed();
+ }
+
+ /**
+ *
+ */
+ void onClosed() {
if (sem != null)
sem.release(1_000_000);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
index 77bb99d..8402cbf 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinder.java
@@ -136,6 +136,9 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
/** */
private boolean mcastErr;
+ @GridToStringExclude
+ private Set<InetSocketAddress> locNodeAddrs;
+
/**
* Constructs new IP finder.
*/
@@ -369,6 +372,8 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
}
if (!clientMode) {
+ locNodeAddrs = new HashSet<>(addrs);
+
if (addrSnds.isEmpty()) {
try {
// Create non-bound socket if local host is loopback or failed to create sockets explicitly
@@ -403,8 +408,11 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
else
mcastErr = true;
}
- else
+ else {
assert addrSnds.isEmpty() : addrSnds;
+
+ locNodeAddrs = Collections.emptySet();
+ }
}
/** {@inheritDoc} */
@@ -607,7 +615,7 @@ public class TcpDiscoveryMulticastIpFinder extends TcpDiscoveryVmIpFinder {
U.close(sock);
}
- if (!rmtAddrs.isEmpty())
+ if (rmtAddrs.size() > locNodeAddrs.size())
break;
if (i < addrReqAttempts - 1) // Wait some time before re-sending address request.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
index d916d78..18eb3b7 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ClusterGroupSelfTest.java
@@ -109,6 +109,10 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
}
assertEquals(oldest.node(), ignite.cluster().forNode(node).node());
+
+ ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+
+ assertEquals(0, emptyGrp.forOldest().nodes().size());
}
/**
@@ -130,6 +134,10 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
}
assertEquals(youngest.node(), ignite.cluster().forNode(node).node());
+
+ ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+
+ assertEquals(0, emptyGrp.forYoungest().nodes().size());
}
/**
@@ -187,8 +195,7 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
assertEquals(grid(gridMaxOrder(clusterSize, false)).localNode().id(), oddYoungest.node().id());
assertEquals(grid(2).localNode().id(), oddOldest.node().id());
- try (Ignite g4 = startGrid(NODES_CNT);
- Ignite g5 = startGrid(NODES_CNT + 1))
+ try (Ignite g4 = startGrid(NODES_CNT); Ignite g5 = startGrid(NODES_CNT + 1))
{
clusterSize = g4.cluster().nodes().size();
@@ -241,6 +248,27 @@ public class ClusterGroupSelfTest extends ClusterGroupAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ public void testEmptyGroup() throws Exception {
+ ClusterGroup emptyGrp = ignite.cluster().forAttribute("nonExistent", "val");
+
+ assertEquals(0, emptyGrp.forOldest().nodes().size());
+ assertEquals(0, emptyGrp.forYoungest().nodes().size());
+ assertEquals(0, emptyGrp.forAttribute("nonExistent2", "val").nodes().size());
+ assertEquals(0, emptyGrp.forCacheNodes("cacheName").nodes().size());
+ assertEquals(0, emptyGrp.forClientNodes("cacheName").nodes().size());
+ assertEquals(0, emptyGrp.forClients().nodes().size());
+ assertEquals(0, emptyGrp.forDaemons().nodes().size());
+ assertEquals(0, emptyGrp.forDataNodes("cacheName").nodes().size());
+ assertEquals(0, emptyGrp.forRandom().nodes().size());
+ assertEquals(0, emptyGrp.forRemotes().nodes().size());
+ assertEquals(0, emptyGrp.forServers().nodes().size());
+ assertEquals(0, emptyGrp.forHost(ignite.cluster().localNode()).nodes().size());
+ assertEquals(0, emptyGrp.forHost("127.0.0.1").nodes().size());
+ }
+
+ /**
* @param cnt Count.
* @param even Even.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
index 5dbf75a..5234d6e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java
@@ -73,6 +73,9 @@ import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.values;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -169,6 +172,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
ccfg.setWriteSynchronizationMode(FULL_SYNC);
ccfg.setName("nearCache");
+ ccfg.setAtomicWriteOrderMode(PRIMARY);
final IgniteCache<Object, Object> nearCache = client.getOrCreateCache(ccfg, new NearCacheConfiguration<>());
@@ -786,8 +790,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac
for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) {
CacheAtomicWriteOrderMode[] writeOrders =
- atomicityMode == ATOMIC ? CacheAtomicWriteOrderMode.values() :
- new CacheAtomicWriteOrderMode[]{CacheAtomicWriteOrderMode.CLOCK};
+ atomicityMode == ATOMIC ? values() : new CacheAtomicWriteOrderMode[]{CLOCK};
for (CacheAtomicWriteOrderMode writeOrder : writeOrders) {
for (CacheWriteSynchronizationMode syncMode : CacheWriteSynchronizationMode.values()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
new file mode 100644
index 0000000..1e3b213
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TaskNodeRestartTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeJobResultPolicy;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class TaskNodeRestartTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final int NODES = 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGridsMultiThreaded(NODES);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTaskNodeRestart() throws Exception {
+ final AtomicBoolean finished = new AtomicBoolean();
+
+ final AtomicInteger stopIdx = new AtomicInteger();
+
+ IgniteInternalFuture<?> restartFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ int idx = stopIdx.getAndIncrement();
+
+ int node = NODES + idx;
+
+ while (!finished.get()) {
+ log.info("Start node: " + node);
+
+ startGrid(node);
+
+ U.sleep(300);
+
+ log.info("Stop node: " + node);
+
+ stopGrid(node);
+ }
+
+ return null;
+ }
+ }, 2, "stop-thread");
+
+ IgniteInternalFuture<?> fut = null;
+
+ try {
+ final long stopTime = System.currentTimeMillis() + 60_000;
+
+ final AtomicInteger idx = new AtomicInteger();
+
+ fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int node = idx.getAndIncrement() % NODES;
+
+ Ignite ignite = ignite(node);
+
+ log.info("Start thread: " + ignite.name());
+
+ IgniteCompute compute = ignite.compute();
+
+ while (U.currentTimeMillis() < stopTime) {
+ try {
+ compute.broadcast(new TestCallable());
+
+ compute.call(new TestCallable());
+
+ compute.execute(new TestTask1(), null);
+
+ compute.execute(new TestTask2(), null);
+ }
+ catch (IgniteException e) {
+ log.info("Error: " + e);
+ }
+ }
+
+ return null;
+ }
+ }, 20, "test-thread");
+
+ fut.get(90_000);
+
+ finished.set(true);
+
+ restartFut.get();
+ }
+ finally {
+ finished.set(true);
+
+ if (fut != null)
+ fut.cancel();
+
+ restartFut.get(5000);
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTask1 extends ComputeTaskAdapter<Void, Void> {
+ /** {@inheritDoc} */
+ @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+ throws IgniteException {
+ Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new TestJob(), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTask2 implements ComputeTask<Void, Void> {
+ /** {@inheritDoc} */
+ @Nullable public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Void arg)
+ throws IgniteException {
+ Map<TestJob, ClusterNode> jobs = new HashMap<>();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new TestJob(), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestJob extends ComputeJobAdapter {
+ /** {@inheritDoc} */
+ @Override public Object execute() throws IgniteException {
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestCallable implements IgniteCallable<Void> {
+ /** {@inheritDoc} */
+ @Nullable @Override public Void call() throws Exception {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
index 1738a0d..5d074e1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSizeFailoverTest.java
@@ -82,7 +82,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest {
IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
- int idx = cntr.getAndIncrement();
+ int idx = cntr.getAndIncrement() % 2;
IgniteCache<Object, Object> cache = ignite(idx).cache(null);
@@ -97,7 +97,7 @@ public class IgniteCacheSizeFailoverTest extends GridCommonAbstractTest {
return null;
}
- }, 2, "size-thread");
+ }, 10, "size-thread");
try {
for (int i = 0; i < 10; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java
index af04cdc..a253a25 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/eviction/random/RandomEvictionPolicySelfTest.java
@@ -197,7 +197,9 @@ public class RandomEvictionPolicySelfTest extends
}
}, 10);
- assert g.cache(null).size() <= max;
+ int size = g.cache(null).size();
+
+ assertTrue("Unexpected cache size [size=" + size + ", max=" + max + ']', size <= max);
info(policy(0));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index 16ea5e4..dfea37a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteServices;
import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.services.Service;
import org.apache.ignite.services.ServiceContext;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -31,6 +32,13 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
* Tests that {@link GridServiceProcessor} completes deploy/undeploy futures during node stop.
*/
public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
/**
* @throws Exception If failed.
*/
@@ -43,6 +51,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
Thread t = new Thread(new Runnable() {
@Override public void run() {
+ Thread.currentThread().setName("deploy-thread");
+
IgniteServices svcs = ignite.services();
IgniteServices services = svcs.withAsync();
@@ -69,13 +79,19 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest {
Ignition.stopAll(true);
- assertTrue("Deploy future isn't completed", finishLatch.await(15, TimeUnit.SECONDS));
+ boolean wait = finishLatch.await(15, TimeUnit.SECONDS);
+
+ if (!wait)
+ U.dumpThreads(log);
+
+ assertTrue("Deploy future isn't completed", wait);
}
/**
* Simple map service.
*/
public interface TestService {
+ // No-op.
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
index 62f4c1a..e885f48 100644
--- a/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/messaging/IgniteMessagingWithClientTest.java
@@ -78,8 +78,6 @@ public class IgniteMessagingWithClientTest extends GridCommonAbstractTest implem
* @throws Exception If failed.
*/
public void testMessageSendWithClientJoin() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-996");
-
startGrid(0);
Ignite ignite1 = startGrid(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java
index 06cbf1c..c087d38 100644
--- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionCheckpointAbstractSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.compute.ComputeTaskSessionScope;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.TaskSessionResource;
@@ -55,7 +56,7 @@ public abstract class GridSessionCheckpointAbstractSelfTest extends GridCommonAb
private static final int SPLIT_COUNT = 5;
/** */
- private static CountDownLatch taskLatch;
+ private static volatile CountDownLatch taskLatch;
/** */
protected GridSessionCheckpointAbstractSelfTest() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 5475f25..5af0596 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -980,7 +980,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
}
}
- assertTrue("GridTcpDiscoveryMulticastIpFinder should register port." , found);
+ assertTrue("TcpDiscoveryMulticastIpFinder should register port." , found);
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1f90655/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index 23f2edc..e2c7e26 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.IgniteComputeEmptyClusterGroupTest;
import org.apache.ignite.internal.IgniteComputeTopologyExceptionTest;
import org.apache.ignite.internal.IgniteExecutorServiceTest;
import org.apache.ignite.internal.IgniteExplicitImplicitDeploymentSelfTest;
+import org.apache.ignite.internal.TaskNodeRestartTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
@@ -144,6 +145,7 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(IgniteComputeEmptyClusterGroupTest.class);
suite.addTestSuite(IgniteComputeTopologyExceptionTest.class);
suite.addTestSuite(GridTaskFailoverAffinityRunTest.class);
+ suite.addTestSuite(TaskNodeRestartTest.class);
return suite;
}