You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2023/02/09 08:42:25 UTC
[ignite] branch master updated: IGNITE-18615 Implement monitoring events in java thin client (#10501)
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 1b537d444be IGNITE-18615 Implement monitoring events in java thin client (#10501)
1b537d444be is described below
commit 1b537d444be5ae5c007c3fd677782a3b5a5274c3
Author: Ivan Daschinskiy <iv...@apache.org>
AuthorDate: Thu Feb 9 11:42:13 2023 +0300
IGNITE-18615 Implement monitoring events in java thin client (#10501)
---
.../events/ConnectionClosedEvent.java} | 37 +--
.../client/events/ConnectionDescription.java | 82 ++++++
.../events/ConnectionEvent.java} | 29 +--
.../events/ConnectionEventListener.java} | 40 +--
.../ignite/client/events/HandshakeFailEvent.java | 64 +++++
.../events/HandshakeStartEvent.java} | 27 +-
.../events/HandshakeSuccessEvent.java} | 38 +--
.../apache/ignite/client/events/RequestEvent.java | 79 ++++++
.../events/RequestEventListener.java} | 33 +--
.../ignite/client/events/RequestFailEvent.java | 70 ++++++
.../events/RequestStartEvent.java} | 30 +--
.../ignite/client/events/RequestSuccessEvent.java | 55 ++++
.../ignite/configuration/ClientConfiguration.java | 21 ++
.../monitoring/EventListenerDemultiplexer.java | 222 ++++++++++++++++
.../client/thin/ClientChannelConfiguration.java | 10 +
.../internal/client/thin/ClientOperation.java | 3 +
.../internal/client/thin/ProtocolContext.java | 17 ++
.../internal/client/thin/ProtocolVersion.java | 2 +-
.../internal/client/thin/TcpClientChannel.java | 279 ++++++++++++++-------
.../internal/client/thin/io/ClientConnection.java | 15 ++
.../io/gridnioserver/GridNioClientConnection.java | 11 +
.../thin/io/gridnioserver/GridNioClientParser.java | 2 +-
.../org/apache/ignite/client/ReliabilityTest.java | 2 +-
.../client/thin/events/FakeIgniteServer.java | 261 +++++++++++++++++++
.../IgniteClientConnectionEventListenerTest.java | 233 +++++++++++++++++
.../IgniteClientRequestEventListenerTest.java | 155 ++++++++++++
.../org/apache/ignite/client/ClientTestSuite.java | 6 +-
27 files changed, 1613 insertions(+), 210 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
similarity index 59%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
index a31302b5f26..7a6cb901c3a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionClosedEvent.java
@@ -15,27 +15,34 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
import org.jetbrains.annotations.Nullable;
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
+/** */
+public class ConnectionClosedEvent extends ConnectionEvent {
+ /** */
+ private final Throwable throwable;
+
/**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
+ * @param conn Connection description.
+ * @param throwable Throwable that caused the failure if any.
*/
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ public ConnectionClosedEvent(
+ ConnectionDescription conn,
+ Throwable throwable
+ ) {
+ super(conn);
+
+ this.throwable = throwable;
+ }
/**
- * Closes the connection.
+ * Get a cause of the failure if any.
+ *
+ * @return A cause of the failure if any.
*/
- @Override void close();
+ @Nullable public Throwable throwable() {
+ return throwable;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java
new file mode 100644
index 00000000000..672ba735f5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionDescription.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.net.InetSocketAddress;
+import java.util.UUID;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class ConnectionDescription {
+ /** Local connection address. */
+ private final InetSocketAddress locAddr;
+
+ /** Remote connection address. */
+ private final InetSocketAddress rmtAddr;
+
+ /** Server node id. */
+ private final UUID srvNodeId;
+
+ /** */
+ private final String protocol;
+
+ /**
+ * @param locAddr Local connection address.
+ * @param rmtAddr Remote connection address.
+ * @param protocol String representation of a connection protocol details.
+ * @param srvNodeId Server node id.
+ */
+ public ConnectionDescription(InetSocketAddress locAddr, InetSocketAddress rmtAddr, String protocol, UUID srvNodeId) {
+ this.locAddr = locAddr;
+ this.rmtAddr = rmtAddr;
+ this.protocol = protocol;
+ this.srvNodeId = srvNodeId;
+ }
+
+ /**
+ * Gets local address of this connection.
+ *
+ * @return Local network address or {@code null} if non-socket communication is used.
+ */
+ @Nullable public InetSocketAddress localAddress() {
+ return locAddr;
+ }
+
+ /**
+ * Gets address of remote peer of this connection.
+ *
+ * @return Address of remote peer or {@code null} if non-socket communication is used.
+ */
+ @Nullable public InetSocketAddress remoteAddress() {
+ return rmtAddr;
+ }
+
+ /**
+ * @return Server node id.
+ */
+ @Nullable public UUID serverNodeId() {
+ return srvNodeId;
+ }
+
+ /**
+ * @return String representation of connection protocol.
+ */
+ @Nullable public String protocol() {
+ return protocol;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
index a31302b5f26..941fcf3a773 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEvent.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
-import java.nio.ByteBuffer;
+/** */
+public abstract class ConnectionEvent {
+ /** */
+ private final ConnectionDescription conn;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
/**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
+ * @param conn Connection description.
*/
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ protected ConnectionEvent(ConnectionDescription conn) {
+ this.conn = conn;
+ }
/**
- * Closes the connection.
+ * @return Connection description.
*/
- @Override void close();
+ public ConnectionDescription connectionDescription() {
+ return conn;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
similarity index 53%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
index a31302b5f26..52ad661cc85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/ConnectionEventListener.java
@@ -15,27 +15,37 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
-import java.nio.ByteBuffer;
+import java.util.EventListener;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public interface ConnectionEventListener extends EventListener {
+ /**
+ * @param event Handshake start event.
+ */
+ default void onHandshakeStart(HandshakeStartEvent event) {
+ // No-op.
+ }
+
+ /**
+ * @param event Handshake success event.
+ */
+ default void onHandshakeSuccess(HandshakeSuccessEvent event) {
+ // No-op.
+ }
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
/**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
+ * @param event Handshake fail event.
*/
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ default void onHandshakeFail(HandshakeFailEvent event) {
+ // No-op.
+ }
/**
- * Closes the connection.
+ * @param event Connection closed event (with or without exception).
*/
- @Override void close();
+ default void onConnectionClosed(ConnectionClosedEvent event) {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java
new file mode 100644
index 00000000000..4af696d7be9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeFailEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class HandshakeFailEvent extends ConnectionEvent {
+ /** */
+ private final long elapsedTimeNanos;
+
+ /** */
+ private final Throwable throwable;
+
+ /**
+ * @param conn Connection description.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ * @param throwable Throwable that caused the failure.
+ */
+ public HandshakeFailEvent(
+ ConnectionDescription conn,
+ long elapsedTimeNanos,
+ Throwable throwable
+ ) {
+ super(conn);
+
+ this.elapsedTimeNanos = elapsedTimeNanos;
+ this.throwable = throwable;
+ }
+
+ /**
+ * Get the elapsed time of the handshake.
+ *
+ * @param timeUnit Desired time unit in which to return the elapsed time.
+ * @return the elapsed time.
+ */
+ public long elapsedTime(TimeUnit timeUnit) {
+ return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Get a cause of the failure.
+ *
+ * @return a cause of the failure.
+ */
+ public Throwable throwable() {
+ return throwable;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
index a31302b5f26..6c820d0ca5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeStartEvent.java
@@ -15,27 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
- /**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
- */
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+package org.apache.ignite.client.events;
+/** */
+public class HandshakeStartEvent extends ConnectionEvent {
/**
- * Closes the connection.
+ * @param conn Connection description.
*/
- @Override void close();
+ public HandshakeStartEvent(ConnectionDescription conn) {
+ super(conn);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
index a31302b5f26..570b84a2683 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/HandshakeSuccessEvent.java
@@ -15,27 +15,35 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
-import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public class HandshakeSuccessEvent extends ConnectionEvent {
+ /** */
+ private final long elapsedTimeNanos;
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
/**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
+ * @param conn Connection description.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
*/
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ public HandshakeSuccessEvent(
+ ConnectionDescription conn,
+ long elapsedTimeNanos
+ ) {
+ super(conn);
+
+ this.elapsedTimeNanos = elapsedTimeNanos;
+ }
/**
- * Closes the connection.
+ * Get the elapsed time of the handshake.
+ *
+ * @param timeUnit Desired time unit in which to return the elapsed time.
+ * @return the elapsed time.
*/
- @Override void close();
+ public long elapsedTime(TimeUnit timeUnit) {
+ return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java
new file mode 100644
index 00000000000..74a4e1785df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEvent.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+/** */
+public abstract class RequestEvent {
+ /** */
+ private final ConnectionDescription conn;
+
+ /** */
+ private final long requestId;
+
+ /** */
+ private final short opCode;
+
+ /** */
+ private final String opName;
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ */
+ protected RequestEvent(
+ ConnectionDescription conn,
+ long requestId,
+ short opCode,
+ String opName
+ ) {
+ this.conn = conn;
+ this.requestId = requestId;
+ this.opCode = opCode;
+ this.opName = opName;
+ }
+
+ /**
+ * @return Connection description.
+ */
+ public ConnectionDescription connectionDescription() {
+ return conn;
+ }
+
+ /**
+ * @return Request id.
+ */
+ public long requestId() {
+ return requestId;
+ }
+
+ /**
+ * @return Operation code.
+ */
+ public short operationCode() {
+ return opCode;
+ }
+
+ /**
+ * @return Operation name.
+ */
+ public String operationName() {
+ return opName;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
similarity index 58%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
index a31302b5f26..0a9da249a48 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestEventListener.java
@@ -15,27 +15,30 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
+package org.apache.ignite.client.events;
-import java.nio.ByteBuffer;
+import java.util.EventListener;
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
+/** */
+public interface RequestEventListener extends EventListener {
+ /**
+ * @param event Request start event.
+ */
+ default void onRequestStart(RequestStartEvent event) {
+ // No-op.
+ }
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
/**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
+ * @param event Request success event.
*/
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ default void onRequestSuccess(RequestSuccessEvent event) {
+ // No-op.
+ }
/**
- * Closes the connection.
+ * @param event Request failure event.
*/
- @Override void close();
+ default void onRequestFail(RequestFailEvent event) {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java
new file mode 100644
index 00000000000..a9cab4569a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestFailEvent.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class RequestFailEvent extends RequestEvent {
+ /** */
+ private final long elapsedTimeNanos;
+
+ /** */
+ private final Throwable throwable;
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ * @param throwable Throwable that caused the failure.
+ */
+ public RequestFailEvent(
+ ConnectionDescription conn,
+ long requestId,
+ short opCode,
+ String opName,
+ long elapsedTimeNanos,
+ Throwable throwable
+ ) {
+ super(conn, requestId, opCode, opName);
+
+ this.elapsedTimeNanos = elapsedTimeNanos;
+ this.throwable = throwable;
+ }
+
+ /**
+ * Get the elapsed time of the request.
+ *
+ * @param timeUnit Desired time unit in which to return the elapsed time.
+ * @return the elapsed time.
+ */
+ public long elapsedTime(TimeUnit timeUnit) {
+ return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Get a cause of the failure.
+ *
+ * @return a cause of the failure.
+ */
+ public Throwable throwable() {
+ return throwable;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
similarity index 56%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
copy to modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
index a31302b5f26..da1aa57111b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestStartEvent.java
@@ -15,27 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin.io;
-
-import java.nio.ByteBuffer;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Client connection: abstracts away sending and receiving messages.
- */
-public interface ClientConnection extends AutoCloseable {
- /**
- * Sends a message.
- *
- * @param msg Message buffer.
- * @param onDone Callback to be invoked when asynchronous send operation completes.
- */
- void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+package org.apache.ignite.client.events;
+/** */
+public class RequestStartEvent extends RequestEvent {
/**
- * Closes the connection.
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
*/
- @Override void close();
+ public RequestStartEvent(ConnectionDescription conn, long requestId, short opCode, String opName) {
+ super(conn, requestId, opCode, opName);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java b/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java
new file mode 100644
index 00000000000..2c3e19be840
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/client/events/RequestSuccessEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.events;
+
+import java.util.concurrent.TimeUnit;
+
+/** */
+public class RequestSuccessEvent extends RequestEvent {
+ /** */
+ private final long elapsedTimeNanos;
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ */
+ public RequestSuccessEvent(
+ ConnectionDescription conn,
+ long requestId,
+ short opCode,
+ String opName,
+ long elapsedTimeNanos
+ ) {
+ super(conn, requestId, opCode, opName);
+
+ this.elapsedTimeNanos = elapsedTimeNanos;
+ }
+
+ /**
+ * Get the elapsed time of the request.
+ *
+ * @param timeUnit Desired time unit in which to return the elapsed time.
+ * @return The elapsed time.
+ */
+ public long elapsedTime(TimeUnit timeUnit) {
+ return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
index 84b680d0bea..a2d1cdbfbf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.EventListener;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
@@ -155,6 +156,9 @@ public final class ClientConfiguration implements Serializable {
/** Logger. */
private IgniteLogger logger;
+ /** */
+ private EventListener[] eventListeners;
+
/**
* @return Host addresses.
*/
@@ -840,4 +844,21 @@ public final class ClientConfiguration implements Serializable {
public IgniteLogger getLogger() {
return logger;
}
+
+ /**
+ * @param listeners Clent event listeners.
+ * @return {@code this} for chaining.
+ */
+ public ClientConfiguration setEventListeners(EventListener... listeners) {
+ eventListeners = listeners;
+
+ return this;
+ }
+
+ /**
+ * @return Client event listeners.
+ */
+ public EventListener[] getEventListeners() {
+ return eventListeners;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java
new file mode 100644
index 00000000000..af9f3c0b7e3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/monitoring/EventListenerDemultiplexer.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.monitoring;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EventListener;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionDescription;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.client.events.RequestEventListener;
+import org.apache.ignite.client.events.RequestFailEvent;
+import org.apache.ignite.client.events.RequestStartEvent;
+import org.apache.ignite.client.events.RequestSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Routes events to listeners, specified in the client configuration.
+ */
+public class EventListenerDemultiplexer {
+ /** Noop listener. */
+ private static final EventListenerDemultiplexer NO_OP = new EventListenerDemultiplexer();
+
+ /** */
+ final List<RequestEventListener> reqEventListeners;
+
+ /** */
+ final List<ConnectionEventListener> connEventListeners;
+
+ /** */
+ final IgniteLogger logger;
+
+ /** */
+ EventListenerDemultiplexer() {
+ reqEventListeners = null;
+ connEventListeners = null;
+ logger = NullLogger.INSTANCE;
+ }
+
+ /** */
+ EventListenerDemultiplexer(
+ List<RequestEventListener> reqEventListeners,
+ List<ConnectionEventListener> connEventListeners,
+ IgniteLogger logger
+ ) {
+ this.logger = logger;
+
+ if (!F.isEmpty(reqEventListeners))
+ this.reqEventListeners = Collections.unmodifiableList(reqEventListeners);
+ else
+ this.reqEventListeners = null;
+
+ if (!F.isEmpty(connEventListeners))
+ this.connEventListeners = Collections.unmodifiableList(connEventListeners);
+ else
+ this.connEventListeners = null;
+
+ }
+
+ /**
+ * Creates an event listener demultiplexer.
+ *
+ * @param cfg Client configuration.
+ */
+ public static EventListenerDemultiplexer create(ClientConfiguration cfg) {
+ if (F.isEmpty(cfg.getEventListeners()))
+ return NO_OP;
+
+ List<RequestEventListener> qryEventListeners = new ArrayList<>();
+ List<ConnectionEventListener> connEventListeners = new ArrayList<>();
+
+ for (EventListener l: cfg.getEventListeners()) {
+ if (l instanceof RequestEventListener)
+ qryEventListeners.add((RequestEventListener)l);
+ else if (l instanceof ConnectionEventListener)
+ connEventListeners.add((ConnectionEventListener)l);
+ }
+
+ if (F.isEmpty(qryEventListeners) && F.isEmpty(connEventListeners))
+ return NO_OP;
+
+ return new EventListenerDemultiplexer(qryEventListeners, connEventListeners, NullLogger.whenNull(cfg.getLogger()));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ */
+ public void onRequestStart(ConnectionDescription conn, long requestId, short opCode, String opName) {
+ if (F.isEmpty(reqEventListeners))
+ return;
+
+ executeForEach(reqEventListeners, l -> l.onRequestStart(new RequestStartEvent(conn, requestId, opCode, opName)));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ */
+ public void onRequestSuccess(
+ ConnectionDescription conn,
+ long requestId,
+ short opCode,
+ String opName,
+ long elapsedTimeNanos
+ ) {
+ if (F.isEmpty(reqEventListeners))
+ return;
+
+ executeForEach(reqEventListeners, l ->
+ l.onRequestSuccess(new RequestSuccessEvent(conn, requestId, opCode, opName, elapsedTimeNanos)));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param requestId Request id.
+ * @param opCode Operation code.
+ * @param opName Operation name.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ * @param throwable Throwable that caused the failure.
+ */
+ public void onRequestFail(
+ ConnectionDescription conn,
+ long requestId,
+ short opCode,
+ String opName,
+ long elapsedTimeNanos,
+ Throwable throwable
+ ) {
+ if (F.isEmpty(reqEventListeners))
+ return;
+
+ executeForEach(reqEventListeners, l ->
+ l.onRequestFail(new RequestFailEvent(conn, requestId, opCode, opName, elapsedTimeNanos, throwable)));
+ }
+
+ /**
+ * @param conn Connection description.
+ */
+ public void onHandshakeStart(ConnectionDescription conn) {
+ if (F.isEmpty(connEventListeners))
+ return;
+
+ executeForEach(connEventListeners, l -> l.onHandshakeStart(new HandshakeStartEvent(conn)));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ */
+ public void onHandshakeSuccess(ConnectionDescription conn, long elapsedTimeNanos) {
+ if (F.isEmpty(connEventListeners))
+ return;
+
+ executeForEach(connEventListeners, l -> l.onHandshakeSuccess(new HandshakeSuccessEvent(conn, elapsedTimeNanos)));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param elapsedTimeNanos Elapsed time in nanoseconds.
+ * @param throwable Throwable that caused the failure.
+ */
+ public void onHandshakeFail(ConnectionDescription conn, long elapsedTimeNanos, Throwable throwable) {
+ if (F.isEmpty(connEventListeners))
+ return;
+
+ executeForEach(connEventListeners, l -> l.onHandshakeFail(new HandshakeFailEvent(conn, elapsedTimeNanos, throwable)));
+ }
+
+ /**
+ * @param conn Connection description.
+ * @param throwable Throwable that caused the failure if any.
+ */
+ public void onConnectionClosed(ConnectionDescription conn, Throwable throwable) {
+ if (F.isEmpty(connEventListeners))
+ return;
+
+ executeForEach(connEventListeners, l -> l.onConnectionClosed(new ConnectionClosedEvent(conn, throwable)));
+ }
+
+ /** */
+ private <T> void executeForEach(List<T> listeners, Consumer<T> action) {
+ assert !F.isEmpty(listeners);
+
+ for (T listener: listeners) {
+ try {
+ action.accept(listener);
+ }
+ catch (Exception e) {
+ logger.warning("Exception thrown while consuming event in listener " + listener, e);
+ }
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
index e81dc9204c8..f22ce8f91e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientChannelConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.client.SslMode;
import org.apache.ignite.client.SslProtocol;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.monitoring.EventListenerDemultiplexer;
/**
* Configuration required to initialize {@link TcpClientChannel}.
@@ -110,6 +111,9 @@ final class ClientChannelConfiguration {
/** */
private final IgniteLogger logger;
+ /** */
+ private final EventListenerDemultiplexer eventListener;
+
/**
* Constructor.
*/
@@ -141,6 +145,7 @@ final class ClientChannelConfiguration {
this.heartbeatInterval = cfg.getHeartbeatInterval();
this.autoBinaryConfigurationEnabled = cfg.isAutoBinaryConfigurationEnabled();
this.logger = cfg.getLogger();
+ this.eventListener = EventListenerDemultiplexer.create(cfg);
}
/**
@@ -325,4 +330,9 @@ final class ClientChannelConfiguration {
public IgniteLogger getLogger() {
return logger;
}
+
+ /** */
+ public EventListenerDemultiplexer eventListener() {
+ return eventListener;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index ddd12c1bd47..c291a0e3e9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -24,6 +24,9 @@ import org.jetbrains.annotations.Nullable;
/** Operation codes. */
public enum ClientOperation {
+ /** Handshake */
+ HANDSHAKE(-1),
+
/** Resource close. */
RESOURCE_CLOSE(0),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
index ec8fb6e2f51..98ee17871a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolContext.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.client.thin;
import java.util.EnumSet;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
* Protocol Context.
@@ -30,6 +31,13 @@ public class ProtocolContext {
/** Features. */
private final EnumSet<ProtocolBitmaskFeature> features;
+ /**
+ * @param ver Protocol version.
+ */
+ public ProtocolContext(ProtocolVersion ver) {
+ this(ver, null);
+ }
+
/**
* @param ver Protocol version.
* @param features Supported features.
@@ -87,4 +95,13 @@ public class ProtocolContext {
public static boolean isFeatureSupported(ProtocolVersion ver, ProtocolVersionFeature feature) {
return ver.compareTo(feature.verIntroduced()) >= 0;
}
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(
+ ProtocolContext.class.getSimpleName(),
+ "version", version(), false,
+ "features", features.toString(), false
+ );
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
index bf2e112113c..4d3a3feb0e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolVersion.java
@@ -56,7 +56,7 @@ public final class ProtocolVersion implements Comparable<ProtocolVersion> {
private final short patch;
/** Constructor. */
- ProtocolVersion(short major, short minor, short patch) {
+ public ProtocolVersion(short major, short minor, short patch) {
this.major = major;
this.minor = minor;
this.patch = patch;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index a97298bfe7d..5d2d38f09c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -48,6 +48,7 @@ import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.ClientReconnectedException;
+import org.apache.ignite.client.events.ConnectionDescription;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
@@ -58,6 +59,7 @@ import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.monitoring.EventListenerDemultiplexer;
import org.apache.ignite.internal.client.thin.io.ClientConnection;
import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
@@ -162,6 +164,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Log. */
private final IgniteLogger log;
+ /** */
+ private final EventListenerDemultiplexer eventListener;
+
+ /** */
+ private final ConnectionDescription connDesc;
+
/** Last send operation timestamp. */
private volatile long lastSendMillis;
@@ -172,6 +180,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
log = NullLogger.whenNull(cfg.getLogger());
+ eventListener = cfg.eventListener();
+
for (ClientNotificationType type : ClientNotificationType.values()) {
if (type.keepNotificationsWithoutListener())
pendingNotifications[type.ordinal()] = new ConcurrentHashMap<>();
@@ -189,6 +199,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
+ assert protocolCtx != null : "Protocol context after handshake is null";
+
+ connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId);
+
heartbeatTimer = protocolCtx.isFeatureSupported(HEARTBEAT) && cfg.getHeartbeatEnabled()
? initHeartbeat(cfg.getHeartbeatInterval())
: null;
@@ -219,6 +233,11 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
*/
private void close(Exception cause) {
if (closed.compareAndSet(false, true)) {
+ // Skip notification if handshake is not successful or completed.
+ ConnectionDescription connDesc0 = connDesc;
+ if (connDesc0 != null)
+ eventListener.onConnectionClosed(connDesc0, cause);
+
if (heartbeatTimer != null)
heartbeatTimer.cancel();
@@ -279,17 +298,25 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChannel> payloadWriter)
throws ClientException {
long id = reqId.getAndIncrement();
+ long startTimeNanos = System.nanoTime();
PayloadOutputChannel payloadCh = new PayloadOutputChannel(this);
try {
- if (closed())
- throw new ClientConnectionException("Channel is closed");
+ if (closed()) {
+ ClientConnectionException err = new ClientConnectionException("Channel is closed");
- ClientRequestFuture fut = new ClientRequestFuture();
+ eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+ throw err;
+ }
+
+ ClientRequestFuture fut = new ClientRequestFuture(id, op, startTimeNanos);
pendingReqs.put(id, fut);
+ eventListener.onRequestStart(connDesc, id, op.code(), op.name());
+
BinaryOutputStream req = payloadCh.out();
req.writeInt(0); // Reserve an integer for the request size.
@@ -311,6 +338,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
// Potential double-close is handled in PayloadOutputChannel.
payloadCh.close();
+ eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, t);
+
throw t;
}
}
@@ -322,18 +351,29 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
*/
private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader)
throws ClientException {
+ long requestId = pendingReq.requestId;
+ ClientOperation op = pendingReq.operation;
+ long startTimeNanos = pendingReq.startTimeNanos;
+
try {
ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
- if (payload == null || payloadReader == null)
- return null;
+ T res = null;
+ if (payload != null && payloadReader != null)
+ res = payloadReader.apply(new PayloadInputChannel(this, payload));
+
+ eventListener.onRequestSuccess(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos);
- return payloadReader.apply(new PayloadInputChannel(this, payload));
+ return res;
}
catch (IgniteCheckedException e) {
log.warning("Failed to process response: " + e.getMessage(), e);
- throw convertException(e);
+ RuntimeException err = convertException(e);
+
+ eventListener.onRequestFail(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+ throw err;
}
}
@@ -346,22 +386,30 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
*/
private <T> CompletableFuture<T> receiveAsync(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader) {
CompletableFuture<T> fut = new CompletableFuture<>();
+ long requestId = pendingReq.requestId;
+ ClientOperation op = pendingReq.operation;
+ long startTimeNanos = pendingReq.startTimeNanos;
pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() -> {
try {
ByteBuffer payload = payloadFut.get();
- if (payload == null || payloadReader == null)
- fut.complete(null);
- else {
- T res = payloadReader.apply(new PayloadInputChannel(this, payload));
- fut.complete(res);
- }
+ T res = null;
+ if (payload != null && payloadReader != null)
+ res = payloadReader.apply(new PayloadInputChannel(this, payload));
+
+ eventListener.onRequestSuccess(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos);
+
+ fut.complete(res);
}
catch (Throwable t) {
log.warning("Failed to process response: " + t.getMessage(), t);
- fut.completeExceptionally(convertException(t));
+ RuntimeException err = convertException(t);
+
+ eventListener.onRequestFail(connDesc, requestId, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
+
+ fut.completeExceptionally(err);
}
}));
@@ -601,17 +649,114 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Client handshake. */
private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
- ClientRequestFuture fut = new ClientRequestFuture();
- pendingReqs.put(-1L, fut);
+ long requestId = -1L;
+ long startTime = System.nanoTime();
- handshakeReq(ver, user, pwd, userAttrs);
+ eventListener.onHandshakeStart(new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+ new ProtocolContext(ver).toString(), null));
- try {
- ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
- handshakeRes(res, ver, user, pwd, userAttrs);
- }
- catch (IgniteCheckedException e) {
- throw new ClientConnectionException(e.getMessage(), e);
+ while (true) {
+ ClientRequestFuture fut = new ClientRequestFuture(requestId, ClientOperation.HANDSHAKE);
+
+ pendingReqs.put(requestId, fut);
+
+ handshakeReq(ver, user, pwd, userAttrs);
+
+ try {
+ ByteBuffer buf = timeout > 0 ? fut.get(timeout) : fut.get();
+
+ BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
+
+ try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
+ boolean success = res.readBoolean();
+
+ if (success) {
+ byte[] features = EMPTY_BYTES;
+
+ if (ProtocolContext.isFeatureSupported(ver, BITMAP_FEATURES))
+ features = reader.readByteArray();
+
+ protocolCtx = new ProtocolContext(ver, ProtocolBitmaskFeature.enumSet(features));
+
+ if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
+ // Reading server UUID
+ srvNodeId = reader.readUuid();
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Handshake succeeded [protocolVersion=" + protocolCtx.version() + ", srvNodeId=" + srvNodeId + ']');
+
+ eventListener.onHandshakeSuccess(
+ new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), protocolCtx.toString(), srvNodeId),
+ System.nanoTime() - startTime
+ );
+
+ break;
+ }
+ else {
+ ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());
+
+ String err = reader.readString();
+ int errCode = ClientStatus.FAILED;
+
+ if (res.remaining() > 0)
+ errCode = reader.readInt();
+
+ if (log.isDebugEnabled())
+ log.debug("Handshake failed [protocolVersion=" + srvVer + ", err=" + err + ", errCode=" + errCode + ']');
+
+ RuntimeException resultErr = null;
+ if (errCode == ClientStatus.AUTH_FAILED)
+ resultErr = new ClientAuthenticationException(err);
+ else if (ver.equals(srvVer))
+ resultErr = new ClientProtocolError(err);
+ else if (!supportedVers.contains(srvVer) ||
+ (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user))) {
+ // Server version is not supported by this client OR server version is less than 1.1.0 supporting
+ // authentication and authentication is required.
+ resultErr = new ClientProtocolError(String.format(
+ "Protocol version mismatch: client %s / server %s. Server details: %s",
+ ver,
+ srvVer,
+ err
+ ));
+ }
+
+ if (resultErr != null) {
+ ConnectionDescription connDesc = new ConnectionDescription(sock.localAddress(), sock.remoteAddress(),
+ new ProtocolContext(ver).toString(), null);
+
+ long elapsedNanos = System.nanoTime() - startTime;
+
+ eventListener.onHandshakeFail(connDesc, elapsedNanos, resultErr);
+
+ throw resultErr;
+ }
+ else {
+ // Retry with server version.
+ if (log.isDebugEnabled())
+ log.debug("Retrying handshake with server version [protocolVersion=" + srvVer + ']');
+
+ ver = srvVer;
+ }
+ }
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ ClientException err;
+ if (e instanceof IOException)
+ err = handleIOError((IOException)e);
+ else
+ err = new ClientConnectionException(e.getMessage(), e);
+
+ eventListener.onHandshakeFail(
+ new ConnectionDescription(sock.localAddress(), sock.remoteAddress(), new ProtocolContext(ver).toString(), null),
+ System.nanoTime() - startTime,
+ err
+ );
+
+ throw err;
+ }
}
}
@@ -665,70 +810,6 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return new ProtocolContext(ver, features);
}
- /** Receive and handle handshake response. */
- private void handshakeRes(ByteBuffer buf, ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
- throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
- BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
-
- try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
- boolean success = res.readBoolean();
-
- if (success) {
- byte[] features = EMPTY_BYTES;
-
- if (ProtocolContext.isFeatureSupported(proposedVer, BITMAP_FEATURES))
- features = reader.readByteArray();
-
- protocolCtx = new ProtocolContext(proposedVer, ProtocolBitmaskFeature.enumSet(features));
-
- if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
- // Reading server UUID
- srvNodeId = reader.readUuid();
- }
-
- if (log.isDebugEnabled())
- log.debug("Handshake succeeded [protocolVersion=" + protocolCtx.version() + ", srvNodeId=" + srvNodeId + ']');
- }
- else {
- ProtocolVersion srvVer = new ProtocolVersion(res.readShort(), res.readShort(), res.readShort());
-
- String err = reader.readString();
- int errCode = ClientStatus.FAILED;
-
- if (res.remaining() > 0)
- errCode = reader.readInt();
-
- if (log.isDebugEnabled())
- log.debug("Handshake failed [protocolVersion=" + srvVer + ", err=" + err + ", errCode=" + errCode + ']');
-
- if (errCode == ClientStatus.AUTH_FAILED)
- throw new ClientAuthenticationException(err);
- else if (proposedVer.equals(srvVer))
- throw new ClientProtocolError(err);
- else if (!supportedVers.contains(srvVer) ||
- (!ProtocolContext.isFeatureSupported(srvVer, AUTHORIZATION) && !F.isEmpty(user)))
- // Server version is not supported by this client OR server version is less than 1.1.0 supporting
- // authentication and authentication is required.
- throw new ClientProtocolError(String.format(
- "Protocol version mismatch: client %s / server %s. Server details: %s",
- proposedVer,
- srvVer,
- err
- ));
- else {
- // Retry with server version.
- if (log.isDebugEnabled())
- log.debug("Retrying handshake with server version [protocolVersion=" + srvVer + ']');
-
- handshake(srvVer, user, pwd, userAttrs);
- }
- }
- }
- catch (IOException e) {
- throw handleIOError(e);
- }
- }
-
/** Write bytes to the output stream. */
private void write(byte[] bytes, int len, @Nullable Runnable onDone) throws ClientConnectionException {
ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
@@ -804,10 +885,28 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return res;
}
- /**
- *
- */
+ /** */
private static class ClientRequestFuture extends GridFutureAdapter<ByteBuffer> {
+ /** */
+ final long startTimeNanos;
+
+ /** */
+ final long requestId;
+
+ /** */
+ final ClientOperation operation;
+
+ /** */
+ ClientRequestFuture(long requestId, ClientOperation op) {
+ this(requestId, op, System.nanoTime());
+ }
+
+ /** */
+ ClientRequestFuture(long requestId, ClientOperation op, long startTimeNanos) {
+ this.requestId = requestId;
+ operation = op;
+ this.startTimeNanos = startTimeNanos;
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
index a31302b5f26..8bd01bbc6c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.thin.io;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
@@ -34,6 +35,20 @@ public interface ClientConnection extends AutoCloseable {
*/
void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException;
+ /**
+ * Gets local address of this session.
+ *
+ * @return Local network address or {@code null} if non-socket communication is used.
+ */
+ @Nullable InetSocketAddress localAddress();
+
+ /**
+ * Gets address of remote peer on this session.
+ *
+ * @return Address of remote peer or {@code null} if non-socket communication is used.
+ */
+ @Nullable InetSocketAddress remoteAddress();
+
/**
* Closes the connection.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
index 5a87444cbcf..fe2e894acde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.thin.io.gridnioserver;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
@@ -62,6 +63,16 @@ class GridNioClientConnection implements ClientConnection {
ses.addMeta(SES_META_CONN, this);
}
+ /** {@inheritDoc} */
+ @Override @Nullable public InetSocketAddress localAddress() {
+ return ses.localAddress();
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public InetSocketAddress remoteAddress() {
+ return ses.remoteAddress();
+ }
+
/** {@inheritDoc} */
@Override public void send(ByteBuffer msg, @Nullable Runnable onDone) throws IgniteCheckedException {
if (onDone != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
index 439c78a72c3..bb20ca63259 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Client message parser.
*/
-class GridNioClientParser implements GridNioParser {
+public class GridNioClientParser implements GridNioParser {
/** */
private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey();
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 56b4b648958..1a077004b8f 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -349,7 +349,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCnt = 19;
+ long expectedNullCnt = 20;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java
new file mode 100644
index 00000000000..546506325f8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/FakeIgniteServer.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.EnumSet;
+import java.util.UUID;
+import java.util.function.Consumer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.client.thin.ProtocolContext;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.internal.client.thin.ProtocolVersionFeature;
+import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientParser;
+import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
+import org.apache.ignite.internal.processors.platform.client.ClientStatus;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.apache.ignite.testframework.junits.JUnitAssertAware;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A fake ignite server for testing handshake and connection errors handling on the thin client side.
+ */
+public class FakeIgniteServer extends JUnitAssertAware implements GridNioServerListener<ByteBuffer>, AutoCloseable {
+ /** */
+ static final int HANDSHAKE_PERFORMED = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** */
+ public static final byte[] EMPTY_BYTES = new byte[0];
+
+ /** */
+ private final GridNioServer<ByteBuffer> srv;
+
+ /** */
+ private final EnumSet<ErrorType> errorTypes;
+
+ /** */
+ private final ProtocolVersion protoVer;
+
+ /** */
+ private final UUID nodeId = UUID.randomUUID();
+
+ /** */
+ public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger) {
+ this(addr, port, logger, null, null);
+ }
+
+ /** */
+ public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, EnumSet<ErrorType> errorTypes) {
+ this(addr, port, logger, null, errorTypes);
+ }
+
+ /** */
+ public FakeIgniteServer(InetAddress addr, int port, IgniteLogger logger, ProtocolVersion protoVer) {
+ this(addr, port, logger, protoVer, null);
+ }
+
+ /** */
+ public FakeIgniteServer(
+ InetAddress addr,
+ int port,
+ IgniteLogger logger,
+ ProtocolVersion protoVer,
+ EnumSet<ErrorType> errorTypes
+ ) {
+ this.protoVer = protoVer != null ? protoVer : ProtocolVersion.V1_7_0;
+
+ this.errorTypes = errorTypes != null ? errorTypes : EnumSet.noneOf(ErrorType.class);
+
+ try {
+ srv = GridNioServer.<ByteBuffer>builder()
+ .address(addr)
+ .port(port)
+ .listener(this)
+ .logger(logger)
+ .selectorCount(1)
+ .byteOrder(ByteOrder.LITTLE_ENDIAN)
+ .directBuffer(true)
+ .directMode(false)
+ .filters(
+ new GridNioCodecFilter(new GridNioClientParser(), logger, false)
+ )
+ .build();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to initialize fake server", e);
+ }
+ }
+
+ /** */
+ public void start() throws IgniteCheckedException {
+ srv.start();
+ }
+
+ /** */
+ public void stop() {
+ srv.stop();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws Exception {
+ srv.stop();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onConnected(GridNioSession ses) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(GridNioSession ses, ByteBuffer msg) {
+ if (ses.meta(HANDSHAKE_PERFORMED) == null) {
+ if (errorTypes.contains(ErrorType.HANDSHAKE_CONNECTION_ERROR)) {
+ ses.close();
+
+ return;
+ }
+
+ BinaryInputStream res = BinaryByteBufferInputStream.create(msg);
+ try (BinaryReaderExImpl reader = new BinaryReaderExImpl(null, res, null, null, true, true)) {
+ byte reqType = reader.readByte();
+
+ assertEquals(ClientListenerRequest.HANDSHAKE, reqType);
+
+ ProtocolVersion clientVer = new ProtocolVersion(reader.readShort(), reader.readShort(), reader.readShort());
+
+ ByteBuffer response = createMessage(writer -> {
+ if (errorTypes.contains(ErrorType.HANDSHAKE_ERROR) || errorTypes.contains(ErrorType.AUTHENTICATION_ERROR)
+ || protoVer.compareTo(clientVer) != 0) {
+ writer.writeBoolean(false);
+
+ writer.writeShort(protoVer.major());
+ writer.writeShort(protoVer.minor());
+ writer.writeShort(protoVer.patch());
+
+ if (errorTypes.contains(ErrorType.AUTHENTICATION_ERROR)) {
+ writer.writeString("Authentication failed");
+ writer.writeInt(ClientStatus.AUTH_FAILED);
+ }
+ else {
+ writer.writeString("Handshake failed");
+ writer.writeInt(ClientStatus.FAILED);
+ }
+ }
+ else {
+ writer.writeBoolean(true);
+
+ if (ProtocolContext.isFeatureSupported(protoVer, ProtocolVersionFeature.BITMAP_FEATURES))
+ writer.writeByteArray(EMPTY_BYTES);
+
+ if (ProtocolContext.isFeatureSupported(protoVer, ProtocolVersionFeature.PARTITION_AWARENESS))
+ writer.writeUuid(nodeId);
+
+ ses.addMeta(HANDSHAKE_PERFORMED, true);
+ }
+ });
+
+ ses.send(response);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ else {
+ if (errorTypes.contains(ErrorType.CONNECTION_ERROR))
+ ses.close();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionWriteTimeout(GridNioSession ses) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionIdleTimeout(GridNioSession ses) {
+
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+
+ }
+
+ /** */
+ private ByteBuffer createMessage(Consumer<BinaryRawWriter> writerAction) {
+ try (BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(32), null, null)) {
+ writer.writeInt(0);
+
+ writerAction.accept(writer);
+
+ writer.out().writeInt(0, writer.out().position() - 4); // actual size
+
+ return ByteBuffer.wrap(writer.out().arrayCopy(), 0, writer.out().position());
+ }
+ }
+
+ /**
+ * @return Fake node id.
+ */
+ public UUID nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Injected error types.
+ */
+ public enum ErrorType {
+ /** Generic handshake error. */
+ HANDSHAKE_ERROR,
+
+ /** Connection error on handshake. */
+ HANDSHAKE_CONNECTION_ERROR,
+
+ /** Authentication error. */
+ AUTHENTICATION_ERROR,
+
+ /** Connection error. */
+ CONNECTION_ERROR
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java
new file mode 100644
index 00000000000..8712b6985d3
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientConnectionEventListenerTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.EnumSet;
+import java.util.EventListener;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.ConnectionClosedEvent;
+import org.apache.ignite.client.events.ConnectionEvent;
+import org.apache.ignite.client.events.ConnectionEventListener;
+import org.apache.ignite.client.events.HandshakeFailEvent;
+import org.apache.ignite.client.events.HandshakeStartEvent;
+import org.apache.ignite.client.events.HandshakeSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ProtocolVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Tests connection event listeners of a thin client.
+ */
+public class IgniteClientConnectionEventListenerTest extends GridCommonAbstractTest {
+ /** */
+ private static final InetAddress LOCALHOST;
+
+ /** */
+ private static final int SRV_PORT = 10800;
+
+ static {
+ try {
+ LOCALHOST = InetAddress.getByName("127.0.0.1");
+ }
+ catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** */
+ @Test
+ public void testBasic() throws Exception {
+ ProtocolVersion srvVer = ProtocolVersion.V1_6_0;
+ try (FakeIgniteServer srv = new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), srvVer)) {
+ srv.start();
+
+ Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+ ConnectionEventListener lsnr = new ConnectionEventListener() {
+ @Override public void onHandshakeStart(HandshakeStartEvent event) {
+ evSet.put(event.getClass(), event);
+ }
+
+ @Override public void onHandshakeSuccess(HandshakeSuccessEvent event) {
+ evSet.put(event.getClass(), event);
+ }
+
+ @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+ evSet.put(event.getClass(), event);
+ }
+ };
+
+ long startNano = System.nanoTime();
+ try (IgniteClient ignored = startClient(lsnr)) {
+ GridTestUtils.waitForCondition(() -> evSet.size() == 2, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+ HandshakeStartEvent hsStartEv = (HandshakeStartEvent)evSet.get(HandshakeStartEvent.class);
+
+ assertEquals(hsStartEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.LATEST_VER
+ + ", features=[]]");
+ assertEquals(LOCALHOST, hsStartEv.connectionDescription().remoteAddress().getAddress());
+ assertEquals(SRV_PORT, hsStartEv.connectionDescription().remoteAddress().getPort());
+ assertEquals(LOCALHOST, hsStartEv.connectionDescription().localAddress().getAddress());
+ assertEquals(null, hsStartEv.connectionDescription().serverNodeId());
+
+ HandshakeSuccessEvent hsSuccEv = (HandshakeSuccessEvent)evSet.get(HandshakeSuccessEvent.class);
+
+ assertEquals(hsSuccEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+ assertEquals(LOCALHOST, hsSuccEv.connectionDescription().remoteAddress().getAddress());
+ assertEquals(SRV_PORT, hsSuccEv.connectionDescription().remoteAddress().getPort());
+ assertEquals(LOCALHOST, hsSuccEv.connectionDescription().localAddress().getAddress());
+ assertEquals(srv.nodeId(), hsSuccEv.connectionDescription().serverNodeId());
+ assertTrue(System.nanoTime() - startNano >= hsSuccEv.elapsedTime(TimeUnit.NANOSECONDS));
+ }
+
+ GridTestUtils.waitForCondition(() -> evSet.size() == 3, GridTestUtils.DFLT_TEST_TIMEOUT);
+
+ ConnectionClosedEvent closedEv = (ConnectionClosedEvent)evSet.get(ConnectionClosedEvent.class);
+
+ assertEquals(closedEv.connectionDescription().protocol(), "ProtocolContext [version=" + srvVer + ", features=[]]");
+ assertEquals(LOCALHOST, closedEv.connectionDescription().remoteAddress().getAddress());
+ assertEquals(SRV_PORT, closedEv.connectionDescription().remoteAddress().getPort());
+ assertEquals(LOCALHOST, closedEv.connectionDescription().localAddress().getAddress());
+ assertEquals(srv.nodeId(), closedEv.connectionDescription().serverNodeId());
+ }
+ }
+
+ /** */
+ @Test
+ public void testUnsupportedProtocolFail() {
+ ProtocolVersion unsupportedProto = new ProtocolVersion((short)255, (short)0, (short)0);
+ assertTrue(unsupportedProto.compareTo(ProtocolVersion.LATEST_VER) > 0);
+
+ long startNano = System.nanoTime();
+ testFail(
+ () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), unsupportedProto),
+ (HandshakeFailEvent event, Throwable hsErr) -> {
+ assertTrue(System.nanoTime() - startNano >= event.elapsedTime(TimeUnit.NANOSECONDS));
+ assertEquals(hsErr, event.throwable());
+ },
+ HandshakeFailEvent.class
+ );
+ }
+
+ /** */
+ @Test
+ public void testHandshakeFail() {
+ Stream.of(FakeIgniteServer.ErrorType.HANDSHAKE_CONNECTION_ERROR, FakeIgniteServer.ErrorType.HANDSHAKE_ERROR,
+ FakeIgniteServer.ErrorType.AUTHENTICATION_ERROR).forEach(errType -> {
+ AtomicLong startNano = new AtomicLong(System.nanoTime());
+ testFail(
+ () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(errType)),
+ (HandshakeFailEvent event, Throwable hsErr) -> {
+ assertTrue(System.nanoTime() - startNano.get() >= event.elapsedTime(TimeUnit.NANOSECONDS));
+ assertEquals(hsErr, event.throwable());
+ },
+ HandshakeFailEvent.class
+ );
+ });
+ }
+
+ /** */
+ @Test
+ public void testConnectionLost() {
+ testFail(
+ () -> new FakeIgniteServer(LOCALHOST, SRV_PORT, log(), EnumSet.of(FakeIgniteServer.ErrorType.CONNECTION_ERROR)),
+ IgniteClient::cacheNames,
+ (ev, t) -> {},
+ ConnectionClosedEvent.class
+ );
+ }
+
+ /** */
+ private <Event extends ConnectionEvent> void testFail(
+ Supplier<FakeIgniteServer> srvFactory,
+ BiConsumer<Event, Throwable> checkEventAction,
+ Class<Event> eventCls
+ ) {
+ testFail(srvFactory, client -> fail(), checkEventAction, eventCls);
+ }
+
+ /** */
+ private <Event extends ConnectionEvent> void testFail(
+ Supplier<FakeIgniteServer> srvFactory,
+ Consumer<IgniteClient> clientAction,
+ BiConsumer<Event, Throwable> checkEventAction,
+ Class<Event> eventCls
+ ) {
+ try (FakeIgniteServer srv = srvFactory.get()) {
+ srv.start();
+
+ Throwable hsErr = null;
+ Map<Class<? extends ConnectionEvent>, ConnectionEvent> evSet = new ConcurrentHashMap<>();
+ ConnectionEventListener lsnr = new ConnectionEventListener() {
+ @Override public void onConnectionClosed(ConnectionClosedEvent event) {
+ evSet.put(event.getClass(), event);
+ }
+
+ @Override public void onHandshakeFail(HandshakeFailEvent event) {
+ evSet.put(event.getClass(), event);
+ }
+ };
+
+ try (IgniteClient cli = startClient(lsnr)) {
+ clientAction.accept(cli);
+ }
+ catch (Throwable e) {
+ hsErr = e;
+ }
+
+ GridTestUtils.waitForCondition(() -> !evSet.isEmpty(), GridTestUtils.DFLT_TEST_TIMEOUT);
+ assertEquals(1, evSet.size());
+
+ Event failEv = (Event)evSet.get(eventCls);
+
+ assertNotNull(failEv);
+ assertEquals(failEv.connectionDescription().protocol(), "ProtocolContext [version=" + ProtocolVersion.V1_7_0
+ + ", features=[]]");
+ assertEquals(LOCALHOST, failEv.connectionDescription().remoteAddress().getAddress());
+ assertEquals(SRV_PORT, failEv.connectionDescription().remoteAddress().getPort());
+ assertEquals(LOCALHOST, failEv.connectionDescription().localAddress().getAddress());
+
+ if (failEv.connectionDescription().serverNodeId() != null)
+ assertEquals(srv.nodeId(), failEv.connectionDescription().serverNodeId());
+
+ checkEventAction.accept(failEv, hsErr);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Failed event test", e);
+ }
+ }
+
+ /** */
+ private IgniteClient startClient(EventListener... listeners) {
+ String addr = LOCALHOST.getHostName() + ":" + SRV_PORT;
+
+ return Ignition.startClient(new ClientConfiguration().setAddresses(addr).setEventListeners(listeners));
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
new file mode 100644
index 00000000000..1e704752b64
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/events/IgniteClientRequestEventListenerTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.events;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.events.ConnectionDescription;
+import org.apache.ignite.client.events.RequestEvent;
+import org.apache.ignite.client.events.RequestEventListener;
+import org.apache.ignite.client.events.RequestFailEvent;
+import org.apache.ignite.client.events.RequestStartEvent;
+import org.apache.ignite.client.events.RequestSuccessEvent;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.AbstractThinClientTest;
+import org.apache.ignite.internal.client.thin.ClientOperation;
+import org.junit.Test;
+
+/**
+ * Tests query event listeners of a thin client.
+ */
+public class IgniteClientRequestEventListenerTest extends AbstractThinClientTest {
+ /** */
+ Map<Class<? extends RequestEvent>, RequestEvent> evSet = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrids(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+
+ super.afterTestsStopped();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected ClientConfiguration getClientConfiguration() {
+ return super.getClientConfiguration()
+ .setEventListeners(new RequestEventListener() {
+ @Override public void onRequestStart(RequestStartEvent event) {
+ if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ evSet.put(event.getClass(), event);
+ }
+
+ @Override public void onRequestSuccess(RequestSuccessEvent event) {
+ if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ evSet.put(event.getClass(), event);
+ }
+
+ @Override public void onRequestFail(RequestFailEvent event) {
+ if (event.operationCode() != ClientOperation.GET_BINARY_CONFIGURATION.code())
+ evSet.put(event.getClass(), event);
+ }
+ });
+ }
+
+ /** */
+ @Test
+ public void testQuerySuccessEvents() {
+ long startTime = System.nanoTime();
+ try (IgniteClient cli = startClient(0)) {
+ cli.cacheNames();
+
+ assertEquals(2, evSet.size());
+
+ RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
+
+ assertTrue(startEvent.requestId() >= 0);
+
+ ConnectionDescription connDesc = startEvent.connectionDescription();
+ assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+ assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+ assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+ assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+ assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvent.operationCode());
+ assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvent.operationName());
+
+ RequestSuccessEvent successEvent = (RequestSuccessEvent)evSet.get(RequestSuccessEvent.class);
+ assertEquals(successEvent.requestId(), successEvent.requestId());
+
+ connDesc = startEvent.connectionDescription();
+ assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+ assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+ assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+ assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+ assertEquals(ClientOperation.CACHE_GET_NAMES.code(), startEvent.operationCode());
+ assertEquals(ClientOperation.CACHE_GET_NAMES.name(), startEvent.operationName());
+
+ assertTrue(System.nanoTime() - startTime >= successEvent.elapsedTime(TimeUnit.NANOSECONDS));
+
+ }
+ }
+
+ /** */
+ @Test
+ public void testQueryFailEvents() {
+ long startTime = System.nanoTime();
+ try (IgniteClient cli = startClient(0)) {
+ cli.cache("non-existent").put(1, 1);
+
+ fail();
+ }
+ catch (ClientException err) {
+ assertEquals(2, evSet.size());
+
+ RequestStartEvent startEvent = (RequestStartEvent)evSet.get(RequestStartEvent.class);
+
+ assertTrue(startEvent.requestId() >= 0);
+
+ ConnectionDescription connDesc = startEvent.connectionDescription();
+ assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+ assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+ assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+ assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+ assertEquals(ClientOperation.CACHE_PUT.code(), startEvent.operationCode());
+ assertEquals(ClientOperation.CACHE_PUT.name(), startEvent.operationName());
+
+ RequestFailEvent failEvent = (RequestFailEvent)evSet.get(RequestFailEvent.class);
+ assertEquals(failEvent.requestId(), failEvent.requestId());
+
+ connDesc = startEvent.connectionDescription();
+ assertEquals(clientHost(grid(0).localNode()), connDesc.remoteAddress().getAddress().getHostAddress());
+ assertEquals(clientPort(grid(0).localNode()), connDesc.remoteAddress().getPort());
+ assertEquals(clientHost(grid(0).localNode()), connDesc.localAddress().getAddress().getHostAddress());
+ assertEquals(grid(0).localNode().id(), connDesc.serverNodeId());
+ assertEquals(ClientOperation.CACHE_PUT.code(), startEvent.operationCode());
+ assertEquals(ClientOperation.CACHE_PUT.name(), startEvent.operationName());
+
+ assertEquals(err, failEvent.throwable());
+
+ assertTrue(System.nanoTime() - startTime >= failEvent.elapsedTime(TimeUnit.NANOSECONDS));
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 83b6f0fa86d..a6f6ae7ee76 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -37,6 +37,8 @@ import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessResour
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessStableTopologyTest;
import org.apache.ignite.internal.client.thin.ThinClientPartitionAwarenessUnstableTopologyTest;
import org.apache.ignite.internal.client.thin.TimeoutTest;
+import org.apache.ignite.internal.client.thin.events.IgniteClientConnectionEventListenerTest;
+import org.apache.ignite.internal.client.thin.events.IgniteClientRequestEventListenerTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -80,7 +82,9 @@ import org.junit.runners.Suite;
BinaryConfigurationTest.class,
IgniteSetTest.class,
DataReplicationOperationsTest.class,
- MetadataRegistrationTest.class
+ MetadataRegistrationTest.class,
+ IgniteClientConnectionEventListenerTest.class,
+ IgniteClientRequestEventListenerTest.class
})
public class ClientTestSuite {
// No-op.