You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "ivandasch (via GitHub)" <gi...@apache.org> on 2023/01/26 19:13:34 UTC

[GitHub] [ignite] ivandasch opened a new pull request, #10501: IGNITE-18615 First steps.

ivandasch opened a new pull request, #10501:
URL: https://github.com/apache/ignite/pull/10501

   Thank you for submitting the pull request to the Apache Ignite.
   
   In order to streamline the review of the contribution 
   we ask you to ensure the following steps have been taken:
   
   ### The Contribution Checklist
   - [ ] There is a single JIRA ticket related to the pull request. 
   - [ ] The web-link to the pull request is attached to the JIRA ticket.
   - [ ] The JIRA ticket has the _Patch Available_ state.
   - [ ] The pull request body describes changes that have been made. 
   The description explains _WHAT_ and _WHY_ was made instead of _HOW_.
   - [ ] The pull request title is treated as the final commit message. 
   The following pattern must be used: `IGNITE-XXXX Change summary` where `XXXX` - number of JIRA issue.
   - [ ] A reviewer has been mentioned through the JIRA comments 
   (see [the Maintainers list](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute#HowtoContribute-ReviewProcessandMaintainers)) 
   - [ ] The pull request has been checked by the Teamcity Bot and 
   the `green visa` attached to the JIRA ticket (see [TC.Bot: Check PR](https://mtcga.gridgain.com/prs.html))
   
   ### Notes
   - [How to Contribute](https://cwiki.apache.org/confluence/display/IGNITE/How+to+Contribute)
   - [Coding abbreviation rules](https://cwiki.apache.org/confluence/display/IGNITE/Abbreviation+Rules)
   - [Coding Guidelines](https://cwiki.apache.org/confluence/display/IGNITE/Coding+Guidelines)
   - [Apache Ignite Teamcity Bot](https://cwiki.apache.org/confluence/display/IGNITE/Apache+Ignite+Teamcity+Bot)
   
   If you need any help, please email dev@ignite.apache.org or ask anу advice on http://asf.slack.com _#ignite_ channel.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] alex-plekhanov commented on a diff in pull request #10501: IGNITE-18615 Implement thin client monitoring events.

Posted by "alex-plekhanov (via GitHub)" <gi...@apache.org>.
alex-plekhanov commented on code in PR #10501:
URL: https://github.com/apache/ignite/pull/10501#discussion_r1098374858


##########
modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.EventListener;
+
+/** */
+public interface ConnectionEventListener extends EventListener {
+    /**
+     * @param event Handshake start event.
+     */
+    default void onHandshakeStart(HandshakeStartEvent event) {
+        // No-op.
+    }
+
+    /**
+     * @param event Handshake success event.
+     */
+    default void onHandshakeSuccess(HandshakeSuccessEvent event) {
+        // No-op.
+    }
+
+    /**
+     * @param event Handshake fail event.
+     */
+    default void onHandshakeFail(HandshakeFailEvent event) {
+        // No-op.
+    }
+
+    /**
+     * @param event Authentication fail event.
+     */
+    default void onAuthenticationFail(AuthenticationFailEvent event) {
+        // No-op.
+    }
+
+    /**
+     * @param event Connection closed event (without exception).

Review Comment:
   Why "without exception"? ConnectionClosedEvent contains exception.



##########
modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class HandshakeFailEvent extends ConnectionEvent {
+    /** */
+    private final long elapsedTimeNanos;
+
+    /** */
+    private final Throwable throwable;
+
+    /**
+     * @param conn Connection description.
+     * @param elapsedTimeNanos Elapsed time in nanoseconds.
+     * @param throwable Throwable that caused the failure.
+     */
+    public HandshakeFailEvent(
+        ConnectionDescription conn,
+        long elapsedTimeNanos,
+        Throwable throwable
+    ) {
+        super(conn);
+
+        this.elapsedTimeNanos = elapsedTimeNanos;
+        this.throwable = throwable;
+    }
+
+    /**
+     * Get the elapsed time of the query.

Review Comment:
   `query` -> `request`? (In other classes too)



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.EventListener;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.AuthenticationFailEvent;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionEvent;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests connection event listeners of a thin client.
+ */
+public class IgniteClientConnectionEventListenerTest extends GridCommonAbstractTest {
+    /** */
+    private static final InetAddress LOCALHOST;
+
+    /** */
+    private static final int SRV_PORT = 10800;
+
+    static {
+        try {
+            LOCALHOST = InetAddress.getByName("127.0.0.1");
+        }
+        catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** */
+    @Test
+    public void testBasic() throws Exception {
+        ProtocolVersion srvVer = ProtocolVersion.V1_6_0;
+        try (FakeIgniteServer srv = new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), srvVer)) {
+            srv.start();
+
+            Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+            ConnectionEventListener lsnr = new ConnectionEventListener() {
+                @Override public void onHandshakeStart(HandshakeStartEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onHandshakeSuccess(HandshakeSuccessEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+            };
+
+            long startNano = System.nanoTime();
+            try (IgniteClient ignored = startClient(lsnr)) {
+                GridTestUtils.waitForCondition(() -> evSet.size() == 2, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+                HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class);
+
+                assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.LATEST_VER
+                    + ", features=[]]");
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress());
+                assertEquals(null, hsStartEv.connectionDescription().serverNodeId());
+
+                HandshakeSuccessEvent hsSuccEv = (HandshakeSuccessEvent)evSet.get(HandshakeSuccessEvent.class);
+
+                assertEquals(hsSuccEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsSuccEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().localAddress().getAddress());
+                assertEquals(srv.nodeId(), hsSuccEv.connectionDescription().serverNodeId());
+                assertTrue(System.nanoTime() - startNano >= hsSuccEv.elapsedTime(TimeUnit.NANOSECONDS));
+            }
+
+            GridTestUtils.waitForCondition(() -> evSet.size() == 3, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+            ConnectionClosedEvent closedEv = (ConnectionClosedEvent)evSet.get(ConnectionClosedEvent.class);
+
+            assertEquals(closedEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+            assertEquals(LOCALHOST, closedEv.connectionDescription().remoteAddress().getAddress());
+            assertEquals(SRV_PORT, closedEv.connectionDescription().remoteAddress().getPort());
+            assertEquals(LOCALHOST, closedEv.connectionDescription().localAddress().getAddress());
+            assertEquals(srv.nodeId(), closedEv.connectionDescription().serverNodeId());
+        }
+    }
+
+    /** */
+    @Test
+    public void testUnsupportedProtocolFail() throws Exception {
+        ProtocolVersion unsupportedProto = new ProtocolVersion((short)1, (short)8, (short)0);
+        assertTrue(unsupportedProto.compareTo(ProtocolVersion.LATEST_VER) > 0);
+
+        long startNano = System.nanoTime();
+        testFail(
+            () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), unsupportedProto),
+            (HandshakeFailEvent event, Throwable hsErr) -> {
+                assertTrue(System.nanoTime() - startNano >= event.elapsedTime(TimeUnit.NANOSECONDS));
+                assertEquals(hsErr, event.throwable());
+            },
+            HandshakeFailEvent.class
+        );
+    }
+
+    /** */
+    @Test
+    public void testAuthenticationFail() throws Exception {
+        long startNano = System.nanoTime();
+        testFail(
+            () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(FakeIgniteServer.ErrorType.AUTHENTICATION_ERROR)),
+            (AuthenticationFailEvent event, Throwable hsErr) -> {
+                assertTrue(System.nanoTime() - startNano >= event.elapsedTime(TimeUnit.NANOSECONDS));
+                assertEquals(hsErr, event.throwable());
+            },
+            AuthenticationFailEvent.class
+        );
+    }
+
+    /** */
+    @Test
+    public void testHandshakeFail() throws Exception {
+        AtomicLong startNano = new AtomicLong(System.nanoTime());
+        testFail(
+            () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(FakeIgniteServer.ErrorType.HANDSHAKE_ERROR)),
+            (HandshakeFailEvent event, Throwable hsErr) -> {
+                assertTrue(System.nanoTime() - startNano.get() >= event.elapsedTime(TimeUnit.NANOSECONDS));
+                assertEquals(hsErr, event.throwable());
+            },
+            HandshakeFailEvent.class
+        );
+
+        startNano.set(System.nanoTime());

Review Comment:
   startNano is not used after this line, why do we set it?



##########
modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.monitoring;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EventListener;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.events.AuthenticationFailEvent;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionDescription;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.client.events.RequestEventListener;
+import org.apache.ignite.client.events.RequestFailEvent;
+import org.apache.ignite.client.events.RequestStartEvent;
+import org.apache.ignite.client.events.RequestSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Routes events to listeners, specified in the client configuration.
+ */
+public class EventListenerDemultiplexer {
+    /** Noop listener. */
+    private static final EventListenerDemultiplexer NO_OP = new EventListenerDemultiplexer();
+
+    /** */
+    final List<RequestEventListener> qryEventListeners;

Review Comment:
   `qryEventListeners` -> `reqEventListeners`?



##########
modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+/** */
+public abstract class RequestEvent {
+    /** */
+    private final ConnectionDescription conn;
+
+    /** */
+    private final long qryId;
+
+    /** */
+    private final short opCode;
+
+    /** */
+    private final String opName;
+
+    /**
+     * @param conn Connection description.
+     * @param qryId Query id.
+     * @param opCode Operation code.
+     * @param opName Operation name.
+     */
+    protected RequestEvent(
+        ConnectionDescription conn,
+        long qryId,
+        short opCode,
+        String opName
+    ) {
+        this.conn = conn;
+        this.qryId = qryId;
+        this.opCode = opCode;
+        this.opName = opName;
+    }
+
+    /**
+     * @return Connection description.
+     */
+    public ConnectionDescription connectionDescription() {
+        return conn;
+    }
+
+    /**
+     * @return Query id.
+     */
+    public long queryId() {

Review Comment:
   requestId



##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java:
##########
@@ -601,17 +649,117 @@ else if (cfg.getHeartbeatInterval() <= 0)
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
-        ClientRequestFuture fut = new ClientRequestFuture();
-        pendingReqs.put(-1L, fut);
+        long requestId = -1L;
+        long startTime = System.nanoTime();
 
-        handshakeReq(ver, user, pwd, userAttrs);
+        eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+            new ProtocolContext(ver, null).toString(), null));
 
-        try {
-            ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
-            handshakeRes(res, ver, user, pwd, userAttrs);
-        }
-        catch (IgniteCheckedException e) {
-            throw new ClientConnectionException(e.getMessage(), e);
+        while (true) {
+            ClientRequestFuture fut = new ClientRequestFuture(requestId, ClientOperation.HANDSHAKE);
+
+            pendingReqs.put(requestId, fut);
+
+            handshakeReq(ver, user, pwd, userAttrs);
+
+            try {
+                ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get();
+
+                BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
+
+                try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
+                    boolean success = res.readBoolean();
+
+                    if (success) {
+                        byte[] features = EMPTY_BYTES;
+
+                        if (ProtocolContext.isFeatureSupported(ver, BITMAP_FEATURES))
+                            features = reader.readByteArray();
+
+                        protocolCtx = new ProtocolContext(ver, ProtocolBitmaskFeature.enumSet(features));
+
+                        if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
+                            // Reading server UUID
+                            srvNodeId = reader.readUuid();
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Handshake succeeded [protocolVersion=" + protocolCtx.version() + ", srvNodeId=" + srvNodeId + ']');
+
+                        eventListener.onHandshakeSuccess(
+                            new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId),
+                            System.nanoTime() - startTime
+                        );
+
+                        break;
+                    }
+                    else {
+                        ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());
+
+                        String err = reader.readString();
+                        int errCode = ClientStatus.FAILED;
+
+                        if (res.remaining() > 0)
+                            errCode = reader.readInt();
+
+                        if (log.isDebugEnabled())
+                            log.debug("Handshake failed [protocolVersion=" + srvVer + ", err=" + err + ", errCode=" + errCode + ']');
+
+                        RuntimeException resultErr = null;
+                        if (errCode == ClientStatus.AUTH_FAILED)
+                            resultErr = new ClientAuthenticationException(err);
+                        else if (ver.equals(srvVer))
+                            resultErr = new ClientProtocolError(err);
+                        else if (!supportedVers.contains(srvVer) ||
+                            (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user))) {
+                            // Server version is not supported by this client OR server version is less than 1.1.0 supporting
+                            // authentication and authentication is required.
+                            resultErr = new ClientProtocolError(String.format(
+                                "Protocol version mismatch: client %s / server %s. Server details: %s",
+                                ver,
+                                srvVer,
+                                err
+                            ));
+                        }
+
+                        if (resultErr != null) {
+                            ConnectionDescription connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+                                new ProtocolContext(ver).toString(), null);
+
+                            long elapsedNanos = System.nanoTime() - startTime;
+
+                            if (errCode == ClientStatus.AUTH_FAILED)
+                                eventListener.onAuthenticationFail(connDesc, elapsedNanos, user, resultErr);
+                            else
+                                eventListener.onHandshakeFail(connDesc, elapsedNanos, resultErr);
+
+                            throw resultErr;
+                        }
+                        else {
+                            // Retry with server version.
+                            if (log.isDebugEnabled())
+                                log.debug("Retrying handshake with server version [protocolVersion=" + srvVer + ']');
+
+                            ver = srvVer;
+                        }
+                    }
+                }
+            }
+            catch (IOException | IgniteCheckedException e) {
+                ClientException err;
+                if (e instanceof IOException)
+                    err = handleIOError((IOException)e);
+                else
+                    err = new ClientConnectionException(e.getMessage(), e);
+
+                eventListener.onHandshakeFail(
+                    new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), new ProtocolContext(ver).toString(), null),
+                    startTime - System.nanoTime(),

Review Comment:
   System.nanoTime() - startTime



##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousRequestEventBufferTest.java:
##########
@@ -33,7 +33,7 @@
 /**
  *
  */
-public class CacheContinuousQueryEventBufferTest extends GridCommonAbstractTest {
+public class CacheContinuousRequestEventBufferTest extends GridCommonAbstractTest {

Review Comment:
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] alex-plekhanov commented on a diff in pull request #10501: IGNITE-18615 Implement thin client monitoring events.

Posted by "alex-plekhanov (via GitHub)" <gi...@apache.org>.
alex-plekhanov commented on code in PR #10501:
URL: https://github.com/apache/ignite/pull/10501#discussion_r1099840935


##########
modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java:
##########
@@ -601,17 +649,114 @@ else if (cfg.getHeartbeatInterval() <= 0)
     /** Client handshake. */
     private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
         throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
-        ClientRequestFuture fut = new ClientRequestFuture();
-        pendingReqs.put(-1L, fut);
+        long requestId = -1L;
+        long startTime = System.nanoTime();
 
-        handshakeReq(ver, user, pwd, userAttrs);
+        eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+            new ProtocolContext(ver, null).toString(), null));

Review Comment:
   Since new ProtocolContext constructor was introduced, `new ProtocolContext(ver)` can be used



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java:
##########
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.EnumSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.client.thin.ProtocolContext;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.internal.client.thin.ProtocolVersionFeature;
+import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientParser;
+import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.testframework.junits.JUnitAssertAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A fake ignite server for testing handshake and connection errors handling on the thin client side.
+ */
+public class FakeIgniteServer extends JUnitAssertAware implements GridNioServerListener<ByteBuffer>, AutoCloseable {
+    /** */
+    static final int HANDSHAKE_PERFORMED = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** */
+    public static final byte[] EMPTY_BYTES = new byte[0];
+
+    /** */
+    private final GridNioServer<ByteBuffer> srv;
+
+    /** */
+    private final EnumSet<ErrorType> errorTypes;
+
+    /** */
+    private final ProtocolVersion protoVer;
+
+    /** */
+    private final UUID nodeId = UUID.randomUUID();
+
+    /** */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger) {
+        this(addr, port, logger, null, null);
+    }
+
+    /** */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, EnumSet<ErrorType> errorTypes) {
+        this(addr, port, logger, null, errorTypes);
+    }
+
+    /** */
+    public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, ProtocolVersion protoVer) {
+        this(addr, port, logger, protoVer, null);
+    }
+
+    /** */
+    public FakeIgniteServer(
+        InetAddress addr,
+        int port,
+        IgniteLogger logger,
+        ProtocolVersion protoVer,
+        EnumSet<ErrorType> errorTypes
+    ) {
+        this.protoVer = protoVer != null ? protoVer : ProtocolVersion.LATEST_VER;

Review Comment:
   Perhaps, it would be better to fix protocol version here (use 1.7.0 for example), to avoid fake server modification on each protocol change



##########
modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java:
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.EventListener;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionEvent;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests connection event listeners of a thin client.
+ */
+public class IgniteClientConnectionEventListenerTest extends GridCommonAbstractTest {
+    /** */
+    private static final InetAddress LOCALHOST;
+
+    /** */
+    private static final int SRV_PORT = 10800;
+
+    static {
+        try {
+            LOCALHOST = InetAddress.getByName("127.0.0.1");
+        }
+        catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** */
+    @Test
+    public void testBasic() throws Exception {
+        ProtocolVersion srvVer = ProtocolVersion.V1_6_0;
+        try (FakeIgniteServer srv = new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), srvVer)) {
+            srv.start();
+
+            Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+            ConnectionEventListener lsnr = new ConnectionEventListener() {
+                @Override public void onHandshakeStart(HandshakeStartEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onHandshakeSuccess(HandshakeSuccessEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+
+                @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+                    evSet.put(event.getClass(), event);
+                }
+            };
+
+            long startNano = System.nanoTime();
+            try (IgniteClient ignored = startClient(lsnr)) {
+                GridTestUtils.waitForCondition(() -> evSet.size() == 2, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+                HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class);
+
+                assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.LATEST_VER
+                    + ", features=[]]");
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress());
+                assertEquals(null, hsStartEv.connectionDescription().serverNodeId());
+
+                HandshakeSuccessEvent hsSuccEv = (HandshakeSuccessEvent)evSet.get(HandshakeSuccessEvent.class);
+
+                assertEquals(hsSuccEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().remoteAddress().getAddress());
+                assertEquals(SRV_PORT, hsSuccEv.connectionDescription().remoteAddress().getPort());
+                assertEquals(LOCALHOST, hsSuccEv.connectionDescription().localAddress().getAddress());
+                assertEquals(srv.nodeId(), hsSuccEv.connectionDescription().serverNodeId());
+                assertTrue(System.nanoTime() - startNano >= hsSuccEv.elapsedTime(TimeUnit.NANOSECONDS));
+            }
+
+            GridTestUtils.waitForCondition(() -> evSet.size() == 3, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+            ConnectionClosedEvent closedEv = (ConnectionClosedEvent)evSet.get(ConnectionClosedEvent.class);
+
+            assertEquals(closedEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+            assertEquals(LOCALHOST, closedEv.connectionDescription().remoteAddress().getAddress());
+            assertEquals(SRV_PORT, closedEv.connectionDescription().remoteAddress().getPort());
+            assertEquals(LOCALHOST, closedEv.connectionDescription().localAddress().getAddress());
+            assertEquals(srv.nodeId(), closedEv.connectionDescription().serverNodeId());
+        }
+    }
+
+    /** */
+    @Test
+    public void testUnsupportedProtocolFail() {
+        ProtocolVersion unsupportedProto = new ProtocolVersion((short)1, (short)8, (short)0);
+        assertTrue(unsupportedProto.compareTo(ProtocolVersion.LATEST_VER) > 0);

Review Comment:
   Why can't we use some big value for major version? Test needs to be fixed after next version update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite] ivandasch merged pull request #10501: IGNITE-18615 Implement thin client monitoring events.

Posted by "ivandasch (via GitHub)" <gi...@apache.org>.
ivandasch merged PR #10501:
URL: https://github.com/apache/ignite/pull/10501


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org