You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/13 20:25:09 UTC
[50/50] [abbrv] incubator-ignite git commit: Merge remote-tracking
branch 'remotes/origin/ignite-709_2' into ignite-836_2
Merge remote-tracking branch 'remotes/origin/ignite-709_2' into ignite-836_2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8c105ec2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8c105ec2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8c105ec2
Branch: refs/heads/ignite-836_2
Commit: 8c105ec2379acb78bd5fc9185cc99b9a1bc18562
Parents: 9126679 16e211e
Author: sevdokimov <se...@gridgain.com>
Authored: Wed May 13 15:09:57 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Wed May 13 15:09:57 2015 +0300
----------------------------------------------------------------------
assembly/release-base.xml | 4 +-
bin/ignite-schema-import.bat | 2 +-
bin/ignite-schema-import.sh | 2 +-
bin/ignite.bat | 2 +-
bin/ignite.sh | 2 +-
bin/ignitevisorcmd.bat | 2 +-
bin/ignitevisorcmd.sh | 2 +-
bin/include/build-classpath.bat | 46 +
bin/include/build-classpath.sh | 71 +
bin/include/functions.sh | 2 +-
bin/include/target-classpath.bat | 46 -
bin/include/target-classpath.sh | 71 -
examples/pom.xml | 2 +-
modules/aop/pom.xml | 2 +-
modules/aws/pom.xml | 2 +-
modules/clients/pom.xml | 2 +-
modules/cloud/pom.xml | 4 +-
.../TcpDiscoveryCloudIpFinderSelfTest.java | 2 -
modules/codegen/pom.xml | 2 +-
.../ignite/codegen/MessageCodeGenerator.java | 4 +-
modules/core/pom.xml | 2 +-
.../internal/direct/DirectByteBufferStream.java | 4 +-
.../communication/GridIoMessageFactory.java | 4 +-
.../eventstorage/GridEventStorageManager.java | 5 +-
.../cache/DynamicCacheDescriptor.java | 16 +-
.../processors/cache/GridCacheAdapter.java | 544 +-
.../cache/GridCacheEvictionManager.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 18 +-
.../processors/cache/GridCacheMvccManager.java | 4 +-
.../GridCachePartitionExchangeManager.java | 3 +
.../processors/cache/GridCacheProcessor.java | 189 +-
.../processors/cache/GridCacheProxyImpl.java | 24 -
.../processors/cache/GridCacheSwapManager.java | 215 +-
.../processors/cache/GridCacheTtlManager.java | 42 +-
.../processors/cache/GridCacheUtils.java | 5 +-
.../processors/cache/IgniteInternalCache.java | 27 -
...ridCacheOptimisticCheckPreparedTxFuture.java | 434 --
...idCacheOptimisticCheckPreparedTxRequest.java | 232 -
...dCacheOptimisticCheckPreparedTxResponse.java | 179 -
.../distributed/GridCacheTxRecoveryFuture.java | 506 ++
.../distributed/GridCacheTxRecoveryRequest.java | 261 +
.../GridCacheTxRecoveryResponse.java | 182 +
.../GridDistributedTxRemoteAdapter.java | 2 +-
.../distributed/dht/GridDhtLocalPartition.java | 2 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 32 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 27 +
.../cache/distributed/dht/GridDhtTxMapping.java | 2 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 81 +-
.../dht/GridPartitionedGetFuture.java | 2 +-
.../colocated/GridDhtColocatedLockFuture.java | 27 +-
.../colocated/GridDhtDetachedCacheEntry.java | 4 +-
.../distributed/near/GridNearCacheAdapter.java | 10 -
.../distributed/near/GridNearCacheEntry.java | 4 +-
.../distributed/near/GridNearLockFuture.java | 5 -
.../near/GridNearOptimisticTxPrepareFuture.java | 779 ++
.../GridNearPessimisticTxPrepareFuture.java | 349 +
.../cache/distributed/near/GridNearTxLocal.java | 84 +-
.../near/GridNearTxPrepareFuture.java | 1050 ---
.../near/GridNearTxPrepareFutureAdapter.java | 226 +
.../processors/cache/local/GridLocalCache.java | 8 +-
.../local/atomic/GridLocalAtomicCache.java | 27 +-
.../cache/query/GridCacheQueryManager.java | 21 +-
.../cache/query/GridCacheSqlQuery.java | 2 +-
.../cache/query/GridCacheTwoStepQuery.java | 17 +
.../cache/transactions/IgniteInternalTx.java | 9 +-
.../cache/transactions/IgniteTxAdapter.java | 4 +-
.../cache/transactions/IgniteTxHandler.java | 100 +-
.../transactions/IgniteTxLocalAdapter.java | 16 +-
.../cache/transactions/IgniteTxManager.java | 185 +-
.../datastreamer/DataStreamerImpl.java | 2 +
.../processors/igfs/IgfsDataManager.java | 3 +
.../processors/igfs/IgfsDeleteWorker.java | 4 +
.../processors/igfs/IgfsMetaManager.java | 2 +-
.../internal/processors/igfs/IgfsUtils.java | 11 +-
.../offheap/GridOffHeapProcessor.java | 17 +
.../processors/resource/GridResourceField.java | 11 +
.../processors/resource/GridResourceIoc.java | 387 +-
.../processors/resource/GridResourceMethod.java | 13 +
.../resource/GridResourceProcessor.java | 4 +-
.../ignite/internal/util/IgniteUtils.java | 19 +-
.../internal/util/future/SettableFuture.java | 86 +
.../util/lang/GridFilteredIterator.java | 2 +-
.../ignite/internal/util/lang/GridFunc.java | 7218 +++++-------------
.../util/offheap/GridOffHeapPartitionedMap.java | 9 +
.../unsafe/GridUnsafePartitionedMap.java | 155 +-
.../internal/visor/query/VisorQueryArg.java | 14 +-
.../internal/visor/query/VisorQueryJob.java | 2 +
.../discovery/tcp/TcpClientDiscoverySpi.java | 114 +-
.../spi/discovery/tcp/TcpDiscoverySpi.java | 255 +-
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 39 +
.../messages/TcpDiscoveryClientPingRequest.java | 56 +
.../TcpDiscoveryClientPingResponse.java | 67 +
.../resources/META-INF/classnames.properties | 12 +-
.../core/src/main/resources/ignite.properties | 2 +-
.../internal/GridUpdateNotifierSelfTest.java | 21 +-
.../processors/cache/CacheGetFromJobTest.java | 110 +
.../GridCacheAbstractFailoverSelfTest.java | 12 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 227 +-
.../cache/GridCacheAbstractSelfTest.java | 4 +-
.../cache/OffHeapTieredTransactionSelfTest.java | 127 +
.../GridCacheAbstractNodeRestartSelfTest.java | 101 +-
.../distributed/IgniteTxGetAfterStopTest.java | 131 +
...xOriginatingNodeFailureAbstractSelfTest.java | 2 +-
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 43 +
...ionedNearDisabledOffHeapFullApiSelfTest.java | 8 +-
...DisabledOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...abledOffHeapTieredAtomicFullApiSelfTest.java | 56 +
...earDisabledOffHeapTieredFullApiSelfTest.java | 33 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...rDisabledPrimaryNodeFailureRecoveryTest.java | 31 +
...rtitionedPrimaryNodeFailureRecoveryTest.java | 31 +
...woBackupsPrimaryNodeFailureRecoveryTest.java | 37 +
...ePrimaryNodeFailureRecoveryAbstractTest.java | 533 ++
...idCacheAtomicReplicatedFailoverSelfTest.java | 6 +
...CacheAtomicOffHeapTieredFullApiSelfTest.java | 32 +
...icOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...yWriteOrderOffHeapTieredFullApiSelfTest.java | 33 +
...erOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
...achePartitionedMultiNodeFullApiSelfTest.java | 15 +-
.../GridCachePartitionedNodeRestartTest.java | 4 +-
...dCachePartitionedOffHeapFullApiSelfTest.java | 8 +-
...titionedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...PartitionedOffHeapTieredFullApiSelfTest.java | 32 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 72 +
...ePartitionedOptimisticTxNodeRestartTest.java | 4 +-
.../GridCachePartitionedTxSalvageSelfTest.java | 25 +-
.../GridCacheReplicatedFailoverSelfTest.java | 6 +
.../GridCacheReplicatedNodeRestartSelfTest.java | 82 +
...idCacheReplicatedOffHeapFullApiSelfTest.java | 8 +-
...plicatedOffHeapMultiNodeFullApiSelfTest.java | 8 +-
...eReplicatedOffHeapTieredFullApiSelfTest.java | 33 +
...edOffHeapTieredMultiNodeFullApiSelfTest.java | 33 +
.../IgniteCacheExpiryPolicyAbstractTest.java | 2 +-
.../IgniteCacheExpiryPolicyTestSuite.java | 2 +
.../expiry/IgniteCacheTtlCleanupSelfTest.java | 85 +
...LocalAtomicOffHeapTieredFullApiSelfTest.java | 32 +
.../GridCacheLocalOffHeapFullApiSelfTest.java | 6 +-
...dCacheLocalOffHeapTieredFullApiSelfTest.java | 32 +
.../igfs/IgfsClientCacheSelfTest.java | 132 +
.../processors/igfs/IgfsOneClientNodeTest.java | 133 +
.../processors/igfs/IgfsStreamsSelfTest.java | 2 +-
.../tcp/TcpClientDiscoverySelfTest.java | 1021 ---
.../TcpClientDiscoverySpiConfigSelfTest.java | 2 +-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 1089 +++
.../ignite/testsuites/IgniteBasicTestSuite.java | 1 +
.../IgniteCacheFailoverTestSuite.java | 10 +-
.../IgniteCacheFullApiSelfTestSuite.java | 18 +
.../testsuites/IgniteCacheRestartTestSuite.java | 11 +-
.../testsuites/IgniteCacheTestSuite3.java | 5 +-
.../IgniteCacheTxRecoverySelfTestSuite.java | 4 +
.../ignite/testsuites/IgniteIgfsTestSuite.java | 3 +
.../IgniteSpiDiscoverySelfTestSuite.java | 2 +-
modules/extdata/p2p/pom.xml | 2 +-
modules/extdata/uri/pom.xml | 2 +-
modules/gce/pom.xml | 4 +-
modules/geospatial/pom.xml | 2 +-
modules/hadoop/pom.xml | 2 +-
modules/hibernate/pom.xml | 2 +-
modules/indexing/pom.xml | 2 +-
.../processors/query/h2/IgniteH2Indexing.java | 4 +
.../processors/query/h2/sql/GridSqlQuery.java | 20 +
.../query/h2/sql/GridSqlQueryParser.java | 10 +-
.../query/h2/sql/GridSqlQuerySplitter.java | 11 +-
.../processors/query/h2/sql/GridSqlSelect.java | 2 +-
.../processors/query/h2/sql/GridSqlUnion.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 3 +
.../h2/twostep/GridReduceQueryExecutor.java | 119 +-
.../cache/GridCacheOffheapIndexGetSelfTest.java | 111 +
.../IgniteCacheAbstractFieldsQuerySelfTest.java | 21 +
...eQueryMultiThreadedOffHeapTiredSelfTest.java | 37 +
.../IgniteCacheQueryMultiThreadedSelfTest.java | 29 +-
.../IgniteCacheQuerySelfTestSuite.java | 1 +
.../IgniteCacheWithIndexingTestSuite.java | 2 +
modules/jcl/pom.xml | 2 +-
modules/jta/pom.xml | 2 +-
modules/log4j/pom.xml | 2 +-
modules/rest-http/pom.xml | 2 +-
modules/scalar/pom.xml | 2 +-
.../ignite/scalar/ScalarConversions.scala | 8 -
modules/schedule/pom.xml | 2 +-
modules/schema-import/pom.xml | 2 +-
.../ignite/schema/generator/CodeGenerator.java | 41 +-
modules/slf4j/pom.xml | 2 +-
modules/spring/pom.xml | 2 +-
modules/ssh/pom.xml | 2 +-
modules/tools/pom.xml | 2 +-
modules/urideploy/pom.xml | 2 +-
modules/visor-console/pom.xml | 2 +-
.../commands/cache/VisorCacheScanCommand.scala | 2 +-
modules/visor-plugins/pom.xml | 2 +-
modules/web/pom.xml | 2 +-
modules/yardstick/pom.xml | 2 +-
parent/pom.xml | 2 +
pom.xml | 115 +-
194 files changed, 9825 insertions(+), 10127 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index ad78f86,53522c7..134097b
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@@ -4476,74 -4517,76 +4514,110 @@@ public class TcpDiscoverySpi extends Tc
/**
* @param msg Message.
*/
+ private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
+ utilityPool.execute(new Runnable() {
+ @Override public void run() {
+ boolean res = pingNode(msg.nodeToPing());
+
+ final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
+
+ if (worker == null) {
+ if (log.isDebugEnabled())
+ log.debug("Ping request from dead client node, will be skipped: " + msg.creatorNodeId());
+ }
+ else {
+ TcpDiscoveryClientPingResponse pingRes = new TcpDiscoveryClientPingResponse(
+ getLocalNodeId(), msg.nodeToPing(), res);
+
+ pingRes.verify(getLocalNodeId());
+
+ worker.addMessage(pingRes);
+ }
+ }
+ });
+ }
+
+ /**
+ * @param msg Message.
+ */
+ private void processPingResponse(final TcpDiscoveryPingResponse msg) {
+ ClientMessageWorker clientWorker = clientMsgWorkers.get(msg.creatorNodeId());
+
+ if (clientWorker != null)
+ clientWorker.pingResult(true);
+ }
+
+ /**
+ * @param msg Message.
+ */
private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
if (isLocalNodeCoordinator()) {
- if (msg.verified()) {
+ boolean sndNext;
+
+ if (!msg.verified()) {
+ msg.verify(getLocalNodeId());
+ msg.topologyVersion(ring.topologyVersion());
+
+ notifyDiscoveryListener(msg);
+
+ sndNext = true;
+ }
+ else
+ sndNext = false;
+
+ if (sndNext && ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
+ else {
stats.onRingMessageReceived(msg);
- addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
+ try {
+ DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
- return;
+ DiscoverySpiCustomMessage nextMsg = msgObj.newMessageOnRingEnd();
+
+ if (nextMsg != null)
+ addMessage(new TcpDiscoveryCustomEventMessage(getLocalNodeId(), marsh.marshal(nextMsg)));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal discovery custom message.", e);
+ }
+
+ addMessage(new TcpDiscoveryDiscardMessage(getLocalNodeId(), msg.id()));
}
+ }
+ else {
+ if (msg.verified())
+ notifyDiscoveryListener(msg);
- msg.verify(getLocalNodeId());
- msg.topologyVersion(ring.topologyVersion());
+ if (ring.hasRemoteNodes())
+ sendMessageAcrossRing(msg);
}
+ }
- if (msg.verified()) {
- DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
+ /**
+ * @param msg Custom message.
+ */
+ private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) {
+ DiscoverySpiListener lsnr = TcpDiscoverySpi.this.lsnr;
- TcpDiscoverySpiState spiState = spiStateCopy();
+ TcpDiscoverySpiState spiState = spiStateCopy();
- Map<Long, Collection<ClusterNode>> hist;
+ Map<Long, Collection<ClusterNode>> hist;
- synchronized (mux) {
- hist = new TreeMap<>(topHist);
- }
+ synchronized (mux) {
+ hist = new TreeMap<>(topHist);
+ }
- Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
+ Collection<ClusterNode> snapshot = hist.get(msg.topologyVersion());
- if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
- assert msg.messageBytes() != null;
+ if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) {
+ assert msg.messageBytes() != null;
- TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
+ TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
+ if (node != null) {
try {
- Serializable msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
+ DiscoverySpiCustomMessage msgObj = marsh.unmarshal(msg.messageBytes(), U.gridClassLoader());
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
msg.topologyVersion(),
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8c105ec2/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index 0000000,507b3e7..da40d4e
mode 000000,100644..100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@@ -1,0 -1,1089 +1,1089 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.spi.discovery.tcp;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.internal.util.io.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.spi.discovery.tcp.messages.*;
+ import org.apache.ignite.testframework.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.net.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.events.EventType.*;
+
+ /**
+ * Client-based discovery tests.
+ */
+ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final AtomicInteger srvIdx = new AtomicInteger();
+
+ /** */
+ private static final AtomicInteger clientIdx = new AtomicInteger();
+
+ /** */
+ private static Collection<UUID> srvNodeIds;
+
+ /** */
+ private static Collection<UUID> clientNodeIds;
+
+ /** */
+ private static int clientsPerSrv;
+
+ /** */
+ private static CountDownLatch srvJoinedLatch;
+
+ /** */
+ private static CountDownLatch srvLeftLatch;
+
+ /** */
+ private static CountDownLatch srvFailedLatch;
+
+ /** */
+ private static CountDownLatch clientJoinedLatch;
+
+ /** */
+ private static CountDownLatch clientLeftLatch;
+
+ /** */
+ private static CountDownLatch clientFailedLatch;
+
+ /** */
+ private static CountDownLatch msgLatch;
+
+ /** */
+ private UUID nodeId;
+
+ /** */
+ private TcpDiscoveryVmIpFinder clientIpFinder;
+
+ /** */
+ private long joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setLocalHost("127.0.0.1");
+
+ if (gridName.startsWith("server")) {
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+ }
+ else if (gridName.startsWith("client")) {
+ TcpClientDiscoverySpi disco = new TestTcpClientDiscovery();
+
+ disco.setJoinTimeout(joinTimeout);
+
+ TcpDiscoveryVmIpFinder ipFinder;
+
+ if (clientIpFinder != null)
+ ipFinder = clientIpFinder;
+ else {
+ ipFinder = new TcpDiscoveryVmIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).
+ get((clientIdx.get() - 1) / clientsPerSrv).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Arrays.asList(addr));
+ }
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ String nodeId = cfg.getNodeId().toString();
+
+ nodeId = "cc" + nodeId.substring(2);
+
+ cfg.setNodeId(UUID.fromString(nodeId));
+ }
+
+ if (nodeId != null)
+ cfg.setNodeId(nodeId);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ Collection<InetSocketAddress> addrs = IP_FINDER.getRegisteredAddresses();
+
+ if (!F.isEmpty(addrs))
+ IP_FINDER.unregisterAddresses(addrs);
+
+ srvIdx.set(0);
+ clientIdx.set(0);
+
+ srvNodeIds = new GridConcurrentHashSet<>();
+ clientNodeIds = new GridConcurrentHashSet<>();
+
+ clientsPerSrv = 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllClients(true);
+ stopAllServers(true);
+
+ nodeId = null;
+ clientIpFinder = null;
+ joinTimeout = TcpClientDiscoverySpi.DFLT_JOIN_TIMEOUT;
+
+ assert G.allGrids().isEmpty();
+ }
+
+ /**
+ *
+ * @throws Exception
+ */
+ public void testJoinTimeout() throws Exception {
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+ joinTimeout = 1000;
+
+ try {
+ startClientNodes(1);
+
+ fail("Client cannot be start because no server nodes run");
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(3, 4);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(3);
+ clientLeftLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ stopGrid("client-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(3);
+ clientFailedLatch = new CountDownLatch(2);
+
+ attachListeners(3, 3);
+
+ failClient(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(3, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeJoin() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvJoinedLatch = new CountDownLatch(3);
+ clientJoinedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ startServerNodes(1);
+
+ await(srvJoinedLatch);
+ await(clientJoinedLatch);
+
+ checkNodes(4, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeLeave() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvLeftLatch = new CountDownLatch(2);
+ clientLeftLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ stopGrid("server-2");
+
+ await(srvLeftLatch);
+ await(clientLeftLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testServerNodeFail() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(3, 3);
+
+ assert U.<Map>field(G.ignite("server-2").configuration().getDiscoverySpi(), "clientMsgWorkers").isEmpty();
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPing() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert ((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPingFailedNodeFromClient() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ ((TcpDiscoverySpi)srv1.configuration().getDiscoverySpi()).addIncomeConnectionListener(new IgniteInClosure<Socket>() {
+ @Override public void apply(Socket sock) {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+
+ assert ((IgniteEx)client).context().discovery().pingNode(srv0.cluster().localNode().id());
+ assert !((IgniteEx)client).context().discovery().pingNode(srv1.cluster().localNode().id());
+
+ latch.countDown();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPingFailedClientNode() throws Exception {
+ startServerNodes(2);
+ startClientNodes(1);
+
+ Ignite srv0 = G.ignite("server-0");
+ Ignite srv1 = G.ignite("server-1");
+ Ignite client = G.ignite("client-0");
+
+ ((TcpDiscoverySpiAdapter)srv0.configuration().getDiscoverySpi()).setAckTimeout(1000);
+
+ ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).pauseSocketWrite();
+
+ assert !((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+ assert !((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+
+ ((TestTcpClientDiscovery)client.configuration().getDiscoverySpi()).resumeAll();
+
+ assert ((IgniteEx)srv1).context().discovery().pingNode(client.cluster().localNode().id());
+ assert ((IgniteEx)srv0).context().discovery().pingNode(client.cluster().localNode().id());
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnRouterFail() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ checkNodes(2, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientReconnectOnNetworkProblem() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ setClientRouter(2, 0);
+
+ srvFailedLatch = new CountDownLatch(2);
+ clientFailedLatch = new CountDownLatch(3);
+
+ attachListeners(2, 3);
+
+ ((TcpClientDiscoverySpi)G.ignite("client-2").configuration().getDiscoverySpi()).brokeConnection();
+
+ G.ignite("client-2").message().remoteListen(null, new MessageListener()); // Send some discovery message.
+
+ checkNodes(3, 3);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetMissedMessagesOnReconnect() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(2);
+
+ checkNodes(3, 2);
+
+ clientLeftLatch = new CountDownLatch(1);
+ srvLeftLatch = new CountDownLatch(2);
+
+ attachListeners(2, 2);
+
+ ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
+
+ stopGrid("server-2");
+
+ await(srvLeftLatch);
+ await(srvLeftLatch);
+
+ Thread.sleep(500);
+
+ assert G.ignite("client-0").cluster().nodes().size() == 4;
+ assert G.ignite("client-1").cluster().nodes().size() == 5;
+
+ clientLeftLatch = new CountDownLatch(1);
+
+ ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).resumeAll();
+
+ await(clientLeftLatch);
+
+ checkNodes(2, 2);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientSegmentation() throws Exception {
+ clientsPerSrv = 1;
+
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ // setClientRouter(2, 2);
+
+ srvFailedLatch = new CountDownLatch(2 + 2);
+ clientFailedLatch = new CountDownLatch(2 + 2);
+
+ attachListeners(2, 2);
+
+ final CountDownLatch client2StoppedLatch = new CountDownLatch(1);
+
+ IgnitionListener lsnr = new IgnitionListener() {
+ @Override public void onStateChange(@Nullable String name, IgniteState state) {
+ if (state == IgniteState.STOPPED_ON_SEGMENTATION)
+ client2StoppedLatch.countDown();
+ }
+ };
+ G.addListener(lsnr);
+
+ try {
+ failServer(2);
+
+ await(srvFailedLatch);
+ await(clientFailedLatch);
+
+ await(client2StoppedLatch);
+
+ checkNodes(2, 2);
+ }
+ finally {
+ G.removeListener(lsnr);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeJoinOneServer() throws Exception {
+ startServerNodes(1);
+
+ srvJoinedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ startClientNodes(1);
+
+ await(srvJoinedLatch);
+
+ checkNodes(1, 1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeLeaveOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvLeftLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ stopGrid("client-0");
+
+ await(srvLeftLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testClientNodeFailOneServer() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(1, 1);
+
+ srvFailedLatch = new CountDownLatch(1);
+
+ attachListeners(1, 0);
+
+ failClient(0);
+
+ await(srvFailedLatch);
+
+ checkNodes(1, 0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMetrics() throws Exception {
+ startServerNodes(3);
+ startClientNodes(3);
+
+ checkNodes(3, 3);
+
+ attachListeners(3, 3);
+
+ assertTrue(checkMetrics(3, 3, 0));
+
+ G.ignite("client-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 1);
+ }
+ }, 10000));
+
+ checkMetrics(3, 3, 1);
+
+ G.ignite("server-0").compute().broadcast(F.noop());
+
+ assertTrue(GridTestUtils.waitForCondition(new PA() {
+ @Override public boolean apply() {
+ return checkMetrics(3, 3, 2);
+ }
+ }, 10000));
+ }
+
+ /**
+ * @param srvCnt Number of Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ * @param execJobsCnt Expected number of executed jobs.
+ * @return Whether metrics are correct.
+ */
+ private boolean checkMetrics(int srvCnt, int clientCnt, int execJobsCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+ return false;
+ }
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ for (ClusterNode n : g.cluster().nodes()) {
+ if (n.metrics().getTotalExecutedJobs() != execJobsCnt)
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromServer() throws Exception {
+ testDataExchange("server-0");
+ }
+
+ /**
+ * TODO: IGNITE-587.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDataExchangeFromClient() throws Exception {
+ testDataExchange("client-0");
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ private void testDataExchange(String masterName) throws Exception {
+ startServerNodes(2);
+ startClientNodes(2);
+
+ checkNodes(2, 2);
+
+ IgniteMessaging msg = grid(masterName).message();
+
+ UUID id = null;
+
+ try {
+ id = msg.remoteListen(null, new MessageListener());
+
- msgLatch = new CountDownLatch(4);
++ msgLatch = new CountDownLatch(2);
+
+ msg.send(null, "Message 1");
+
+ await(msgLatch);
+
+ startServerNodes(1);
+ startClientNodes(1);
+
+ checkNodes(3, 3);
+
- msgLatch = new CountDownLatch(6);
++ msgLatch = new CountDownLatch(3);
+
+ msg.send(null, "Message 2");
+
+ await(msgLatch);
+ }
+ finally {
+ if (id != null)
+ msg.stopRemoteListen(id);
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testDuplicateId() throws Exception {
+ startServerNodes(2);
+
+ nodeId = G.ignite("server-1").cluster().localNode().id();
+
+ try {
+ startGrid("client-0");
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("same ID") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testTimeoutWaitingNodeAddedMessage() throws Exception {
+ startServerNodes(2);
+
+ final CountDownLatch cnt = new CountDownLatch(1);
+
+ ((TcpDiscoverySpi)G.ignite("server-1").configuration().getDiscoverySpi()).addSendMessageListener(
+ new IgniteInClosure<TcpDiscoveryAbstractMessage>() {
+ @Override public void apply(TcpDiscoveryAbstractMessage msg) {
+ try {
+ cnt.await(10, MINUTES);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedException(e);
+ }
+ }
+ });
+
+ try {
+ startGrid("client-0");
+
+ assert false;
+ }
+ catch (IgniteCheckedException e) {
+ cnt.countDown();
+
+ IgniteSpiException spiEx = e.getCause(IgniteSpiException.class);
+
+ assert spiEx != null : e;
+ assert spiEx.getMessage().contains("Join process timed out") : spiEx.getMessage();
+ }
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
+ public void testGridStartTime() throws Exception {
+ startServerNodes(2);
+
+ startClientNodes(2);
+
+ long startTime = -1;
+
+ for (Ignite g : G.allGrids()) {
+ IgniteEx kernal = (IgniteKernal)g;
+
+ assertTrue(kernal.context().discovery().gridStartTime() > 0);
+
+ if (startTime == -1)
+ startTime = kernal.context().discovery().gridStartTime();
+ else
+ assertEquals(startTime, kernal.context().discovery().gridStartTime());
+ }
+ }
+
+ /**
+ * @param clientIdx Index.
+ * @throws Exception In case of error.
+ */
+ private void setClientRouter(int clientIdx, int srvIdx) throws Exception {
+ TcpClientDiscoverySpi disco =
+ (TcpClientDiscoverySpi)G.ignite("client-" + clientIdx).configuration().getDiscoverySpi();
+
+ TcpDiscoveryVmIpFinder ipFinder = (TcpDiscoveryVmIpFinder)disco.getIpFinder();
+
+ String addr = new ArrayList<>(IP_FINDER.getRegisteredAddresses()).get(srvIdx).toString();
+
+ if (addr.startsWith("/"))
+ addr = addr.substring(1);
+
+ ipFinder.setAddresses(Arrays.asList(addr));
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startServerNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param cnt Number of nodes.
+ * @throws Exception In case of error.
+ */
+ private void startClientNodes(int cnt) throws Exception {
+ for (int i = 0; i < cnt; i++) {
+ Ignite g = startGrid("client-" + clientIdx.getAndIncrement());
+
+ clientNodeIds.add(g.cluster().localNode().id());
+ }
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failServer(int idx) {
+ ((TcpDiscoverySpi)G.ignite("server-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param idx Index.
+ */
+ private void failClient(int idx) {
+ ((TcpClientDiscoverySpi)G.ignite("client-" + idx).configuration().getDiscoverySpi()).simulateNodeFailure();
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void attachListeners(int srvCnt, int clientCnt) throws Exception {
+ if (srvJoinedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on server: " + evt);
+
+ srvJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (srvLeftLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on server: " + evt);
+
+ srvLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (srvFailedLatch != null) {
+ for (int i = 0; i < srvCnt; i++) {
+ G.ignite("server-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on server: " + evt);
+
+ srvFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+
+ if (clientJoinedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Joined event fired on client: " + evt);
+
+ clientJoinedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_JOINED);
+ }
+ }
+
+ if (clientLeftLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Left event fired on client: " + evt);
+
+ clientLeftLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_LEFT);
+ }
+ }
+
+ if (clientFailedLatch != null) {
+ for (int i = 0; i < clientCnt; i++) {
+ G.ignite("client-" + i).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ info("Failed event fired on client: " + evt);
+
+ clientFailedLatch.countDown();
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+ }
+ }
+
+ /**
+ * @param srvCnt Number of server nodes.
+ * @param clientCnt Number of client nodes.
+ */
+ private void checkNodes(int srvCnt, int clientCnt) {
+ for (int i = 0; i < srvCnt; i++) {
+ Ignite g = G.ignite("server-" + i);
+
+ assertTrue(srvNodeIds.contains(g.cluster().localNode().id()));
+
+ assertFalse(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+
+ for (int i = 0; i < clientCnt; i++) {
+ Ignite g = G.ignite("client-" + i);
+
+ assertTrue(clientNodeIds.contains(g.cluster().localNode().id()));
+
+ assertTrue(g.cluster().localNode().isClient());
+
+ checkRemoteNodes(g, srvCnt + clientCnt - 1);
+ }
+ }
+
+ /**
+ * @param ignite Grid.
+ * @param expCnt Expected nodes count.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ private void checkRemoteNodes(Ignite ignite, int expCnt) {
+ Collection<ClusterNode> nodes = ignite.cluster().forRemotes().nodes();
+
+ assertEquals(expCnt, nodes.size());
+
+ for (ClusterNode node : nodes) {
+ UUID id = node.id();
+
+ if (clientNodeIds.contains(id))
+ assertTrue(node.isClient());
+ else if (srvNodeIds.contains(id))
+ assertFalse(node.isClient());
+ else
+ assert false : "Unexpected node ID: " + id;
+ }
+ }
+
+ /**
+ * @param latch Latch.
+ * @throws InterruptedException If interrupted.
+ */
+ private void await(CountDownLatch latch) throws InterruptedException {
+ assertTrue("Latch count: " + latch.getCount(), latch.await(10000, MILLISECONDS));
+ }
+
+ /**
+ */
+ private static class MessageListener implements IgniteBiPredicate<UUID, Object> {
+ @IgniteInstanceResource
+ private Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(UUID uuid, Object msg) {
+ X.println(">>> Received [locNodeId=" + ignite.configuration().getNodeId() + ", msg=" + msg + ']');
+
+ msgLatch.countDown();
+
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
+ /** */
+ private final Object mux = new Object();
+
+ /** */
+ private final AtomicBoolean writeLock = new AtomicBoolean();
+
+ /** */
+ private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+ /**
+ * @param lock Lock.
+ */
+ private void waitFor(AtomicBoolean lock) {
+ try {
+ synchronized (mux) {
+ while (lock.get())
+ mux.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param isPause Is lock.
+ * @param locks Locks.
+ */
+ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+ synchronized (mux) {
+ for (AtomicBoolean lock : locks)
+ lock.set(isPause);
+
+ mux.notifyAll();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
+ GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
+ waitFor(writeLock);
+
+ super.writeToSocket(sock, msg, bout);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
+ waitFor(openSockLock);
+
+ return super.openSocket(sockAddr);
+ }
+
+ /**
+ *
+ */
+ public void pauseSocketWrite() {
+ pauseResumeOperation(true, writeLock);
+ }
+
+ /**
+ *
+ */
+ public void pauseAll() {
+ pauseResumeOperation(true, openSockLock, writeLock);
+
+ brokeConnection();
+ }
+
+ /**
+ *
+ */
+ public void resumeAll() {
+ pauseResumeOperation(false, openSockLock, writeLock);
+ }
+ }
+ }