You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/02 19:56:52 UTC
incubator-ignite git commit: # IGNITE-709 Add test to check pending messages on client reconnect.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-709_2 d59403b48 -> f9f766263
# IGNITE-709 Add test to check pending messages on client reconnect.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f76626
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f76626
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f76626
Branch: refs/heads/ignite-709_2
Commit: f9f766263dfa9ea170cae4816e7667e0e3adb310
Parents: d59403b
Author: sevdokimov <se...@jetbrains.com>
Authored: Sat May 2 20:56:41 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sat May 2 20:56:41 2015 +0300
----------------------------------------------------------------------
.../tcp/TcpClientDiscoverySelfTest.java | 53 ++++++++++++++++----
1 file changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f76626/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
index fd9b0f7..05fb52b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySelfTest.java
@@ -39,7 +39,6 @@ import java.net.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
import static java.util.concurrent.TimeUnit.*;
import static org.apache.ignite.events.EventType.*;
@@ -327,7 +326,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
attachListeners(2, 2);
- ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).suspend();
+ ((TestTcpClientDiscovery)G.ignite("client-1").configuration().getDiscoverySpi()).pauseAll();
stopGrid("server-2");
@@ -786,21 +785,55 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
*/
private static class TestTcpClientDiscovery extends TcpClientDiscoverySpi {
/** */
- private final Lock ioOperationsLock = new ReentrantLock();
+ private final Object mux = new Object();
+
+ /** */
+ private final AtomicBoolean writeLock = new AtomicBoolean();
+
+ /** */
+ private final AtomicBoolean openSockLock = new AtomicBoolean();
+
+ /**
+ * @param lock Lock.
+ */
+ private void waitFor(AtomicBoolean lock) {
+ try {
+ synchronized (mux) {
+ while (lock.get())
+ mux.wait();
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @param isPause Is lock.
+ * @param locks Locks.
+ */
+ private void pauseResumeOperation(boolean isPause, AtomicBoolean... locks) {
+ synchronized (mux) {
+ for (AtomicBoolean lock : locks)
+ lock.set(isPause);
+
+ mux.notifyAll();
+ }
+ }
/** {@inheritDoc} */
@Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
GridByteArrayOutputStream bout) throws IOException, IgniteCheckedException {
- ioOperationsLock.lock();
- ioOperationsLock.unlock();
+ waitFor(writeLock);
super.writeToSocket(sock, msg, bout);
}
/** {@inheritDoc} */
@Override protected Socket openSocket(InetSocketAddress sockAddr) throws IOException {
- ioOperationsLock.lock();
- ioOperationsLock.unlock();
+ waitFor(openSockLock);
return super.openSocket(sockAddr);
}
@@ -808,8 +841,8 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
/**
*
*/
- private void suspend() {
- ioOperationsLock.lock();
+ private void pauseAll() {
+ pauseResumeOperation(true, openSockLock, writeLock);
brokeConnection();
}
@@ -818,7 +851,7 @@ public class TcpClientDiscoverySelfTest extends GridCommonAbstractTest {
*
*/
private void resume() {
- ioOperationsLock.unlock();
+ pauseResumeOperation(false, openSockLock, writeLock);
}
}
}