You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by GitBox <gi...@apache.org> on 2020/11/20 07:27:42 UTC

[GitHub] [ignite] ptupitsyn opened a new pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

ptupitsyn opened a new pull request #8483:
URL: https://github.com/apache/ignite/pull/8483


   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533428951



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles thin client connection state.
+ */
+public interface ClientConnectionStateHandler {
+    /**
+     * Handles connection loss

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529624952



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       > Workers (threads) are useful for performing CPU-intensive JavaScript operations. They will not help much with I/O-intensive work. Node.js’s built-in asynchronous I/O operations are more efficient than Workers can be.
   
   And, AFAIU you use ForkJoinPool for user code...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533421936



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        srv.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientConnection open(InetSocketAddress addr,
+                                           ClientMessageHandler msgHnd,
+                                           ClientConnectionStateHandler stateHnd)
+            throws ClientConnectionException {
+        try {
+            java.nio.channels.SocketChannel ch = java.nio.channels.SocketChannel.open();
+            ch.socket().connect(new InetSocketAddress(addr.getHostName(), addr.getPort()), Integer.MAX_VALUE);
+
+            Map<Integer, Object> meta = new HashMap<>();
+            GridNioFuture<?> sslHandshakeFut = null;
+
+            if (sslCtx != null) {
+                sslHandshakeFut = new GridNioFutureImpl<>(null);
+
+                meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
+            }
+
+            GridNioSession ses = srv.createSession(ch, meta, false, null).get();
+
+            if (sslHandshakeFut != null)
+                sslHandshakeFut.get();
+
+            return new GridNioClientConnection(ses, msgHnd, stateHnd);
+        } catch (Exception e) {

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533430057



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        srv.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientConnection open(InetSocketAddress addr,
+                                           ClientMessageHandler msgHnd,
+                                           ClientConnectionStateHandler stateHnd)
+            throws ClientConnectionException {
+        try {
+            java.nio.channels.SocketChannel ch = java.nio.channels.SocketChannel.open();

Review comment:
       Fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533422076



##########
File path: modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
##########
@@ -71,14 +71,20 @@ public void testClientConnectBeforeDiscoveryStart() throws Exception {
         IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync(
             () -> startClient(grid()));
 
-        // Server doesn't accept connection before discovery SPI started.
-        assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        try {
+            // Server doesn't accept connection before discovery SPI started.
+            assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
 
-        barrier.await();
+            barrier.await();
+
+            futStartGrid.get();
 
-        futStartGrid.get();
+            // Server accept connection after discovery SPI started.
+            assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        } finally {

Review comment:
       fixed

##########
File path: modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
##########
@@ -71,14 +71,20 @@ public void testClientConnectBeforeDiscoveryStart() throws Exception {
         IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync(
             () -> startClient(grid()));
 
-        // Server doesn't accept connection before discovery SPI started.
-        assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        try {
+            // Server doesn't accept connection before discovery SPI started.
+            assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
 
-        barrier.await();
+            barrier.await();
+
+            futStartGrid.get();
 
-        futStartGrid.get();
+            // Server accept connection after discovery SPI started.
+            assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        } finally {
+            if (futStartClient.isDone())
+                futStartClient.get().close();
+        }
 

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533429553



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};

Review comment:
       fixed

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r530963983



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       @ivandasch I've done some basic benchmarks (see JmhThinClientCacheBenchmark) and could not come up with a config where multiple selectors work faster than one (various cluster sizes, thread counts, entry sizes). You are right, let's keep this hardcoded to `1` and not bother with making it configurable. Thank you for raising this point.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529559051



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       > With Partition Awareness a single IgniteClient connects to all server nodes
   
   1 thread easily can handle thousands connections. Look at nodejs for another example. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529556694



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       This is not a bottleneck at all, it's common practice to create 1 thread per selector and for clients 1 selector is more than enough. Remember, tarantool or redis use 1 thread for all network activity and this is more than enough. This is not even a server (sometimes it's desirable to split acceptor task to another selector)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533421536



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
##########
@@ -116,18 +117,23 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
 
         ch = new ReliableChannel(chFactory, cfg, binary);
 
-        ch.channelsInit();
+        try {
+            ch.channelsInit();
 
-        ch.addChannelFailListener(() -> metadataHandler.onReconnect());
+            ch.addChannelFailListener(() -> metadataHandler.onReconnect());
 
-        transactions = new TcpClientTransactions(ch, marsh,
-            new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+            transactions = new TcpClientTransactions(ch, marsh,
+                    new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
 
-        cluster = new ClientClusterImpl(ch, marsh);
+            cluster = new ClientClusterImpl(ch, marsh);
 
-        compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
+            compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
 
-        services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+            services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+        } catch (Exception e) {

Review comment:
       Fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r527494615



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       To be honest, there is no need to create more than one selector. One thread for processing IO event 
   is more than enough




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533422362



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -680,12 +571,12 @@ else if (!supportedVers.contains(srvVer) ||
 
     /** Write bytes to the output stream. */
     private void write(byte[] bytes, int len) throws ClientConnectionException {
+        ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
+
         try {
-            out.write(bytes, 0, len);
-            out.flush();
-        }
-        catch (IOException e) {
-            throw handleIOError(e);
+            sock.send(buf);
+        } catch (IgniteCheckedException e) {

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r531084116



##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Benchmark                         Mode  Cnt      Score      Error  Units
+ * JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
+ * JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s

Review comment:
       +1 
   Also, it will be greate if we measure also latency (org.openjdk.jmh.annotations.Mode#AverageTime), not only trouhgput

##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Benchmark                         Mode  Cnt      Score      Error  Units
+ * JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
+ * JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s

Review comment:
       +1 
   Also, it will be great if we measure also latency (org.openjdk.jmh.annotations.Mode#AverageTime), not only trouhgput




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529559051



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       >> With Partition Awareness a single IgniteClient connects to all server nodes
   1 thread easily can handle thousands connections. Look and nodejs for another example. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529552255



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       @ivandasch I don't think so:
   * With Partition Awareness a single `IgniteClient` connects to all server nodes
   * `IgniteClient` can be used from multiple threads
   
   Single worker thread can easily become a bottleneck.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529624952



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       > Workers (threads) are useful for performing CPU-intensive JavaScript operations. They will not help much with I/O-intensive work. Node.js’s built-in asynchronous I/O operations are more efficient than Workers can be.
   
   Citation above is from your link... And, AFAIU you use ForkJoinPool for user code...




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529650995



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       Yes, GridNioServer workers are used solely for encoding and decoding requests in form of byte buffers and arrays - essentially copying some bytes around, which is trivial, so maybe one thread is really enough. We'll see. I'm happy to use a single worker and avoid one more config parameter.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn merged pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn merged pull request #8483:
URL: https://github.com/apache/ignite/pull/8483


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533187395



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
##########
@@ -116,18 +117,23 @@ private TcpIgniteClient(ClientConfiguration cfg) throws ClientException {
 
         ch = new ReliableChannel(chFactory, cfg, binary);
 
-        ch.channelsInit();
+        try {
+            ch.channelsInit();
 
-        ch.addChannelFailListener(() -> metadataHandler.onReconnect());
+            ch.addChannelFailListener(() -> metadataHandler.onReconnect());
 
-        transactions = new TcpClientTransactions(ch, marsh,
-            new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+            transactions = new TcpClientTransactions(ch, marsh,
+                    new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
 
-        cluster = new ClientClusterImpl(ch, marsh);
+            cluster = new ClientClusterImpl(ch, marsh);
 
-        compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
+            compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
 
-        services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+            services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+        } catch (Exception e) {

Review comment:
       NL between `}` and `catch` according to code style

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        srv.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientConnection open(InetSocketAddress addr,
+                                           ClientMessageHandler msgHnd,
+                                           ClientConnectionStateHandler stateHnd)
+            throws ClientConnectionException {
+        try {
+            java.nio.channels.SocketChannel ch = java.nio.channels.SocketChannel.open();
+            ch.socket().connect(new InetSocketAddress(addr.getHostName(), addr.getPort()), Integer.MAX_VALUE);
+
+            Map<Integer, Object> meta = new HashMap<>();
+            GridNioFuture<?> sslHandshakeFut = null;
+
+            if (sslCtx != null) {
+                sslHandshakeFut = new GridNioFutureImpl<>(null);
+
+                meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
+            }
+
+            GridNioSession ses = srv.createSession(ch, meta, false, null).get();
+
+            if (sslHandshakeFut != null)
+                sslHandshakeFut.get();
+
+            return new GridNioClientConnection(ses, msgHnd, stateHnd);
+        } catch (Exception e) {

Review comment:
       NL between `}` and `catch` according to code style

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};

Review comment:
       Space before ` {`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};

Review comment:
       Space before ` {`

##########
File path: modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
##########
@@ -71,14 +71,20 @@ public void testClientConnectBeforeDiscoveryStart() throws Exception {
         IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync(
             () -> startClient(grid()));
 
-        // Server doesn't accept connection before discovery SPI started.
-        assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        try {
+            // Server doesn't accept connection before discovery SPI started.
+            assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
 
-        barrier.await();
+            barrier.await();
+
+            futStartGrid.get();
 
-        futStartGrid.get();
+            // Server accept connection after discovery SPI started.
+            assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        } finally {
+            if (futStartClient.isDone())
+                futStartClient.get().close();
+        }
 

Review comment:
       Redundant NL

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decodes thin client messages from partial buffers.
+  */

Review comment:
       Wrong indent

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -543,31 +450,20 @@ else if (addr.getPort() < 1024 || addr.getPort() > 49151)
             throw new IllegalArgumentException(error);
     }
 
-    /** Create socket. */
-    private static Socket createSocket(ClientChannelConfiguration cfg) throws IOException {
-        Socket sock = cfg.getSslMode() == SslMode.REQUIRED ?
-            new ClientSslSocketFactory(cfg).create() :
-            new Socket(cfg.getAddress().getHostName(), cfg.getAddress().getPort());
-
-        sock.setTcpNoDelay(cfg.isTcpNoDelay());
-
-        if (cfg.getTimeout() > 0)
-            sock.setSoTimeout(cfg.getTimeout());
-
-        if (cfg.getSendBufferSize() > 0)
-            sock.setSendBufferSize(cfg.getSendBufferSize());
-
-        if (cfg.getReceiveBufferSize() > 0)
-            sock.setReceiveBufferSize(cfg.getReceiveBufferSize());
-
-        return sock;
-    }
-
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
+        ClientRequestFuture fut = new ClientRequestFuture();
+        pendingReqs.put(-1L, fut);
+
         handshakeReq(ver, user, pwd, userAttrs);
-        handshakeRes(ver, user, pwd, userAttrs);
+
+        try {
+            ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
+            handshakeRes(res, ver, user, pwd, userAttrs);
+        } catch (IgniteCheckedException e) {

Review comment:
       NL between `}` and `catch` according to code style

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {

Review comment:
       NL between `}` and `catch` according to code style

##########
File path: modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
##########
@@ -71,14 +71,20 @@ public void testClientConnectBeforeDiscoveryStart() throws Exception {
         IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync(
             () -> startClient(grid()));
 
-        // Server doesn't accept connection before discovery SPI started.
-        assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        try {
+            // Server doesn't accept connection before discovery SPI started.
+            assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
 
-        barrier.await();
+            barrier.await();
+
+            futStartGrid.get();
 
-        futStartGrid.get();
+            // Server accept connection after discovery SPI started.
+            assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+        } finally {

Review comment:
       NL after `}`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       I don't quite understand. In `PayloadOutputChannel` constructor we have created a new `BinaryHeapOutputStream`, which doesn't leaks to anywhere. Why do we need another array copy here?

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -680,12 +571,12 @@ else if (!supportedVers.contains(srvVer) ||
 
     /** Write bytes to the output stream. */
     private void write(byte[] bytes, int len) throws ClientConnectionException {
+        ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
+
         try {
-            out.write(bytes, 0, len);
-            out.flush();
-        }
-        catch (IOException e) {
-            throw handleIOError(e);
+            sock.send(buf);
+        } catch (IgniteCheckedException e) {

Review comment:
       NL between `}` and `catch` according to code style

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client message parser.
+ */
+class GridNioClientParser implements GridNioParser {
+    /** */
+    private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) {
+        ClientMessageDecoder decoder = ses.meta(SES_META_DECODER);
+
+        if (decoder == null) {
+            decoder = new ClientMessageDecoder();
+
+            ses.addMeta(SES_META_DECODER, decoder);
+        }
+
+        byte[] bytes = decoder.apply(buf);
+
+        if (bytes == null)
+            return null; // Message is not yet completely received.
+
+        return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());

Review comment:
       I think here `ByteOrder.LITTLE_ENDIAN` should be used since this `ByteBuffer` is passed to payload readers

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Handles thin client connection state.
+ */
+public interface ClientConnectionStateHandler {
+    /**
+     * Handles connection loss

Review comment:
       Point at the end

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else

Review comment:
       NL after `}`

##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        srv.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        srv.stop();
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientConnection open(InetSocketAddress addr,
+                                           ClientMessageHandler msgHnd,
+                                           ClientConnectionStateHandler stateHnd)
+            throws ClientConnectionException {
+        try {
+            java.nio.channels.SocketChannel ch = java.nio.channels.SocketChannel.open();

Review comment:
       Add `java.nio.channels.SocketChannel` to imports?

##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.stream.IntStream;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Base class for thin client benchmarks.
+ */
+@State(Scope.Benchmark)
+public class JmhThinClientAbstractBenchmark extends JmhAbstractBenchmark {

Review comment:
       `public abstract class`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r527494615



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       To be honest, there is no need to create more than one selector.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r534708797



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       If the timeout is too low, there are no guarantees that the message already sent when control returned to the user thread after `pendingReq.get(timeout)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533433937



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       Got it. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r530963983



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       @ivandasch I've did some basic benchmarks (see JmhThinClientCacheBenchmark) and could not come up with a config where multiple selectors work faster than one (various cluster sizes, thread counts, entry sizes). You are right, let's keep this hardcoded to `1` and not bother with making it configurable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r530963983



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       @ivandasch I've done some basic benchmarks (see JmhThinClientCacheBenchmark) and could not come up with a config where multiple selectors work faster than one (various cluster sizes, thread counts, entry sizes). You are right, let's keep this hardcoded to `1` and not bother with making it configurable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] alex-plekhanov commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
alex-plekhanov commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533915757



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       We can also make micro-optimization - avoid `arrayCopy` for sync requests without a timeout, in this case, the thread can't reuse buffer again until the response is received. And latency for such operations will be a little bit better. (Perhaps this can be made in another ticket)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#issuecomment-734261463


   > Refactor Java Thin Client to use GridNioServer in client mode:
   > 
   > * True non-blocking async
   > * Worker threads are shared across connections within single `IgniteClient`
   > 
   > Benchmark results:
   > 
   > ```
   > BEFORE (master)
   > Benchmark                         Mode  Cnt      Score      Error  Units
   > JmhThinClientCacheBenchmark.get  thrpt   10  65916.805 ± 2118.954  ops/s
   > JmhThinClientCacheBenchmark.put  thrpt   10  62304.444 ± 2521.371  ops/s
   > 
   > AFTER (ignite-13496)
   > Benchmark                         Mode  Cnt      Score      Error  Units
   > JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
   > JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s
   > ```
   
   Great results! Good  (and expected by me) news that only one IO loop thread is enough to outperform traditional approach.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533532985



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client message parser.
+ */
+class GridNioClientParser implements GridNioParser {
+    /** */
+    private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** {@inheritDoc} */
+    @Override public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) {
+        ClientMessageDecoder decoder = ses.meta(SES_META_DECODER);
+
+        if (decoder == null) {
+            decoder = new ClientMessageDecoder();
+
+            ses.addMeta(SES_META_DECODER, decoder);
+        }
+
+        byte[] bytes = decoder.apply(buf);
+
+        if (bytes == null)
+            return null; // Message is not yet completely received.
+
+        return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());

Review comment:
       Good catch! Fixed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] isapego commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
isapego commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r531030844



##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Benchmark                         Mode  Cnt      Score      Error  Units
+ * JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
+ * JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s

Review comment:
       Maybe we should also specify the hardware used to get those results.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533428348



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       `PayloadOutputChannel` creates `BinaryHeapOutputStream`, which uses `BinaryMemoryAllocator.THREAD_LOCAL.chunk()`. This thread-local chunk will be released as soon as we leave the scope, because `PayloadOutputChannel` is wrapped in a try-with-resources block. However, `write` passes the buffer to the NIO framework, which may process it later, causing "use-after-free" of sorts.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r532443963



##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Benchmark                         Mode  Cnt      Score      Error  Units
+ * JmhThinClientCacheBenchmark.get  thrpt   10  92501.557 ± 1380.384  ops/s
+ * JmhThinClientCacheBenchmark.put  thrpt   10  82907.446 ± 7572.537  ops/s

Review comment:
       Added `Mode.AverageTime`, added system config to the results comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ivandasch commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ivandasch commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529559051



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       > With Partition Awareness a single IgniteClient connects to all server nodes
   1 thread easily can handle thousands connections. Look at nodejs for another example. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533422847



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -543,31 +450,20 @@ else if (addr.getPort() < 1024 || addr.getPort() > 49151)
             throw new IllegalArgumentException(error);
     }
 
-    /** Create socket. */
-    private static Socket createSocket(ClientChannelConfiguration cfg) throws IOException {
-        Socket sock = cfg.getSslMode() == SslMode.REQUIRED ?
-            new ClientSslSocketFactory(cfg).create() :
-            new Socket(cfg.getAddress().getHostName(), cfg.getAddress().getPort());
-
-        sock.setTcpNoDelay(cfg.isTcpNoDelay());
-
-        if (cfg.getTimeout() > 0)
-            sock.setSoTimeout(cfg.getTimeout());
-
-        if (cfg.getSendBufferSize() > 0)
-            sock.setSendBufferSize(cfg.getSendBufferSize());
-
-        if (cfg.getReceiveBufferSize() > 0)
-            sock.setReceiveBufferSize(cfg.getReceiveBufferSize());
-
-        return sock;
-    }
-
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
+        ClientRequestFuture fut = new ClientRequestFuture();
+        pendingReqs.put(-1L, fut);
+
         handshakeReq(ver, user, pwd, userAttrs);
-        handshakeRes(ver, user, pwd, userAttrs);
+
+        try {
+            ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
+            handshakeRes(res, ver, user, pwd, userAttrs);
+        } catch (IgniteCheckedException e) {

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533429394



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: [DRAFT] IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r529576171



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioServerClientConnectionMultiplexer.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.logger.java.JavaLogger;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioServerClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv; // TODO: <ByteBuffer> possible?
+
+    public GridNioServerClientConnectionMultiplexer() {
+        IgniteLogger gridLog = new JavaLogger(false);
+
+        ClientMessageDecoder decoder = new ClientMessageDecoder();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioParser() {
+            @Override
+            public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+                byte[] bytes = decoder.apply(buf);
+
+                return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder());
+            }
+
+            @Override
+            public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+                return (ByteBuffer)msg;
+            }
+        }, gridLog, false);
+
+//        if (sslCtx != null) {
+//            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+//
+//            sslFilter.directMode(false);
+//
+//            filters = new GridNioFilter[]{codecFilter, sslFilter};
+//        }
+//        else
+        filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .address(InetAddress.getLoopbackAddress()) // TODO: Remove?
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioServerListener<ByteBuffer>() {
+                        @Override
+                        public void onConnected(GridNioSession ses) {
+                            System.out.println("Connect");
+                        }
+
+                        @Override
+                        public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                            System.out.println("Disconnect");
+                        }
+
+                        @Override
+                        public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+                        }
+
+                        @Override
+                        public void onMessage(GridNioSession ses, ByteBuffer msg) {
+                            GridNioServerClientConnection conn = ses.meta(GridNioServerClientConnection.SES_META_CONN);
+
+                            conn.onMessage(msg);
+                        }
+
+                        @Override
+                        public void onSessionWriteTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onSessionIdleTimeout(GridNioSession ses) {
+
+                        }
+
+                        @Override
+                        public void onFailure(FailureType failureType, Throwable failure) {
+                            System.out.println("Fail");
+                        }
+                    })
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // TODO: Get from settings

Review comment:
       Ok, you may be right. I'll do some benchmarks and we'll see.
   
   > nodejs for another example
   
   Not a good example because of single-threaded javascript nature; also, not anymore https://nodejs.org/api/worker_threads.html




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533527354



##########
File path: modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.stream.IntStream;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Base class for thin client benchmarks.
+ */
+@State(Scope.Benchmark)
+public class JmhThinClientAbstractBenchmark extends JmhAbstractBenchmark {

Review comment:
       Fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533421783



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+    /** Worker thread prefix. */
+    private static final String THREAD_PREFIX = "thin-client-channel";
+
+    /** */
+    private static final int CLIENT_MODE_PORT = -1;
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final SSLContext sslCtx;
+
+    /**
+     * Constructor.
+     *
+     * @param cfg Client config.
+     */
+    public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+        IgniteLogger gridLog = new NullLogger();
+
+        GridNioFilter[] filters;
+
+        GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+        sslCtx = ClientSslUtils.getSslContext(cfg);
+
+        if (sslCtx != null) {
+            GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+            sslFilter.directMode(false);
+            filters = new GridNioFilter[]{codecFilter, sslFilter};
+        } else
+            filters = new GridNioFilter[]{codecFilter};
+
+        try {
+            srv = GridNioServer.<ByteBuffer>builder()
+                    .port(CLIENT_MODE_PORT)
+                    .listener(new GridNioClientListener())
+                    .filters(filters)
+                    .logger(gridLog)
+                    .selectorCount(1) // Using more selectors does not seem to improve performance.
+                    .byteOrder(ByteOrder.nativeOrder())
+                    .directBuffer(true)
+                    .directMode(false)
+                    .igniteInstanceName("thinClient")
+                    .serverName(THREAD_PREFIX)
+                    .idleTimeout(Long.MAX_VALUE)
+                    .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+                    .socketSendBufferSize(cfg.getSendBufferSize())
+                    .tcpNoDelay(true)
+                    .build();
+        } catch (IgniteCheckedException e) {

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r533428761



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Decodes thin client messages from partial buffers.
+  */

Review comment:
       fixed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ignite] ptupitsyn commented on a change in pull request #8483: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer

Posted by GitBox <gi...@apache.org>.
ptupitsyn commented on a change in pull request #8483:
URL: https://github.com/apache/ignite/pull/8483#discussion_r534284916



##########
File path: modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
##########
@@ -292,7 +233,8 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
 
             req.writeInt(0, req.position() - 4); // Actual size.
 
-            write(req.array(), req.position());
+            // arrayCopy is required, because buffer is pooled, and write is async.
+            write(req.arrayCopy(), req.position());

Review comment:
       @alex-plekhanov ticket filed: https://issues.apache.org/jira/browse/IGNITE-13804
   
   > without a timeout
   
   How does the timeout affect this logic?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org