You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/08/17 15:20:19 UTC
[ignite-3] branch main updated: IGNITE-15236 Thin 3.0: Start
client-handler module (#283)
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new afe2cce IGNITE-15236 Thin 3.0: Start client-handler module (#283)
afe2cce is described below
commit afe2cce304938e02ac32c50c6fa1e5fee5b63103
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Tue Aug 17 18:20:12 2021 +0300
IGNITE-15236 Thin 3.0: Start client-handler module (#283)
* Start client-handler module when the node starts.
* Add integration test.
---
.../ClientConnectorConfigurationSchema.java | 2 +-
.../org/apache/ignite/cli/ConfigCommandTest.java | 10 +-
.../ignite/client/handler/ClientHandlerModule.java | 90 ++++++-----
.../handler/ClientInboundMessageHandler.java | 84 +++++-----
.../handler/ClientHandlerIntegrationTest.java | 48 +++---
.../apache/ignite/client/AbstractClientTest.java | 28 ++--
modules/runner/pom.xml | 11 ++
.../runner/app/ITThinClientConnectionTest.java | 173 +++++++++++++++++++++
.../apache/ignite/internal/app/IgnitionImpl.java | 14 +-
parent/pom.xml | 6 +
10 files changed, 343 insertions(+), 123 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
index cdfa263..db0318d 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/clientconnector/ClientConnectorConfigurationSchema.java
@@ -38,5 +38,5 @@ public class ClientConnectorConfigurationSchema {
/** TCP port range. */
@Min(0)
@Value(hasDefault = true)
- public final int portRange = 0;
+ public final int portRange = 100;
}
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ConfigCommandTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ConfigCommandTest.java
index 5d8d470..84494c2 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ConfigCommandTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/cli/ConfigCommandTest.java
@@ -50,6 +50,9 @@ public class ConfigCommandTest extends AbstractCliTest {
/** Port for REST communication */
private int restPort;
+ /** Port for thin client communication */
+ private int clientPort;
+
/** Network port. */
private int networkPort;
@@ -59,10 +62,12 @@ public class ConfigCommandTest extends AbstractCliTest {
// TODO: IGNITE-15131 Must be replaced by receiving the actual port configs from the started node.
// This approach still can produce the port, which will be unavailable at the moment of node start.
restPort = getAvailablePort();
+ clientPort = getAvailablePort();
networkPort = getAvailablePort();
String configStr = "network.port=" + networkPort + "\n" +
- "rest.port=" + restPort + "\n" + "rest.portRange=0";
+ "rest.port=" + restPort + "\n" + "rest.portRange=0" + "\n" +
+ "clientConnector.port=" + clientPort + "\n" + "clientConnector.portRange=0";
IgnitionManager.start("node1", configStr, workDir);
@@ -110,7 +115,8 @@ public class ConfigCommandTest extends AbstractCliTest {
assertEquals(0, exitCode);
assertEquals(
- "\"{\"network\":{\"netClusterNodes\":[],\"port\":" + networkPort + "}," +
+ "\"{\"clientConnector\":{\"port\":" + clientPort + ",\"portRange\":0}," +
+ "\"network\":{\"netClusterNodes\":[],\"port\":" + networkPort + "}," +
"\"node\":{\"metastorageNodes\":[\"localhost1\"]}," +
"\"rest\":{\"port\":" + restPort + ",\"portRange\":0}}\"" + nl,
unescapeQuotes(out.toString())
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 59ff857..6cf8e57 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.handler;
import java.net.BindException;
+import java.net.SocketAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -27,55 +28,74 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.proto.ClientMessageDecoder;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.lang.IgniteException;
-import org.slf4j.Logger;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.jetbrains.annotations.Nullable;
/**
* Client handler module maintains TCP endpoint for thin client connections.
- *
*/
-public class ClientHandlerModule {
+public class ClientHandlerModule implements IgniteComponent {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(ClientHandlerModule.class);
+
/** Configuration registry. */
- private ConfigurationRegistry registry;
+ private final ConfigurationRegistry registry;
- /** Ignite API entry poiny. */
- private final Ignite ignite;
+ /** Ignite tables API. */
+ private final IgniteTables igniteTables;
- /** Logger. */
- private final Logger log;
+ /** Netty channel. */
+ private Channel channel;
/**
* Constructor.
*
- * @param ignite Ignite.
- * @param log Logger.
+ * @param igniteTables Ignite tables API.
+ * @param registry Configuration registry.
*/
- public ClientHandlerModule(Ignite ignite, Logger log) {
- this.ignite = ignite;
- this.log = log;
+ public ClientHandlerModule(IgniteTables igniteTables, ConfigurationRegistry registry) {
+ assert igniteTables != null;
+ assert registry != null;
+
+ this.igniteTables = igniteTables;
+ this.registry = registry;
}
- /**
- * Prepares to start the module.
- *
- * @param sysCfg Configuration registry.
- */
- public void prepareStart(ConfigurationRegistry sysCfg) {
- registry = sysCfg;
+ /** {@inheritDoc} */
+ @Override public void start() {
+ if (channel != null)
+ throw new IgniteException("ClientHandlerModule is already started.");
+
+ try {
+ channel = startEndpoint().channel();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws Exception {
+ if (channel != null) {
+ channel.close().await();
+
+ channel = null;
+ }
}
/**
- * Starts the module.
+ * Returns the local address this handler is bound to.
*
- * @return channel future.
- * @throws InterruptedException If thread has been interrupted during the start.
+ * @return the local address of this module, or {@code null} if this module is not started.
*/
- public ChannelFuture start() throws InterruptedException {
- return startEndpoint();
+ @Nullable public SocketAddress localAddress() {
+ return channel == null ? null : channel.localAddress();
}
/**
@@ -86,18 +106,16 @@ public class ClientHandlerModule {
* @throws IgniteException When startup has failed.
*/
private ChannelFuture startEndpoint() throws InterruptedException {
- var configuration = registry.getConfiguration(ClientConnectorConfiguration.KEY);
+ var configuration = registry.getConfiguration(ClientConnectorConfiguration.KEY).value();
- // TODO: Handle defaults IGNITE-15164.
- int desiredPort = configuration.port().value() == null ? 10800 : configuration.port().value();
- int portRange = configuration.portRange().value() == null ? 100 : configuration.portRange().value();
+ int desiredPort = configuration.port();
+ int portRange = configuration.portRange();
int port = 0;
-
Channel ch = null;
+ // TODO: Reuse Netty infrastructure from network module IGNITE-15307.
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
-
ServerBootstrap b = new ServerBootstrap();
b.group(eventLoopGroup)
@@ -107,13 +125,13 @@ public class ClientHandlerModule {
protected void initChannel(Channel ch) {
ch.pipeline().addLast(
new ClientMessageDecoder(),
- new ClientInboundMessageHandler(ignite, log));
+ new ClientInboundMessageHandler(igniteTables));
}
})
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);
- for (int portCandidate = desiredPort; portCandidate < desiredPort + portRange; portCandidate++) {
+ for (int portCandidate = desiredPort; portCandidate <= desiredPort + portRange; portCandidate++) {
ChannelFuture bindRes = b.bind(portCandidate).await();
if (bindRes.isSuccess()) {
@@ -133,14 +151,14 @@ public class ClientHandlerModule {
String msg = "Cannot start thin client connector endpoint. " +
"All ports in range [" + desiredPort + ", " + (desiredPort + portRange) + "] are in use.";
- log.error(msg);
+ LOG.error(msg);
eventLoopGroup.shutdownGracefully();
throw new IgniteException(msg);
}
- log.info("Thin client connector started successfully on port " + port);
+ LOG.info("Thin client protocol started successfully on port " + port);
return ch.closeFuture();
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f6ccd1a..cdbd01c 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -17,7 +17,6 @@
package org.apache.ignite.client.handler;
-import java.io.IOException;
import java.util.BitSet;
import java.util.concurrent.CompletableFuture;
import io.netty.buffer.ByteBuf;
@@ -25,7 +24,6 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableDropRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
@@ -61,18 +59,19 @@ import org.apache.ignite.client.proto.ClientOp;
import org.apache.ignite.client.proto.ProtocolVersion;
import org.apache.ignite.client.proto.ServerMessageType;
import org.apache.ignite.lang.IgniteException;
-import org.slf4j.Logger;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.table.manager.IgniteTables;
/**
* Handles messages from thin clients.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
- /** Logger. */
- private final Logger log;
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(ClientInboundMessageHandler.class);
- /** API entry point. */
- private final Ignite ignite;
+ /** Ignite tables API. */
+ private final IgniteTables igniteTables;
/** Context. */
private ClientContext clientContext;
@@ -80,15 +79,12 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/**
* Constructor.
*
- * @param ignite Ignite API entry point.
- * @param log Logger.
+ * @param igniteTables Ignite tables API entry point.
*/
- public ClientInboundMessageHandler(Ignite ignite, Logger log) {
- assert ignite != null;
- assert log != null;
+ public ClientInboundMessageHandler(IgniteTables igniteTables) {
+ assert igniteTables != null;
- this.ignite = ignite;
- this.log = log;
+ this.igniteTables = igniteTables;
}
/** {@inheritDoc} */
@@ -120,7 +116,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
clientContext = new ClientContext(clientVer, clientCode, features);
- log.debug("Handshake: " + clientContext);
+ LOG.debug("Handshake: " + clientContext);
var extensionsLen = unpacker.unpackMapHeader();
unpacker.skipValue(extensionsLen);
@@ -237,88 +233,88 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
ClientMessageUnpacker in,
ClientMessagePacker out,
int opCode
- ) throws IOException {
+ ) {
switch (opCode) {
case ClientOp.TABLE_DROP:
- return ClientTableDropRequest.process(in, ignite.tables());
+ return ClientTableDropRequest.process(in, igniteTables);
case ClientOp.TABLES_GET:
- return ClientTablesGetRequest.process(out, ignite.tables());
+ return ClientTablesGetRequest.process(out, igniteTables);
case ClientOp.SCHEMAS_GET:
- return ClientSchemasGetRequest.process(in, out, ignite.tables());
+ return ClientSchemasGetRequest.process(in, out, igniteTables);
case ClientOp.TABLE_GET:
- return ClientTableGetRequest.process(in, out, ignite.tables());
+ return ClientTableGetRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_UPSERT:
- return ClientTupleUpsertRequest.process(in, ignite.tables());
+ return ClientTupleUpsertRequest.process(in, igniteTables);
case ClientOp.TUPLE_UPSERT_SCHEMALESS:
- return ClientTupleUpsertSchemalessRequest.process(in, ignite.tables());
+ return ClientTupleUpsertSchemalessRequest.process(in, igniteTables);
case ClientOp.TUPLE_GET:
- return ClientTupleGetRequest.process(in, out, ignite.tables());
+ return ClientTupleGetRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_UPSERT_ALL:
- return ClientTupleUpsertAllRequest.process(in, ignite.tables());
+ return ClientTupleUpsertAllRequest.process(in, igniteTables);
case ClientOp.TUPLE_UPSERT_ALL_SCHEMALESS:
- return ClientTupleUpsertAllSchemalessRequest.process(in, ignite.tables());
+ return ClientTupleUpsertAllSchemalessRequest.process(in, igniteTables);
case ClientOp.TUPLE_GET_ALL:
- return ClientTupleGetAllRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_UPSERT:
- return ClientTupleGetAndUpsertRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAndUpsertRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_UPSERT_SCHEMALESS:
- return ClientTupleGetAndUpsertSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAndUpsertSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT:
- return ClientTupleInsertRequest.process(in, out, ignite.tables());
+ return ClientTupleInsertRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_SCHEMALESS:
- return ClientTupleInsertSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleInsertSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_ALL:
- return ClientTupleInsertAllRequest.process(in, out, ignite.tables());
+ return ClientTupleInsertAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_INSERT_ALL_SCHEMALESS:
- return ClientTupleInsertAllSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleInsertAllSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE:
- return ClientTupleReplaceRequest.process(in, out, ignite.tables());
+ return ClientTupleReplaceRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_SCHEMALESS:
- return ClientTupleReplaceSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleReplaceSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_EXACT:
- return ClientTupleReplaceExactRequest.process(in, out, ignite.tables());
+ return ClientTupleReplaceExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_REPLACE_EXACT_SCHEMALESS:
- return ClientTupleReplaceExactSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleReplaceExactSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_REPLACE:
- return ClientTupleGetAndReplaceRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAndReplaceRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_REPLACE_SCHEMALESS:
- return ClientTupleGetAndReplaceSchemalessRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAndReplaceSchemalessRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE:
- return ClientTupleDeleteRequest.process(in, out, ignite.tables());
+ return ClientTupleDeleteRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_ALL:
- return ClientTupleDeleteAllRequest.process(in, out, ignite.tables());
+ return ClientTupleDeleteAllRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_EXACT:
- return ClientTupleDeleteExactRequest.process(in, out, ignite.tables());
+ return ClientTupleDeleteExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_DELETE_ALL_EXACT:
- return ClientTupleDeleteAllExactRequest.process(in, out, ignite.tables());
+ return ClientTupleDeleteAllExactRequest.process(in, out, igniteTables);
case ClientOp.TUPLE_GET_AND_DELETE:
- return ClientTupleGetAndDeleteRequest.process(in, out, ignite.tables());
+ return ClientTupleGetAndDeleteRequest.process(in, out, igniteTables);
default:
throw new IgniteException("Unexpected operation code: " + opCode);
@@ -332,7 +328,7 @@ public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter {
/** {@inheritDoc} */
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- log.error(cause.getMessage(), cause);
+ LOG.error(cause.getMessage(), cause);
ctx.close();
}
diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
index 5294065..574cbbb 100644
--- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
+++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/ClientHandlerIntegrationTest.java
@@ -23,16 +23,15 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Map;
-import io.netty.channel.ChannelFuture;
-import org.apache.ignite.app.Ignite;
+import java.util.Objects;
import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
-import org.apache.ignite.internal.configuration.ConfigurationRegistry;
+import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.storage.TestConfigurationStorage;
+import org.apache.ignite.table.manager.IgniteTables;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.msgpack.core.MessagePack;
-import org.slf4j.helpers.NOPLogger;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -47,24 +46,22 @@ public class ClientHandlerIntegrationTest {
/** Magic bytes. */
private static final byte[] MAGIC = new byte[]{0x49, 0x47, 0x4E, 0x49};
- private ChannelFuture serverFuture;
+ private ClientHandlerModule serverModule;
- private ConfigurationRegistry configurationRegistry;
+ private ConfigurationManager configurationManager;
private int serverPort;
@BeforeEach
- public void setUp() throws Exception {
- serverFuture = startServer();
- serverPort = ((InetSocketAddress)serverFuture.channel().localAddress()).getPort();
+ public void setUp() {
+ serverModule = startServer();
+ serverPort = ((InetSocketAddress) Objects.requireNonNull(serverModule.localAddress())).getPort();
}
@AfterEach
public void tearDown() throws Exception {
- serverFuture.cancel(true);
- serverFuture.await();
- serverFuture.channel().closeFuture().await();
- configurationRegistry.stop();
+ serverModule.stop();
+ configurationManager.stop();
}
@Test
@@ -178,20 +175,25 @@ public class ClientHandlerIntegrationTest {
}
}
- private ChannelFuture startServer() throws InterruptedException {
- configurationRegistry = new ConfigurationRegistry(
- List.of(ClientConnectorConfiguration.KEY),
- Map.of(),
- new TestConfigurationStorage(LOCAL)
- );
+ private ClientHandlerModule startServer() {
+ configurationManager = new ConfigurationManager(
+ List.of(ClientConnectorConfiguration.KEY),
+ Map.of(),
+ new TestConfigurationStorage(LOCAL));
- configurationRegistry.start();
+ configurationManager.start();
- var module = new ClientHandlerModule(mock(Ignite.class), NOPLogger.NOP_LOGGER);
+ var registry = configurationManager.configurationRegistry();
- module.prepareStart(configurationRegistry);
+ registry.getConfiguration(ClientConnectorConfiguration.KEY).change(
+ local -> local.changePort(10800).changePortRange(10)
+ ).join();
- return module.start();
+ var module = new ClientHandlerModule(mock(IgniteTables.class), registry);
+
+ module.start();
+
+ return module;
}
private void writeAndFlushLoop(Socket socket) throws Exception {
diff --git a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
index 64d7855..35eeaaa 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/AbstractClientTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.client;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
-import io.netty.channel.ChannelFuture;
+import java.util.Objects;
import io.netty.util.ResourceLeakDetector;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.client.fakes.FakeIgnite;
@@ -32,7 +32,6 @@ import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.slf4j.helpers.NOPLogger;
import static org.apache.ignite.configuration.annotation.ConfigurationType.LOCAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,7 +45,7 @@ public abstract class AbstractClientTest {
protected static ConfigurationRegistry configurationRegistry;
- protected static ChannelFuture serverFuture;
+ protected static ClientHandlerModule serverModule;
protected static Ignite server;
@@ -55,11 +54,11 @@ public abstract class AbstractClientTest {
protected static int serverPort;
@BeforeAll
- public static void beforeAll() throws Exception {
+ public static void beforeAll() {
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
- serverFuture = startServer(null);
- serverPort = ((InetSocketAddress)serverFuture.channel().localAddress()).getPort();
+ serverModule = startServer();
+ serverPort = ((InetSocketAddress) Objects.requireNonNull(serverModule.localAddress())).getPort();
client = startClient();
}
@@ -67,9 +66,7 @@ public abstract class AbstractClientTest {
@AfterAll
public static void afterAll() throws Exception {
client.close();
- serverFuture.cancel(true);
- serverFuture.await();
- serverFuture.channel().closeFuture().await();
+ serverModule.stop();
configurationRegistry.stop();
}
@@ -88,7 +85,7 @@ public abstract class AbstractClientTest {
return builder.build();
}
- public static ChannelFuture startServer(String host) throws InterruptedException {
+ public static ClientHandlerModule startServer() {
configurationRegistry = new ConfigurationRegistry(
List.of(ClientConnectorConfiguration.KEY),
Map.of(),
@@ -97,13 +94,16 @@ public abstract class AbstractClientTest {
configurationRegistry.start();
- server = new FakeIgnite();
+ configurationRegistry.getConfiguration(ClientConnectorConfiguration.KEY).change(
+ local -> local.changePort(10800).changePortRange(10)
+ ).join();
- var module = new ClientHandlerModule(server, NOPLogger.NOP_LOGGER);
+ server = new FakeIgnite();
- module.prepareStart(configurationRegistry);
+ var module = new ClientHandlerModule(server.tables(), configurationRegistry);
+ module.start();
- return module.start();
+ return module;
}
public static void assertTupleEquals(Tuple x, Tuple y) {
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index 89175bd..8085d21 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -89,6 +89,11 @@
<artifactId>ignite-calcite</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client-handler</artifactId>
+ </dependency>
+
<!-- 3rd party dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
@@ -127,6 +132,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java
new file mode 100644
index 0000000..e2ea59c
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITThinClientConnectionTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import com.google.common.collect.Lists;
+import org.apache.ignite.app.Ignite;
+import org.apache.ignite.app.IgnitionManager;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.handler.ClientHandlerModule;
+import org.apache.ignite.internal.app.IgnitionImpl;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.ColumnType;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.SchemaTable;
+import org.apache.ignite.table.Table;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests thin client connecting to a real server node.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ITThinClientConnectionTest extends IgniteAbstractTest {
+ /** */
+ private static final String SCHEMA_NAME = "PUB";
+
+ /** */
+ private static final String TABLE_NAME = "tbl1";
+
+ /** Nodes bootstrap configuration. */
+ private static final Map<String, String> nodesBootstrapCfg = new LinkedHashMap<>() {{
+ put("node0", "{\n" +
+ " \"node\": {\n" +
+ " \"metastorageNodes\":[ \"node0\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3344,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\" ]\n" +
+ " }\n" +
+ "}");
+
+ put("node1", "{\n" +
+ " \"node\": {\n" +
+ " \"metastorageNodes\":[ \"node0\" ]\n" +
+ " },\n" +
+ " \"network\": {\n" +
+ " \"port\":3345,\n" +
+ " \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\" ]\n" +
+ " }\n" +
+ "}");
+ }};
+
+ /** */
+ private final List<Ignite> startedNodes = new ArrayList<>();
+
+ /** */
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(Lists.reverse(startedNodes));
+ }
+
+ /**
+ * Check that thin client can connect to any server node and work with table API.
+ */
+ @Test
+ void testThinClientConnectsToServerNodesAndExecutesBasicTableOperations() throws Exception {
+ nodesBootstrapCfg.forEach((nodeName, configStr) ->
+ startedNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
+ );
+
+ var keyCol = "key";
+ var valCol = "val";
+
+ SchemaTable schTbl = SchemaBuilders.tableBuilder(SCHEMA_NAME, TABLE_NAME).columns(
+ SchemaBuilders.column(keyCol, ColumnType.INT32).asNonNull().build(),
+ SchemaBuilders.column(valCol, ColumnType.string()).asNullable().build()
+ ).withPrimaryKey(keyCol).build();
+
+ startedNodes.get(0).tables().createTable(schTbl.canonicalName(), tblCh ->
+ SchemaConfigurationConverter.convert(schTbl, tblCh)
+ .changeReplicas(1)
+ .changePartitions(10)
+ );
+
+ var addrs = new String[]{"127.0.0.1:" + getClientPort("node0"), "127.0.0.1:" + getClientPort("node1")};
+
+ for (var addr : addrs) {
+ try (var client = IgniteClient.builder().addresses(addr).build()) {
+ List<Table> tables = client.tables().tables();
+ assertEquals(1, tables.size());
+
+ Table table = tables.get(0);
+ assertEquals(String.format("%s.%s", SCHEMA_NAME, TABLE_NAME), table.tableName());
+
+ var tuple = table.tupleBuilder().set(keyCol, 1).set(valCol, "Hello").build();
+ var keyTuple = table.tupleBuilder().set(keyCol, 1).build();
+
+ table.upsert(tuple);
+ assertEquals("Hello", table.get(keyTuple).stringValue(valCol));
+
+ assertTrue(table.delete(keyTuple));
+ }
+ }
+ }
+
+ /**
+ * Gets the client listener port.
+ *
+ * @param nodeName Node name.
+ * @return Port number.
+ * @throws Exception When failed.
+ */
+ private static int getClientPort(String nodeName) throws Exception {
+ InetSocketAddress addr = (InetSocketAddress) getClientHandlerModule(nodeName).localAddress();
+ assertNotNull(addr);
+
+ return addr.getPort();
+ }
+
+ /**
+ * Gets the client handler module for the given node.
+ *
+ * @param nodeName Node name.
+ * @return Client handler module.
+ * @throws Exception When failed.
+ */
+ private static ClientHandlerModule getClientHandlerModule(String nodeName) throws Exception {
+ Field field = IgnitionImpl.class.getDeclaredField("nodesStartedComponents");
+ field.setAccessible(true);
+
+ var componentMap = (Map<String, List<IgniteComponent>>) field.get(null);
+ assertNotNull(componentMap);
+
+ var components = componentMap.get(nodeName);
+ assertNotNull(components);
+
+ return components.stream()
+ .filter(ClientHandlerModule.class::isInstance)
+ .map(ClientHandlerModule.class::cast)
+ .findFirst()
+ .orElseThrow();
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 8836dd5..d43c6d9 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -32,7 +32,9 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
+import org.apache.ignite.client.handler.ClientHandlerModule;
import org.apache.ignite.configuration.RootKey;
+import org.apache.ignite.configuration.schemas.clientconnector.ClientConnectorConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkView;
import org.apache.ignite.configuration.schemas.rest.RestConfiguration;
@@ -114,9 +116,9 @@ public class IgnitionImpl implements Ignition {
* Mapping of a node name to a started node components list.
* Given map helps to stop node by stopping all it's components in an appropriate order both
* when node is already started which means that all components are ready and
- * if node is in a middle of a startup process which means that only part of it's components are prepared.
+ * if node is in a middle of a startup process which means that only part of its components are prepared.
*/
- private static Map<String, List<IgniteComponent>> nodesStartedComponents = new ConcurrentHashMap<>();
+ private static final Map<String, List<IgniteComponent>> nodesStartedComponents = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override public Ignite start(@NotNull String nodeName, @Nullable Path cfgPath, @NotNull Path workDir) {
@@ -210,7 +212,8 @@ public class IgnitionImpl implements Ignition {
List<RootKey<?, ?>> nodeRootKeys = List.of(
NetworkConfiguration.KEY,
NodeConfiguration.KEY,
- RestConfiguration.KEY
+ RestConfiguration.KEY,
+ ClientConnectorConfiguration.KEY
);
// Bootstrap node configuration manager.
@@ -356,6 +359,11 @@ public class IgnitionImpl implements Ignition {
new RestModule(nodeCfgMgr, clusterCfgMgr)
);
+ doStartComponent(
+ nodeName,
+ startedComponents,
+ new ClientHandlerModule(distributedTblMgr, nodeCfgMgr.configurationRegistry()));
+
// Deploy all resisted watches cause all components are ready and have registered their listeners.
metaStorageMgr.deployWatches();
diff --git a/parent/pom.xml b/parent/pom.xml
index a086749..423f33d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -293,6 +293,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>