You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/02 19:56:52 UTC

incubator-ignite git commit: # IGNITE-709 Add test to check pending messages on client reconnect.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-709_2 d59403b48 -> f9f766263


# IGNITE-709 Add test to check pending messages on client reconnect.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f76626
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f76626
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f76626

Branch: refs/heads/ignite-709_2
Commit: f9f766263dfa9ea170cae4816e7667e0e3adb310
Parents: d59403b
Author: sevdokimov <se...@jetbrains.com>
Authored: Sat May 2 20:56:41 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sat May 2 20:56:41 2015 +0300

----------------------------------------------------------------------
 .../tcp/TcpClientDiscoverySelfTest.java         | 53 ++++++++++++++++----
 1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f76626/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index fd9b0f7..05fb52b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -39,7 +39,6 @@ import java.net.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
 
 import static java.util.concurrent.TimeUnit.*;
 import static org.apache.ignite.events.EventType.*;
@@ -327,7 +326,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
 
         attachListeners(2, 2);
 
-        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).suspend();
+        ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
 
         stopGrid("server-2");
 
@@ -786,21 +785,55 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
      */
     private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
         /** */
-        private final Lock ioOperationsLock = new ReentrantLock();
+        private final Object mux = new Object();
+
+        /** */
+        private final AtomicBoolean writeLock = new AtomicBoolean();
+
+        /** */
+        private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+        /**
+         * @param lock Pause flag to wait on; the calling thread blocks while it is {@code true}.
+         */
+        private void waitFor(AtomicBoolean lock) {
+            try {
+                synchronized (mux) {
+                    while (lock.get())
+                        mux.wait();
+                }
+            }
+            catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            }
+        }
+
+        /**
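+         * @param isPause {@code true} to pause the given operations, {@code false} to resume them.
+         * @param locks Pause flags to set to {@code isPause}; waiting threads are notified afterwards.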
+         */
+        private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+            synchronized (mux) {
+                for (AtomicBoolean lock : locks)
+                    lock.set(isPause);
+
+                mux.notifyAll();
+            }
+        }
 
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
             GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
-            ioOperationsLock.lock();
-            ioOperationsLock.unlock();
+            waitFor(writeLock);
 
             super.writeToSocket(sock, msg, bout);
         }
 
         /** {@inheritDoc} */
         @Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
-            ioOperationsLock.lock();
-            ioOperationsLock.unlock();
+            waitFor(openSockLock);
 
             return super.openSocket(sockAddr);
         }
@@ -808,8 +841,8 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
         /**
          *
          */
-        private void suspend() {
-            ioOperationsLock.lock();
+        private void pauseAll() {
+            pauseResumeOperation(true, openSockLock, writeLock);
 
             brokeConnection();
         }
@@ -818,7 +851,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
          *
          */
         private void resume() {
-            ioOperationsLock.unlock();
+            pauseResumeOperation(false, openSockLock, writeLock);
         }
     }
 }