You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/12/25 07:36:34 UTC
[1/6] ignite git commit: ignite-1.5 Fixed links.
Repository: ignite
Updated Branches:
refs/heads/ignite-1537 2c4612062 -> bf101deb9
ignite-1.5 Fixed links.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fe140992
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fe140992
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fe140992
Branch: refs/heads/ignite-1537
Commit: fe140992581d196fd2abe9702c3a6ac28ee9ba47
Parents: 61c072e
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 24 12:14:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 24 12:14:18 2015 +0300
----------------------------------------------------------------------
README.txt | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/fe140992/README.txt
----------------------------------------------------------------------
diff --git a/README.txt b/README.txt
index f66f9ee..9133f2c 100644
--- a/README.txt
+++ b/README.txt
@@ -17,9 +17,9 @@ The main feature set of Ignite In-Memory Data Fabric includes:
For information on how to get started with Apache Ignite please visit:
- http://apacheignite.readme.io/v1.0/docs/getting-started
+ http://apacheignite.readme.io/docs/getting-started
You can find Apache Ignite documentation here:
- http://apacheignite.readme.io/v1.0/docs/getting-started
+ http://apacheignite.readme.io/docs
[5/6] ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-1537
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-1.5' into ignite-1537
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/023ea3db
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/023ea3db
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/023ea3db
Branch: refs/heads/ignite-1537
Commit: 023ea3dbd9958acdcc1b65d085f8ed3acf4e21f0
Parents: 2c46120 49c2988
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 25 08:58:50 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 25 08:58:50 2015 +0300
----------------------------------------------------------------------
README.txt | 4 +-
.../JettyRestProcessorAbstractSelfTest.java | 25 +++-
.../connection/GridClientNioTcpConnection.java | 17 +--
.../client/message/GridClientCacheBean.java | 139 +++++++++++++++++++
.../rest/client/message/GridClientNodeBean.java | 70 ++++++----
.../top/GridTopologyCommandHandler.java | 38 +++--
...yMetadataUpdateChangingTopologySelfTest.java | 1 -
7 files changed, 232 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
[6/6] ignite git commit: ignite-647
Posted by sb...@apache.org.
ignite-647
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf101deb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf101deb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf101deb
Branch: refs/heads/ignite-1537
Commit: bf101deb96d4a718d12fcbc857c9f272b4335166
Parents: 023ea3d
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 25 09:11:33 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 25 09:11:33 2015 +0300
----------------------------------------------------------------------
.../internal/processors/cache/GridCacheIoManager.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/bf101deb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 2a555c1..b297827 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -124,7 +124,15 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
if (cacheMsg.partitionExchangeMessage()) {
if (cacheMsg instanceof GridDhtAffinityAssignmentRequest) {
- fut = cctx.exchange().affinityReadyFuture(new AffinityTopologyVersion(cctx.localNode().order()));
+ assert cacheMsg.topologyVersion() != null : cacheMsg;
+
+ AffinityTopologyVersion startTopVer = new AffinityTopologyVersion(cctx.localNode().order());
+
+ assert cacheMsg.topologyVersion().compareTo(startTopVer) > 0 :
+ "Invalid affinity request [startTopVer=" + startTopVer + ", msg=" + cacheMsg + ']';
+
+ // Need to wait for initial exchange to avoid race between cache start and affinity request.
+ fut = cctx.exchange().affinityReadyFuture(startTopVer);
if (fut != null && !fut.isDone()) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
[4/6] ignite git commit: ignite-1.5 Fixed test.
Posted by sb...@apache.org.
ignite-1.5 Fixed test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/49c29886
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/49c29886
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/49c29886
Branch: refs/heads/ignite-1537
Commit: 49c298866b7c113aa62af4fe0587d6d39edd9f50
Parents: debe34d
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 24 15:52:59 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 24 15:52:59 2015 +0300
----------------------------------------------------------------------
.../util/nio/GridNioRecoveryDescriptor.java | 7 -
.../ignite/internal/util/nio/GridNioServer.java | 7 +
.../communication/tcp/TcpCommunicationSpi.java | 12 +-
.../internal/util/nio/GridNioSelfTest.java | 127 ++++++++++++-------
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 14 ++
.../GridUriDeploymentFileProcessorSelfTest.java | 19 ++-
6 files changed, 120 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 5647239..429f990 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -134,13 +134,6 @@ public class GridNioRecoveryDescriptor {
}
/**
- * @return Received messages count.
- */
- public long receivedCount() {
- return rcvCnt;
- }
-
- /**
* @return Maximum size of unacknowledged messages queue.
*/
public int queueLimit() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index be28c30..17a0b8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -309,6 +309,13 @@ public class GridNioServer<T> {
}
/**
+ * @return Configured port.
+ */
+ public int port() {
+ return locAddr != null ? locAddr.getPort() : -1;
+ }
+
+ /**
* Creates and returns a builder for a new instance of this class.
*
* @return Builder for new instance.
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index bf6e869..6cdfe9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -620,7 +620,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
nioSrvr.resend(ses);
if (sndRes)
- nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.receivedCount()));
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received()));
recovery.connected();
@@ -714,7 +714,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
};
- nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.receivedCount()), lsnr);
+ nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
}
else {
try {
@@ -2587,16 +2587,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
else
ch.write(ByteBuffer.wrap(U.IGNITE_HEADER));
- ClusterNode localNode = getLocalNode();
+ ClusterNode locNode = getLocalNode();
- if (localNode == null)
+ if (locNode == null)
throw new IgniteCheckedException("Local node has not been started or " +
"fully initialized [isStopping=" + getSpiContext().isStopping() + ']');
if (recovery != null) {
- HandshakeMessage msg = new HandshakeMessage(localNode.id(),
+ HandshakeMessage msg = new HandshakeMessage(locNode.id(),
recovery.incrementConnectCount(),
- recovery.receivedCount());
+ recovery.received());
if (log.isDebugEnabled())
log.debug("Write handshake message [rmtNode=" + rmtNodeId + ", msg=" + msg + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index 594e3c2..6089795 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
+import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -61,6 +62,9 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
/** Message count in test without reconnect. */
private static final int MSG_CNT = 2000;
+ /** First port to try when starting test servers. */
+ private static final int START_PORT = 55443;
+
/** Message id provider. */
private static final AtomicInteger idProvider = new AtomicInteger(1);
@@ -80,13 +84,15 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
private static volatile Marshaller marsh;
/** Test port. */
- private int port = 55443;
+ private static int port;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
getTestResources().startThreads(true);
marsh = getTestResources().getMarshaller();
+
+ port = START_PORT;
}
/** {@inheritDoc} */
@@ -94,13 +100,6 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
getTestResources().stopThreads();
}
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- port++;
- }
-
/**
* @throws Exception If failed.
*/
@@ -127,19 +126,18 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr);
try {
IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
byte[] msg = new byte[MSG_SIZE];
for (int i = 0; i < msg.length; i++)
msg[i] = (byte) (i ^ (i * i - 1)); // Some data
for (int i = 0; i < RECONNECT_MSG_CNT; i++)
- validateSendMessage(msg);
+ validateSendMessage(srvr.port(), msg);
}
}, THREAD_CNT);
@@ -177,11 +175,11 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr);
+ GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr);
Socket s = createSocket();
- s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000);
+ s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000);
try {
byte[] msg = new byte[MSG_SIZE];
@@ -235,12 +233,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr);
+ GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr);
try {
Socket s = createSocket();
- s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000);
+ s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000);
if (!(s instanceof SSLSocket)) {
// These methods are not supported by SSL sockets.
@@ -277,7 +275,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr);
final AtomicLong cnt = new AtomicLong();
@@ -285,8 +283,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
try {
IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
- @Override
- public void run() {
+ @Override public void run() {
try {
byte[] msg = new byte[MSG_SIZE];
@@ -294,7 +291,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
msg[i] = (byte) (i ^ (i * i - 1)); // Some data
try (Socket s = createSocket()) {
- s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000);
+ s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000);
OutputStream out = s.getOutputStream();
@@ -369,12 +366,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridPlainParser(), lsnr);
+ GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr);
try {
Socket s = createSocket();
- s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000);
+ s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000);
// This is needed for SSL to begin handshake.
s.getOutputStream().write(new byte[1]);
@@ -439,16 +436,12 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer.Builder<?> builder = serverBuilder(port, new GridPlainParser(), lsnr);
-
- GridNioServer<?> srvr = builder.sendQueueLimit(5).build();
-
- srvr.start();
+ GridNioServer<?> srvr = startServer(new GridPlainParser(), lsnr, 5);
try {
Socket s = createSocket();
- s.connect(new InetSocketAddress(U.getLocalHost(), port), 1000);
+ s.connect(new InetSocketAddress(U.getLocalHost(), srvr.port()), 1000);
s.getOutputStream().write(new byte[1]);
@@ -473,9 +466,10 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
/**
* Sends message and validates reply.
*
+ * @param port Port.
* @param msg Message to send.
*/
- private void validateSendMessage(byte[] msg) {
+ private void validateSendMessage(int port, byte[] msg) {
try {
Socket s = createSocket();
@@ -552,19 +546,54 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
/**
* Starts server with specified arguments.
*
- * @param port Port to listen.
* @param parser Parser to use.
* @param lsnr Listener.
* @return Started server.
* @throws Exception If failed.
*/
- protected final GridNioServer<?> startServer(int port, GridNioParser parser, GridNioServerListener lsnr)
+ protected final GridNioServer<?> startServer(GridNioParser parser, GridNioServerListener lsnr)
throws Exception {
- GridNioServer<?> srvr = serverBuilder(port, parser, lsnr).build();
+ return startServer(parser, lsnr, null);
+ }
+
+ /**
+ * Starts server with specified arguments.
+ *
+ * @param parser Parser to use.
+ * @param lsnr Listener.
+ * @param queueLimit Optional send queue limit.
+ * @return Started server.
+ * @throws Exception If failed.
+ */
+ protected final GridNioServer<?> startServer(GridNioParser parser,
+ GridNioServerListener lsnr,
+ @Nullable Integer queueLimit) throws Exception {
+ for (int i = 0; i < 10; i++) {
+ try {
+ int srvPort = port++;
+
+ GridNioServer.Builder<?> builder = serverBuilder(srvPort, parser, lsnr);
+
+ if (queueLimit != null)
+ builder.sendQueueLimit(queueLimit);
+
+ GridNioServer<?> srvr = builder.build();
+
+ srvr.start();
+
+ return srvr;
+ }
+ catch (IgniteCheckedException e) {
+ if (i < 9 && e.hasCause(BindException.class))
+ log.error("Failed to start server, will try another port: " + e);
+ else
+ throw e;
+ }
+ }
- srvr.start();
+ fail("Failed to start server.");
- return srvr;
+ return null;
}
/**
@@ -604,13 +633,13 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
NioListener lsnr = new NioListener(latch);
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
+ GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
TestClient client = null;
try {
for (int i = 0; i < 5; i++) {
- client = createClient(U.getLocalHost(), port, U.getLocalHost());
+ client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost());
client.sendMessage(createMessage(), MSG_SIZE);
client.sendMessage(createMessage(), MSG_SIZE);
@@ -638,13 +667,13 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
NioListener lsnr = new NioListener(latch);
- GridNioServer<?> srvr1 = startServer(port, new BufferedParser(false), lsnr);
- GridNioServer<?> srvr2 = startServer(port + 1, new BufferedParser(false), lsnr);
+ GridNioServer<?> srvr1 = startServer(new BufferedParser(false), lsnr);
+ GridNioServer<?> srvr2 = startServer(new BufferedParser(false), lsnr);
GridNioSession ses = null;
try {
- SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), port + 1));
+ SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port()));
GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null);
@@ -676,7 +705,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
NioListener lsnr = new NioListener(latch);
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
try {
final byte[] data = createMessage();
@@ -686,7 +715,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
TestClient client = null;
try {
- client = createClient(U.getLocalHost(), port, U.getLocalHost());
+ client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost());
for (int i = 0; i < MSG_CNT; i++)
client.sendMessage(data, data.length);
@@ -722,7 +751,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
final AtomicReference<Exception> err = new AtomicReference<>();
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()),
+ final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()),
new EchoListener());
try {
@@ -734,7 +763,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
TestClient client = null;
try {
- client = createClient(U.getLocalHost(), port, U.getLocalHost());
+ client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost());
MessageWithId msg = new MessageWithId(idProvider.getAndIncrement());
@@ -827,7 +856,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
final AtomicLong cntr = new AtomicLong();
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
try {
multithreaded(new Runnable() {
@@ -835,7 +864,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
TestClient client = null;
try {
- client = createClient(U.getLocalHost(), port, U.getLocalHost());
+ client = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost());
while (cntr.getAndIncrement() < MSG_CNT * THREAD_CNT) {
MessageWithId msg = new MessageWithId(idProvider.getAndIncrement());
@@ -908,14 +937,14 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
srvr.idleTimeout(1000);
try {
multithreaded(new Runnable() {
@Override public void run() {
- try (TestClient ignored = createClient(U.getLocalHost(), port, U.getLocalHost())) {
+ try (TestClient ignored = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost())) {
info("Before sleep.");
U.sleep(4000);
@@ -976,7 +1005,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
};
- GridNioServer<?> srvr = startServer(port, new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
+ final GridNioServer<?> srvr = startServer(new GridBufferedParser(true, ByteOrder.nativeOrder()), lsnr);
// Set flag using reflection.
Field f = srvr.getClass().getDeclaredField("skipWrite");
@@ -990,7 +1019,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
try {
multithreaded(new Runnable() {
@Override public void run() {
- try (TestClient ignored = createClient(U.getLocalHost(), port, U.getLocalHost())) {
+ try (TestClient ignored = createClient(U.getLocalHost(), srvr.port(), U.getLocalHost())) {
info("Before sleep.");
U.sleep(4000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 9e78fb9..d07a1e6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -149,6 +149,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
int expMsgs = 0;
+ long totAcked = 0;
+
for (int i = 0; i < 5; i++) {
info("Iteration: " + i);
@@ -160,6 +162,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
expMsgs += msgPerIter;
+ final long totAcked0 = totAcked;
+
for (TcpCommunicationSpi spi : spis) {
GridNioServer srv = U.field(spi, "nioSrvr");
@@ -177,6 +181,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
+ long acked = GridTestUtils.getFieldValue(recoveryDesc, "acked");
+
+ return acked > totAcked0;
+ }
+ }, 5000);
+
+ GridTestUtils.waitForCondition(new GridAbsPredicate() {
+ @Override public boolean apply() {
return recoveryDesc.messagesFutures().isEmpty();
}
}, 10_000);
@@ -204,6 +216,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
assertEquals(expMsgs, lsnr.rcvCnt.get());
}
+
+ totAcked += msgPerIter;
}
}
finally {
http://git-wip-us.apache.org/repos/asf/ignite/blob/49c29886/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java b/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java
index b87551d..cbcac9c 100644
--- a/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java
+++ b/modules/urideploy/src/test/java/org/apache/ignite/spi/deployment/uri/GridUriDeploymentFileProcessorSelfTest.java
@@ -20,7 +20,11 @@ package org.apache.ignite.spi.deployment.uri;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.spi.GridSpiTest;
import org.apache.ignite.testframework.junits.spi.GridSpiTestConfig;
@@ -69,7 +73,7 @@ public class GridUriDeploymentFileProcessorSelfTest extends GridUriDeploymentAbs
* if {@code false} then it should be undeployed.
* @throws Exception If failed.
*/
- private void proceedTest(String garFileName, String garDescFileName, String taskId, boolean deployed)
+ private void proceedTest(String garFileName, String garDescFileName, final String taskId, final boolean deployed)
throws Exception {
info("This test checks broken tasks. All exceptions that might happen are the part of the test.");
@@ -123,10 +127,17 @@ public class GridUriDeploymentFileProcessorSelfTest extends GridUriDeploymentAbs
// Copy to deployment directory.
U.copy(garFile, destDir, true);
- // Wait for SPI
- Thread.sleep(1000);
-
try {
+ // Wait for the SPI to reach the expected deployment state.
+ GridTestUtils.waitForCondition(new GridAbsPredicateX() {
+ @Override public boolean applyx() throws IgniteCheckedException {
+ if (deployed)
+ return getSpi().findResource(taskId) != null;
+ else
+ return getSpi().findResource(taskId) == null;
+ }
+ }, 5000);
+
if (deployed)
assert getSpi().findResource(taskId) != null;
else
[3/6] ignite git commit: IGNITE-2252 Added support for cache sql schema in REST topology command - Fixes #374.
Posted by sb...@apache.org.
IGNITE-2252 Added support for cache sql schema in REST topology command - Fixes #374.
Signed-off-by: Andrey <an...@gridgain.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/debe34de
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/debe34de
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/debe34de
Branch: refs/heads/ignite-1537
Commit: debe34de1881f5a1268993ae584db70f16a761cf
Parents: 383f317
Author: Andrey <an...@gridgain.com>
Authored: Thu Dec 24 17:49:02 2015 +0700
Committer: Andrey <an...@gridgain.com>
Committed: Thu Dec 24 17:49:02 2015 +0700
----------------------------------------------------------------------
.../JettyRestProcessorAbstractSelfTest.java | 25 +++-
.../connection/GridClientNioTcpConnection.java | 17 +--
.../client/message/GridClientCacheBean.java | 139 +++++++++++++++++++
.../rest/client/message/GridClientNodeBean.java | 70 ++++++----
.../top/GridTopologyCommandHandler.java | 38 +++--
5 files changed, 230 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/debe34de/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 4c73f78..4b1d47c 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -35,6 +35,7 @@ import java.util.regex.Pattern;
import net.sf.json.JSONNull;
import net.sf.json.JSONObject;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSqlIndexMetada
import org.apache.ignite.internal.processors.cache.query.GridCacheSqlMetadata;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandler;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -1054,11 +1054,28 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro
assertEquals(JSONNull.getInstance(), node.get("attributes"));
assertEquals(JSONNull.getInstance(), node.get("metrics"));
- assertEquals("PARTITIONED", node.get("defaultCacheMode"));
+ Collection<Map> caches = (Collection)node.get("caches");
- Map caches = (Map)node.get("caches");
+ Collection<IgniteCacheProxy<?, ?>> publicCaches = grid(0).context().cache().publicCaches();
- assertEquals(F.asMap("person", "PARTITIONED"), caches);
+ assertNotNull(caches);
+ assertEquals(publicCaches.size(), caches.size());
+
+ for (Map cache : caches) {
+ final String cacheName = cache.get("name").equals("") ? null : (String)cache.get("name");
+
+ IgniteCacheProxy<?, ?> publicCache = F.find(publicCaches, null, new P1<IgniteCacheProxy<?, ?>>() {
+ @Override public boolean apply(IgniteCacheProxy<?, ?> c) {
+ return F.eq(c.getName(), cacheName);
+ }
+ });
+
+ assertNotNull(publicCache);
+
+ CacheMode cacheMode = CacheMode.valueOf((String)cache.get("mode"));
+
+ assertEquals(publicCache.getConfiguration(CacheConfiguration.class).getCacheMode(),cacheMode);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/debe34de/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
index 576df3a..cfcb07f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientNioTcpConnection.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.rest.client.message.GridClientCache
import org.apache.ignite.internal.processors.rest.client.message.GridClientHandshakeRequest;
import org.apache.ignite.internal.processors.rest.client.message.GridClientMessage;
import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeBean;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheBean;
import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeMetricsBean;
import org.apache.ignite.internal.processors.rest.client.message.GridClientPingPacket;
import org.apache.ignite.internal.processors.rest.client.message.GridClientResponse;
@@ -926,27 +927,17 @@ public class GridClientNioTcpConnection extends GridClientConnection {
Map<String, GridClientCacheMode> caches = new HashMap<>();
if (nodeBean.getCaches() != null) {
- for (Map.Entry<String, String> e : nodeBean.getCaches().entrySet()) {
+ for (GridClientCacheBean cacheBean : nodeBean.getCaches()) {
try {
- caches.put(e.getKey(), GridClientCacheMode.valueOf(e.getValue()));
+ caches.put(cacheBean.getName(), cacheBean.getMode());
}
catch (IllegalArgumentException ignored) {
log.warning("Invalid cache mode received from remote node (will ignore) [srv=" + serverAddress() +
- ", cacheName=" + e.getKey() + ", cacheMode=" + e.getValue() + ']');
+ ", cacheName=" + cacheBean.getName() + ", cacheMode=" + cacheBean.getMode() + ']');
}
}
}
- if (nodeBean.getDefaultCacheMode() != null) {
- try {
- caches.put(null, GridClientCacheMode.valueOf(nodeBean.getDefaultCacheMode()));
- }
- catch (IllegalArgumentException ignored) {
- log.warning("Invalid cache mode received for default cache from remote node (will ignore) [srv="
- + serverAddress() + ", cacheMode=" + nodeBean.getDefaultCacheMode() + ']');
- }
- }
-
if (!caches.isEmpty())
nodeBuilder.caches(caches);
http://git-wip-us.apache.org/repos/asf/ignite/blob/debe34de/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientCacheBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientCacheBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientCacheBean.java
new file mode 100644
index 0000000..e055ec3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientCacheBean.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.rest.client.message;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.client.GridClientCacheMode;
+
+/**
+ * Cache bean: holds a cache's name, mode and custom SQL schema name.
+ */
+public class GridClientCacheBean implements Serializable {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Cache name; may be {@code null}.
+ */
+ private String name;
+
+ /**
+ * Cache mode.
+ */
+ private GridClientCacheMode mode;
+
+ /**
+ * Custom name of the SQL schema.
+ */
+ private String sqlSchema;
+ /** Creates an empty bean; every field is left {@code null} until set through the setters. */
+ public GridClientCacheBean() {
+ }
+ /** Creates a bean with the given cache name, cache mode and custom SQL schema name. */
+ public GridClientCacheBean(String name, GridClientCacheMode mode, String sqlSchema) {
+ this.name = name;
+ this.mode = mode;
+ this.sqlSchema = sqlSchema;
+ }
+
+ /**
+ * Gets cache name.
+ *
+ * @return Cache name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets cache name.
+ *
+ * @param name Cache name.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets cache mode.
+ *
+ * @return Cache mode.
+ */
+ public GridClientCacheMode getMode() {
+ return mode;
+ }
+
+ /**
+ * Sets cache mode.
+ *
+ * @param mode Cache mode.
+ */
+ public void setMode(GridClientCacheMode mode) {
+ this.mode = mode;
+ }
+
+ /**
+ * Gets custom name of the sql schema.
+ *
+ * @return Custom name of the sql schema.
+ */
+ public String getSqlSchema() {
+ return sqlSchema;
+ }
+
+ /**
+ * Sets custom name of the sql schema.
+ *
+ * @param sqlSchema Custom name of the sql schema.
+ */
+ public void setSqlSchema(String sqlSchema) {
+ this.sqlSchema = sqlSchema;
+ }
+
+ /**
+ * {@inheritDoc} The hash is derived from the cache name only ({@code 0} when the name is {@code null}).
+ */
+ @Override
+ public int hashCode() {
+ return name != null ? name.hashCode() : 0;
+ }
+
+ /**
+ * {@inheritDoc} Beans are equal when they have the same class and the same (possibly {@code null}) name; mode and SQL schema are not compared.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ GridClientCacheBean other = (GridClientCacheBean) obj;
+
+ return name == null ? other.name == null : name.equals(other.name);
+ }
+
+ /**
+ * {@inheritDoc} Prints the name and mode only; the SQL schema is not included.
+ */
+ @Override
+ public String toString() {
+ return "GridClientCacheBean [name=" + name + ", mode=" + mode + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/debe34de/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientNodeBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientNodeBean.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientNodeBean.java
index 2a34c80..8ba6eb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientNodeBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientNodeBean.java
@@ -21,10 +21,12 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
+import org.apache.ignite.internal.client.GridClientCacheMode;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -55,11 +57,8 @@ public class GridClientNodeBean implements Externalizable {
/** Node attributes. */
private Map<String, Object> attrs;
- /** Mode for cache with {@code null} name. */
- private String dfltCacheMode;
-
/** Node caches. */
- private Map<String, String> caches;
+ private Collection<GridClientCacheBean> caches;
/**
* Gets node ID.
@@ -177,40 +176,22 @@ public class GridClientNodeBean implements Externalizable {
/**
* Gets configured node caches.
*
- * @return Map where key is cache name and value is cache mode ("LOCAL", "REPLICATED", "PARTITIONED").
+ * @return Configured node caches.
*/
- public Map<String, String> getCaches() {
+ public Collection<GridClientCacheBean> getCaches() {
return caches;
}
/**
* Sets configured node caches.
*
- * @param caches Map where key is cache name and value is cache mode ("LOCAL", "REPLICATED", "PARTITIONED").
+ * @param caches Configured node caches.
*/
- public void setCaches(Map<String, String> caches) {
+ public void setCaches(Collection<GridClientCacheBean> caches) {
this.caches = caches;
}
/**
- * Gets mode for cache with null name.
- *
- * @return Default cache mode.
- */
- public String getDefaultCacheMode() {
- return dfltCacheMode;
- }
-
- /**
- * Sets mode for default cache.
- *
- * @param dfltCacheMode Default cache mode.
- */
- public void setDefaultCacheMode(String dfltCacheMode) {
- this.dfltCacheMode = dfltCacheMode;
- }
-
- /**
* Sets REST binary protocol port.
*
* @param tcpPort Port on which REST binary protocol is bound.
@@ -242,10 +223,25 @@ public class GridClientNodeBean implements Externalizable {
out.writeInt(tcpPort);
out.writeInt(0); // Jetty port.
+ String dfltCacheMode = null;
+
+ Map<String, String> cacheMap = null;
+
+ if (caches != null) {
+ cacheMap = U.newHashMap(caches.size());
+
+ for (GridClientCacheBean cacheBean : caches) {
+ if (cacheBean.getName() == null)
+ dfltCacheMode = cacheBean.getMode().toString();
+ else
+ cacheMap.put(cacheBean.getName(), cacheBean.getMode().toString());
+ }
+ }
+
U.writeString(out, dfltCacheMode);
U.writeMap(out, attrs);
- U.writeMap(out, caches);
+ U.writeMap(out, cacheMap);
U.writeCollection(out, tcpAddrs);
U.writeCollection(out, tcpHostNames);
@@ -263,10 +259,24 @@ public class GridClientNodeBean implements Externalizable {
tcpPort = in.readInt();
in.readInt(); // Jetty port.
- dfltCacheMode = U.readString(in);
+ String dfltCacheMode = U.readString(in);
attrs = U.readMap(in);
- caches = U.readMap(in);
+
+ Map<String, String> cacheMap = U.readMap(in);
+
+ if (cacheMap == null && dfltCacheMode != null) {
+ cacheMap = U.newHashMap(1);
+
+ cacheMap.put(null, dfltCacheMode);
+ }
+
+ if (cacheMap != null) {
+ caches = new ArrayList<>(cacheMap.size());
+
+ for (Map.Entry<String, String> e : cacheMap.entrySet())
+ caches.add(new GridClientCacheBean(e.getKey(), GridClientCacheMode.valueOf(e.getValue()), null));
+ }
tcpAddrs = U.readCollection(in);
tcpHostNames = U.readCollection(in);
@@ -283,4 +293,4 @@ public class GridClientNodeBean implements Externalizable {
@Override public String toString() {
return "GridClientNodeBean [id=" + nodeId + ']';
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/debe34de/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
index f950ac2..297785e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/top/GridTopologyCommandHandler.java
@@ -30,13 +30,17 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.client.GridClientCacheMode;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.rest.GridRestCommand;
import org.apache.ignite.internal.processors.rest.GridRestProtocol;
import org.apache.ignite.internal.processors.rest.GridRestResponse;
import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeBean;
+import org.apache.ignite.internal.processors.rest.client.message.GridClientCacheBean;
import org.apache.ignite.internal.processors.rest.client.message.GridClientNodeMetricsBean;
import org.apache.ignite.internal.processors.rest.handlers.GridRestCommandHandlerAdapter;
import org.apache.ignite.internal.processors.rest.request.GridRestRequest;
@@ -175,6 +179,22 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter {
}
/**
+ * Creates a cache bean carrying the name, cache mode and SQL schema of the given configuration.
+ *
+ * @param ccfg Cache configuration; it is dereferenced without a null check, so it must not be {@code null}.
+ * @return Newly created cache bean.
+ */
+ public GridClientCacheBean createCacheBean(CacheConfiguration ccfg) {
+ GridClientCacheBean cacheBean = new GridClientCacheBean();
+ // Copy the settings; the cache mode is converted to the client-side enum by constant name.
+ cacheBean.setName(ccfg.getName());
+ cacheBean.setMode(GridClientCacheMode.valueOf(ccfg.getCacheMode().toString()));
+ cacheBean.setSqlSchema(ccfg.getSqlSchema());
+
+ return cacheBean;
+ }
+
+ /**
* Creates node bean out of grid node. Notice that cache attribute is handled separately.
*
* @param node Grid node.
@@ -194,22 +214,16 @@ public class GridTopologyCommandHandler extends GridRestCommandHandlerAdapter {
nodeBean.setTcpAddresses(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_ADDRS)));
nodeBean.setTcpHostNames(nonEmptyList(node.<Collection<String>>attribute(ATTR_REST_TCP_HOST_NAMES)));
- Map<String, CacheMode> nodeCaches = ctx.discovery().nodeCaches(node);
-
- Map<String, String> cacheMap = U.newHashMap(nodeCaches.size());
+ GridCacheProcessor cacheProc = ctx.cache();
- for (Map.Entry<String, CacheMode> cache : nodeCaches.entrySet()) {
- String cacheName = cache.getKey();
+ Map<String, CacheMode> nodeCaches = ctx.discovery().nodeCaches(node);
- String mode = cache.getValue().toString();
+ Collection<GridClientCacheBean> caches = new ArrayList<>(nodeCaches.size());
- if (cacheName != null)
- cacheMap.put(cacheName, mode);
- else
- nodeBean.setDefaultCacheMode(mode);
- }
+ for (String cacheName : nodeCaches.keySet())
+ caches.add(createCacheBean(cacheProc.cacheConfiguration(cacheName)));
- nodeBean.setCaches(cacheMap);
+ nodeBean.setCaches(caches);
if (mtr) {
ClusterMetrics metrics = node.metrics();
[2/6] ignite git commit: ignite-1.5 Corrected fix for hang on
metadata update. Fix for ignite-647 (issues with dynamic cache start when
fair affinity is used).
Posted by sb...@apache.org.
ignite-1.5 Corrected fix for hang on metadata update. Fix for ignite-647 (issues with dynamic cache start when fair affinity is used).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/383f317d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/383f317d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/383f317d
Branch: refs/heads/ignite-1537
Commit: 383f317d03aca8903aeaa00da903366911103cef
Parents: fe14099
Author: sboikov <sb...@gridgain.com>
Authored: Thu Dec 24 13:12:23 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Dec 24 13:12:23 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheProcessor.java | 3 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +
.../dht/atomic/GridDhtAtomicCache.java | 89 ++++++++----------
.../GridDhtPartitionsExchangeFuture.java | 20 +++-
...ridNearOptimisticTxPrepareFutureAdapter.java | 10 +-
.../ignite/IgniteCacheAffinitySelfTest.java | 7 --
.../fair/FairAffinityDynamicCacheSelfTest.java | 17 +---
.../cache/CrossCacheTxRandomOperationsTest.java | 2 -
...yMetadataUpdateChangingTopologySelfTest.java | 97 +++++++++++++-------
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 3 +-
.../TcpDiscoveryMulticastIpFinderSelfTest.java | 21 ++++-
.../IgniteCacheRestartTestSuite2.java | 3 +
.../stream/mqtt/IgniteMqttStreamerTest.java | 33 ++++---
13 files changed, 173 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 380c163..ff02e70 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -1961,7 +1961,8 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (req.initiatingNodeId() == null)
desc.staticallyConfigured(true);
- desc.receivedOnDiscovery(true);
+ if (joiningNodeId.equals(ctx.localNodeId()))
+ desc.receivedOnDiscovery(true);
DynamicCacheDescriptor old = registeredCaches.put(maskNull(req.cacheName()), desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 7586a42..bcc2ab7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -491,6 +491,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+ if (topVer == null)
+ topVer = ctx.cache().context().exchange().readyAffinityVersion();
+
BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));
if (err != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 634a9ea..393413e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1290,59 +1290,48 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
GridCacheReturn retVal = null;
- IgniteTxManager tm = ctx.tm();
+ if (keys.size() > 1 && // Several keys ...
+ writeThrough() && !req.skipStore() && // and store is enabled ...
+ !ctx.store().isLocal() && // and this is not local store ...
+ !ctx.dr().receiveEnabled() // and no DR.
+ ) {
+ // This method can only be used when there are no replicated entries in the batch.
+ UpdateBatchResult updRes = updateWithBatch(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry,
+ sndPrevVal);
- // Needed for metadata cache transaction.
- boolean set = tm.setTxTopologyHint(req.topologyVersion());
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
- try {
- if (keys.size() > 1 && // Several keys ...
- writeThrough() && !req.skipStore() && // and store is enabled ...
- !ctx.store().isLocal() && // and this is not local store ...
- !ctx.dr().receiveEnabled() // and no DR.
- ) {
- // This method can only be used when there are no replicated entries in the batch.
- UpdateBatchResult updRes = updateWithBatch(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- completionCb,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal);
-
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
-
- if (req.operation() == TRANSFORM)
- retVal = updRes.invokeResults();
- }
- else {
- UpdateSingleResult updRes = updateSingle(node,
- hasNear,
- req,
- res,
- locked,
- ver,
- dhtFut,
- completionCb,
- ctx.isDrEnabled(),
- taskName,
- expiry,
- sndPrevVal);
-
- retVal = updRes.returnValue();
- deleted = updRes.deleted();
- dhtFut = updRes.dhtFuture();
- }
+ if (req.operation() == TRANSFORM)
+ retVal = updRes.invokeResults();
}
- finally {
- if (set)
- tm.setTxTopologyHint(null);
+ else {
+ UpdateSingleResult updRes = updateSingle(node,
+ hasNear,
+ req,
+ res,
+ locked,
+ ver,
+ dhtFut,
+ completionCb,
+ ctx.isDrEnabled(),
+ taskName,
+ expiry,
+ sndPrevVal);
+
+ retVal = updRes.returnValue();
+ deleted = updRes.deleted();
+ dhtFut = updRes.dhtFuture();
}
if (retVal == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 854726f..a10294f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -329,6 +329,19 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
* @return {@code True} if cache was added during this exchange.
*/
public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) {
+ if (cacheStarted(cacheId))
+ return true;
+
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @return {@code True} if non-client cache was added during this exchange.
+ */
+ private boolean cacheStarted(int cacheId) {
if (!F.isEmpty(reqs)) {
for (DynamicCacheChangeRequest req : reqs) {
if (req.start() && !req.clientStartOnly()) {
@@ -338,9 +351,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
}
- GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
- return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer);
+ return false;
}
/**
@@ -419,7 +430,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
// If local node did not initiate exchange or local node is the only cache node in grid.
Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion());
- return !exchId.nodeId().equals(cctx.localNodeId()) ||
+ return cacheStarted(cacheCtx.cacheId()) ||
+ !exchId.nodeId().equals(cctx.localNodeId()) ||
(affNodes.size() == 1 && affNodes.contains(cctx.localNode()));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fa7020b..fe6180a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -52,10 +52,16 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
// Obtain the topology version to use.
long threadId = Thread.currentThread().getId();
- AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+ AffinityTopologyVersion topVer = null;
+
+ if (tx.system())
+ topVer = tx.topologyVersionSnapshot();
+
+ if (topVer == null)
+ topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
// If there is another system transaction in progress, use it's topology version to prevent deadlock.
- if (topVer == null && tx != null && tx.system())
+ if (topVer == null && tx.system())
topVer = cctx.tm().lockedTopologyVersion(threadId, tx);
if (topVer != null) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java
index 3d76268..5b08f62 100644
--- a/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/IgniteCacheAffinitySelfTest.java
@@ -92,17 +92,10 @@ public class IgniteCacheAffinitySelfTest extends IgniteCacheAbstractTest {
return new NearCacheConfiguration();
}
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed.");
- }
-
/**
* @throws Exception if failed.
*/
public void testAffinity() throws Exception {
- fail("Enable when https://issues.apache.org/jira/browse/IGNITE-647 is fixed.");
-
checkAffinity();
stopGrid(gridCount() - 1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
index ef67495..4299935 100644
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/fair/FairAffinityDynamicCacheSelfTest.java
@@ -37,22 +37,11 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
/** */
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
- /** */
- public FairAffinityDynamicCacheSelfTest(){
- super(false);
- }
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
- TcpDiscoverySpi disco = new TcpDiscoverySpi();
-
- disco.setIpFinder(IP_FINDER);
-
- cfg.getTransactionConfiguration().setTxSerializableEnabled(true);
-
- cfg.setDiscoverySpi(disco);
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
return cfg;
}
@@ -71,8 +60,6 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testStartStopCache() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-647");
-
CacheConfiguration<Integer, Integer> cacheCfg = new CacheConfiguration<>();
cacheCfg.setCacheMode(CacheMode.PARTITIONED);
@@ -94,6 +81,6 @@ public class FairAffinityDynamicCacheSelfTest extends GridCommonAbstractTest {
}
});
- destFut.get(2000L);
+ destFut.get(5000L);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
index d88f12f..2577d93 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CrossCacheTxRandomOperationsTest.java
@@ -126,8 +126,6 @@ public class CrossCacheTxRandomOperationsTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testCrossCacheTxOperationsFairAffinity() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-647");
-
txOperations(PARTITIONED, FULL_SYNC, true, true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
index c95c586..9eaa848 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteBinaryMetadataUpdateChangingTopologySelfTest.java
@@ -25,10 +25,12 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -48,7 +50,6 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
/**
* Tests specific scenario when binary metadata should be updated from a system thread
@@ -105,7 +106,7 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync();
- cache.putAll(F.asMap(key1, "val1", key2, new TestValue()));
+ cache.putAll(F.asMap(key1, "val1", key2, new TestValue1()));
try {
Thread.sleep(500);
@@ -118,8 +119,47 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
}
});
+ Thread.sleep(1000);
+
+ spi.stopBlock();
+
+ cache.future().get();
+
+ fut.get();
+ }
+ finally {
+ stopGrid(4);
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNoDeadlockInvoke() throws Exception {
+ int key1 = primaryKey(ignite(1).cache("cache"));
+ int key2 = primaryKey(ignite(2).cache("cache"));
+
+ TestCommunicationSpi spi = (TestCommunicationSpi)ignite(1).configuration().getCommunicationSpi();
+
+ spi.blockMessages(GridNearTxPrepareResponse.class, ignite(0).cluster().localNode().id());
+
+ IgniteCache<Object, Object> cache = ignite(0).cache("cache").withAsync();
+
+ cache.invokeAll(F.asSet(key1, key2), new TestEntryProcessor());
+
+ try {
Thread.sleep(500);
+ IgniteInternalFuture<Void> fut = GridTestUtils.runAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ startGrid(4);
+
+ return null;
+ }
+ });
+
+ Thread.sleep(1000);
+
spi.stopBlock();
cache.future().get();
@@ -145,12 +185,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
/** */
private Map<Class<?>, Set<UUID>> blockCls = new HashMap<>();
- /** */
- private Class<?> recordCls;
-
- /** */
- private List<Object> recordedMsgs = new ArrayList<>();
-
/** {@inheritDoc} */
@Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC)
throws IgniteSpiException {
@@ -158,9 +192,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
Object msg0 = ((GridIoMessage)msg).message();
synchronized (this) {
- if (recordCls != null && msg0.getClass().equals(recordCls))
- recordedMsgs.add(msg0);
-
Set<UUID> blockNodes = blockCls.get(msg0.getClass());
if (F.contains(blockNodes, node.id())) {
@@ -178,28 +209,6 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
}
/**
- * @param recordCls Message class to record.
- */
- void record(@Nullable Class<?> recordCls) {
- synchronized (this) {
- this.recordCls = recordCls;
- }
- }
-
- /**
- * @return Recorded messages.
- */
- List<Object> recordedMessages() {
- synchronized (this) {
- List<Object> msgs = recordedMsgs;
-
- recordedMsgs = new ArrayList<>();
-
- return msgs;
- }
- }
-
- /**
* @param cls Message class.
* @param nodeId Node ID.
*/
@@ -241,7 +250,27 @@ public class IgniteBinaryMetadataUpdateChangingTopologySelfTest extends GridComm
/**
*
*/
- private static class TestValue {
+ static class TestEntryProcessor implements CacheEntryProcessor<Object, Object, Object> {
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry<Object, Object> e, Object... arguments) {
+ e.setValue(new TestValue2());
+
+ return null;
+ }
+ }
+
+ /**
+ *
+ */
+ private static class TestValue1 {
+ /** Field1. */
+ private String field1;
+ }
+
+ /**
+ *
+ */
+ private static class TestValue2 {
/** Field1. */
private String field1;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 38e3d98..9e78fb9 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -179,8 +179,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS
@Override public boolean apply() {
return recoveryDesc.messagesFutures().isEmpty();
}
- }, spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() + 7000 :
- 10_000);
+ }, 10_000);
assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0,
recoveryDesc.messagesFutures().size());
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
index b39be56..90fdb0a 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/ipfinder/multicast/TcpDiscoveryMulticastIpFinderSelfTest.java
@@ -101,11 +101,11 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
assertEquals(1, addrs1.size());
assertEquals(2, addrs2.size());
- assertEquals(3, addrs3.size());
+ assertTrue("Unexpected number of addresses: " + addrs3, addrs3.size() == 2 || addrs3.size() == 3);
- assertEquals(3, ipFinder1.getRegisteredAddresses().size());
- assertEquals(3, ipFinder2.getRegisteredAddresses().size());
- assertEquals(3, ipFinder3.getRegisteredAddresses().size());
+ checkRequestAddresses(ipFinder1, 3);
+ checkRequestAddresses(ipFinder2, 3);
+ checkRequestAddresses(ipFinder3, 3);
}
finally {
if (ipFinder1 != null)
@@ -118,4 +118,17 @@ public class TcpDiscoveryMulticastIpFinderSelfTest
ipFinder3.close();
}
}
+
+ /**
+ * @param ipFinder IP finder.
+ * @param exp Expected number of addresses.
+ */
+ private void checkRequestAddresses(TcpDiscoveryMulticastIpFinder ipFinder, int exp) {
+ for (int i = 0; i < 10; i++) {
+ if (ipFinder.getRegisteredAddresses().size() == exp)
+ return;
+ }
+
+ assertEquals(exp, ipFinder.getRegisteredAddresses().size());
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
index c9e9467..de87e99 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheRestartTestSuite2.java
@@ -21,6 +21,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.GridCachePutAllFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCacheAtomicPutAllFailoverSelfTest;
import org.apache.ignite.internal.processors.cache.IgniteCachePutAllRestartTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteBinaryMetadataUpdateNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicNodeRestartTest;
import org.apache.ignite.internal.processors.cache.distributed.replicated.IgniteCacheAtomicReplicatedNodeRestartSelfTest;
@@ -42,6 +43,8 @@ public class IgniteCacheRestartTestSuite2 extends TestSuite {
suite.addTestSuite(IgniteCachePutAllRestartTest.class);
suite.addTestSuite(GridCachePutAllFailoverSelfTest.class);
+ suite.addTestSuite(IgniteBinaryMetadataUpdateNodeRestartTest.class);
+
return suite;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/383f317d/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 891866d..92a530d 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -87,7 +87,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
private MqttStreamer<Integer, String> streamer;
/** The UUID of the currently active remote listener. */
- private UUID remoteListener;
+ private UUID remoteLsnr;
/** The Ignite data streamer. */
private IgniteDataStreamer<Integer, String> dataStreamer;
@@ -105,7 +105,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- @Before @SuppressWarnings("unchecked")
+ @Before
+ @SuppressWarnings("unchecked")
public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
@@ -121,13 +122,13 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
broker.setPersistenceAdapter(null);
broker.setPersistenceFactory(null);
- PolicyMap policyMap = new PolicyMap();
- PolicyEntry policy = new PolicyEntry();
+ PolicyMap plcMap = new PolicyMap();
+ PolicyEntry plc = new PolicyEntry();
- policy.setQueuePrefetch(1);
+ plc.setQueuePrefetch(1);
- broker.setDestinationPolicy(policyMap);
- broker.getDestinationPolicy().setDefaultEntry(policy);
+ broker.setDestinationPolicy(plcMap);
+ broker.getDestinationPolicy().setDefaultEntry(plc);
broker.setSchedulerSupport(false);
// add the MQTT transport connector to the broker
@@ -194,7 +195,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testConnectionStatusWithBrokerDisconnection() throws Exception {
- // configure streamer
+ fail("https://issues.apache.org/jira/browse/IGNITE-2255");
+
+ // Configure streamer.
streamer.setSingleTupleExtractor(singleTupleExtractor());
streamer.setTopic(SINGLE_TOPIC_NAME);
streamer.setBlockUntilConnected(true);
@@ -202,8 +205,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
streamer.start();
- // action time: repeat 5 times; make sure the connection state is kept correctly every time
+ // Action time: repeat 5 times; make sure the connection state is kept correctly every time.
for (int i = 0; i < 5; i++) {
+ log.info("Iteration: " + i);
+
assertTrue(streamer.isConnected());
broker.stop();
@@ -355,7 +360,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
/**
- * @throws Exception
+ * @throws Exception If failed.
*/
public void testSingleTopic_NoQoS_Reconnect() throws Exception {
// configure streamer
@@ -557,7 +562,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// Listen to cache PUT events and expect as many as messages as test data items
final CountDownLatch latch = new CountDownLatch(expect);
- IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+ IgniteBiPredicate<UUID, CacheEvent> cb = new IgniteBiPredicate<UUID, CacheEvent>() {
@Override public boolean apply(UUID uuid, CacheEvent evt) {
latch.countDown();
@@ -565,8 +570,8 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
};
- remoteListener = ignite.events(ignite.cluster().forCacheNodes(null))
- .remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+ remoteLsnr = ignite.events(ignite.cluster().forCacheNodes(null))
+ .remoteListen(cb, null, EVT_CACHE_OBJECT_PUT);
return latch;
}
@@ -586,7 +591,7 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertEquals(cnt, cache.size(CachePeekMode.ALL));
// remove the event listener
- grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+ grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteLsnr);
}
/**