You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2020/12/02 16:39:53 UTC
[ignite] branch master updated: IGNITE-13496 Java thin: make async API non-blocking with GridNioServer
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 338165a IGNITE-13496 Java thin: make async API non-blocking with GridNioServer
338165a is described below
commit 338165afadd3b6979b4655ee2f03f3b9c2228236
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Wed Dec 2 19:39:25 2020 +0300
IGNITE-13496 Java thin: make async API non-blocking with GridNioServer
Refactor Java Thin Client to use GridNioServer in client mode:
* Client threads are never blocked
* Single worker thread is shared across all connections within `IgniteClient`
Benchmark results (i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275):
Before
Benchmark Mode Cnt Score Error Units
JmhThinClientCacheBenchmark.get thrpt 10 65916.805 ± 2118.954 ops/s
JmhThinClientCacheBenchmark.put thrpt 10 62304.444 ± 2521.371 ops/s
After
Benchmark Mode Cnt Score Error Units
JmhThinClientCacheBenchmark.get thrpt 10 92501.557 ± 1380.384 ops/s
JmhThinClientCacheBenchmark.put thrpt 10 82907.446 ± 7572.537 ops/s
---
.../jmh/thin/JmhThinClientAbstractBenchmark.java | 135 ++++
.../jmh/thin/JmhThinClientCacheBenchmark.java | 81 +++
.../streams/BinaryByteBufferInputStream.java | 91 +--
.../internal/client/thin/ClientComputeImpl.java | 7 +-
.../internal/client/thin/ClientSslUtils.java | 293 +++++++++
.../internal/client/thin/NotificationListener.java | 4 +-
.../internal/client/thin/PayloadInputChannel.java | 8 +-
.../internal/client/thin/ReliableChannel.java | 63 +-
.../internal/client/thin/TcpClientChannel.java | 679 +++------------------
.../internal/client/thin/TcpIgniteClient.java | 27 +-
.../ClientConnection.java} | 25 +-
.../thin/io/ClientConnectionMultiplexer.java | 52 ++
.../ClientConnectionStateHandler.java} | 19 +-
.../client/thin/io/ClientMessageDecoder.java | 92 +++
.../ClientMessageHandler.java} | 19 +-
.../io/gridnioserver/GridNioClientConnection.java | 93 +++
.../GridNioClientConnectionMultiplexer.java | 147 +++++
.../io/gridnioserver/GridNioClientListener.java | 73 +++
.../thin/io/gridnioserver/GridNioClientParser.java | 59 ++
.../ignite/client/ConnectToStartingNodeTest.java | 18 +-
.../apache/ignite/client/SslParametersTest.java | 4 +-
.../internal/client/thin/ReliableChannelTest.java | 9 +-
.../ThinClientAbstractPartitionAwarenessTest.java | 9 +-
...lientPartitionAwarenessResourceReleaseTest.java | 14 +-
24 files changed, 1228 insertions(+), 793 deletions(-)
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
new file mode 100644
index 0000000..6b6dc53
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientAbstractBenchmark.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.stream.IntStream;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+
+/**
+ * Base class for thin client benchmarks.
+ */
+@State(Scope.Benchmark)
+public abstract class JmhThinClientAbstractBenchmark extends JmhAbstractBenchmark {
+ /** Property: nodes count. */
+ protected static final String PROP_DATA_NODES = "ignite.jmh.thin.dataNodes";
+
+ /** Default amount of nodes. */
+ protected static final int DFLT_DATA_NODES = 4;
+
+ /** Items count. */
+ protected static final int CNT = 1000;
+
+ /** Cache value. */
+ protected static final byte[] PAYLOAD = new byte[1000];
+
+ /** IP finder shared across nodes. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Default cache name. */
+ private static final String DEFAULT_CACHE_NAME = "default";
+
+ /** Target node. */
+ protected Ignite node;
+
+ /** Target cache. */
+ protected ClientCache<Integer, byte[]> cache;
+
+ /** Thin client. */
+ protected IgniteClient client;
+
+ /**
+ * Setup routine. Child classes must invoke this method first.
+ *
+ */
+ @Setup
+ public void setup() {
+ System.out.println();
+ System.out.println("--------------------");
+ System.out.println("IGNITE BENCHMARK INFO: ");
+ System.out.println("\tdata nodes: " + intProperty(PROP_DATA_NODES, DFLT_DATA_NODES));
+ System.out.println("--------------------");
+ System.out.println();
+
+ int nodesCnt = intProperty(PROP_DATA_NODES, DFLT_DATA_NODES);
+
+ A.ensure(nodesCnt >= 1, "nodesCnt >= 1");
+
+ node = Ignition.start(configuration("node0"));
+
+ for (int i = 1; i < nodesCnt; i++)
+ Ignition.start(configuration("node" + i));
+
+ String[] addrs = IntStream
+ .range(10800, 10800 + nodesCnt)
+ .mapToObj(p -> "127.0.0.1:" + p)
+ .toArray(String[]::new);
+
+ ClientConfiguration cfg = new ClientConfiguration()
+ .setAddresses(addrs)
+ .setPartitionAwarenessEnabled(true);
+
+ client = Ignition.startClient(cfg);
+
+ cache = client.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ System.out.println("Loading test data...");
+
+ for (int i = 0; i < CNT; i++)
+ cache.put(i, PAYLOAD);
+
+ System.out.println("Test data loaded: " + CNT);
+ }
+
+ /**
+ * Tear down routine.
+ *
+ */
+ @TearDown
+ public void tearDown() throws Exception {
+ client.close();
+ Ignition.stopAll(true);
+ }
+
+ /**
+ * Create Ignite configuration.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @return Configuration.
+ */
+ protected IgniteConfiguration configuration(String igniteInstanceName) {
+
+ return new IgniteConfiguration()
+ .setIgniteInstanceName(igniteInstanceName)
+ .setLocalHost("127.0.0.1")
+ .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+ }
+}
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
new file mode 100644
index 0000000..88e6a87
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/thin/JmhThinClientCacheBenchmark.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.thin;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Mode;
+
+/**
+ * Thin client cache benchmark.
+ *
+ * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275:
+ * Benchmark Mode Cnt Score Error Units
+ * JmhThinClientCacheBenchmark.get thrpt 10 92501.557 ± 1380.384 ops/s
+ * JmhThinClientCacheBenchmark.put thrpt 10 82907.446 ± 7572.537 ops/s
+ *
+ * JmhThinClientCacheBenchmark.get avgt 10 41.505 ± 1.018 us/op
+ * JmhThinClientCacheBenchmark.put avgt 10 44.623 ± 0.779 us/op
+ */
+public class JmhThinClientCacheBenchmark extends JmhThinClientAbstractBenchmark {
+ /**
+ * Cache put benchmark.
+ */
+ @Benchmark
+ public void put() {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ cache.put(key, PAYLOAD);
+ }
+
+ /**
+ * Cache get benchmark.
+ */
+ @Benchmark
+ public Object get() {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ return cache.get(key);
+ }
+
+ /**
+ * Run benchmarks.
+ *
+ * @param args Arguments.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ JmhIdeBenchmarkRunner runner = JmhIdeBenchmarkRunner.create()
+ .forks(1)
+ .threads(4)
+ .benchmarks(JmhThinClientCacheBenchmark.class.getSimpleName())
+ .jvmArguments("-Xms4g", "-Xmx4g");
+
+ runner
+ .benchmarkModes(Mode.Throughput)
+ .run();
+
+ runner
+ .benchmarkModes(Mode.AverageTime)
+ .outputTimeUnit(TimeUnit.MICROSECONDS)
+ .run();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
index d277948..fe138e6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryByteBufferInputStream.java
@@ -18,14 +18,14 @@
package org.apache.ignite.internal.binary.streams;
import java.nio.ByteBuffer;
-import org.apache.ignite.binary.BinaryObjectException;
+import java.util.Arrays;
/**
- *
+ * Input stream over {@link ByteBuffer}.
*/
public class BinaryByteBufferInputStream implements BinaryInputStream {
/** */
- private ByteBuffer buf;
+ private final ByteBuffer buf;
/**
* @param buf Buffer to wrap.
@@ -44,15 +44,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public byte readByte() {
- ensureHasData(1);
-
return buf.get();
}
/** {@inheritDoc} */
@Override public byte[] readByteArray(int cnt) {
- ensureHasData(cnt);
-
byte[] data = new byte[cnt];
buf.get(data);
@@ -62,22 +58,16 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public int read(byte[] arr, int off, int cnt) {
- ensureHasData(cnt);
-
return 0;
}
/** {@inheritDoc} */
@Override public boolean readBoolean() {
- ensureHasData(1);
-
- return false;
+ return readByte() == 1;
}
/** {@inheritDoc} */
@Override public boolean[] readBooleanArray(int cnt) {
- ensureHasData(cnt);
-
boolean[] res = new boolean[cnt];
for (int i = 0; i < cnt; i++)
@@ -88,15 +78,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public short readShort() {
- ensureHasData(2);
-
return buf.getShort();
}
/** {@inheritDoc} */
@Override public short[] readShortArray(int cnt) {
- ensureHasData(2 * cnt);
-
short[] res = new short[cnt];
for (int i = 0; i < cnt; i++)
@@ -107,15 +93,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public char readChar() {
- ensureHasData(2);
-
return buf.getChar();
}
/** {@inheritDoc} */
@Override public char[] readCharArray(int cnt) {
- ensureHasData(2 * cnt);
-
char[] res = new char[cnt];
for (int i = 0; i < cnt; i++)
@@ -126,15 +108,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public int readInt() {
- ensureHasData(4);
-
return buf.getInt();
}
/** {@inheritDoc} */
@Override public int[] readIntArray(int cnt) {
- ensureHasData(4 * cnt);
-
int[] res = new int[cnt];
for (int i = 0; i < cnt; i++)
@@ -145,15 +123,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public float readFloat() {
- ensureHasData(4);
-
return buf.getFloat();
}
/** {@inheritDoc} */
@Override public float[] readFloatArray(int cnt) {
- ensureHasData(4 * cnt);
-
float[] res = new float[cnt];
for (int i = 0; i < cnt; i++)
@@ -164,15 +138,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public long readLong() {
- ensureHasData(8);
-
return buf.getLong();
}
/** {@inheritDoc} */
@Override public long[] readLongArray(int cnt) {
- ensureHasData(8 * cnt);
-
long[] res = new long[cnt];
for (int i = 0; i < cnt; i++)
@@ -183,15 +153,11 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public double readDouble() {
- ensureHasData(8);
-
return buf.getDouble();
}
/** {@inheritDoc} */
@Override public double[] readDoubleArray(int cnt) {
- ensureHasData(8 * cnt);
-
double[] res = new double[cnt];
for (int i = 0; i < cnt; i++)
@@ -207,47 +173,17 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public byte readBytePositioned(int pos) {
- int oldPos = buf.position();
-
- buf.position(pos);
-
- ensureHasData(1);
-
- byte res = buf.get();
-
- buf.position(oldPos);
-
- return res;
+ return buf.get(pos);
}
/** {@inheritDoc} */
@Override public short readShortPositioned(int pos) {
- int oldPos = buf.position();
-
- buf.position(pos);
-
- ensureHasData(2);
-
- short res = buf.getShort();
-
- buf.position(oldPos);
-
- return res;
+ return buf.getShort(pos);
}
/** {@inheritDoc} */
@Override public int readIntPositioned(int pos) {
- int oldPos = buf.position();
-
- buf.position(pos);
-
- ensureHasData(4);
-
- byte res = buf.get();
-
- buf.position(oldPos);
-
- return res;
+ return buf.getInt(pos);
}
/** {@inheritDoc} */
@@ -277,7 +213,9 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
/** {@inheritDoc} */
@Override public byte[] arrayCopy() {
- return buf.array();
+ byte[] arr = buf.array();
+
+ return Arrays.copyOf(arr, arr.length);
}
/** {@inheritDoc} */
@@ -289,13 +227,4 @@ public class BinaryByteBufferInputStream implements BinaryInputStream {
@Override public boolean hasArray() {
return false;
}
-
- /**
- * @param cnt Remaining bytes.
- */
- private void ensureHasData(int cnt) {
- if (buf.remaining() < cnt)
- throw new BinaryObjectException("Not enough data to read the value " +
- "[requiredBytes=" + cnt + ", remainingBytes=" + buf.remaining() + ']');
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
index d4cb415..65d1c2d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientComputeImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client.thin;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -40,7 +41,7 @@ import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -353,11 +354,11 @@ class ClientComputeImpl implements ClientCompute, NotificationListener {
ClientChannel ch,
ClientOperation op,
long rsrcId,
- byte[] payload,
+ ByteBuffer payload,
Exception err
) {
if (op == ClientOperation.COMPUTE_TASK_FINISHED) {
- Object res = payload == null ? null : utils.readObject(new BinaryHeapInputStream(payload), false);
+ Object res = payload == null ? null : utils.readObject(BinaryByteBufferInputStream.create(payload), false);
ClientComputeTask<Object> task = addTask(ch, rsrcId);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java
new file mode 100644
index 0000000..4f964d8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientSslUtils.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.function.BiFunction;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+import javax.cache.configuration.Factory;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.ignite.client.SslMode;
+import org.apache.ignite.client.SslProtocol;
+import org.apache.ignite.configuration.ClientConfiguration;
+
+import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
+import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
+
+public class ClientSslUtils {
+ /** Empty password, used when no key store or trust store password is configured. */
+ public static final char[] EMPTY_CHARS = new char[0];
+
+ /** Trust manager ignoring all certificate checks. */
+ private static final TrustManager ignoreErrorsTrustMgr = new X509TrustManager() {
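+ /** NOTE(review): returns {@code null}; the {@link X509TrustManager#getAcceptedIssuers()} contract asks for a non-null (possibly empty) array. */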
+ @Override public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+
+ /** Accepts any server certificate chain without validation. */
+ @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) {
+ // No-op.
+ }
+
+ /** Accepts any client certificate chain without validation. */
+ @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) {
+ // No-op.
+ }
+ };
+
+ /**
+ * Gets SSL context for the given client configuration.
+ *
+ * @param cfg Configuration.
+ * @return {@link SSLContext} when SSL is enabled in the configuration; null otherwise.
+ */
+ public static SSLContext getSslContext(ClientConfiguration cfg) {
+ if (cfg.getSslMode() == SslMode.DISABLED)
+ return null;
+
+ Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory();
+
+ if (sslCtxFactory != null) {
+ try {
+ return sslCtxFactory.create();
+ }
+ catch (Exception e) {
+ throw new ClientError("SSL Context Factory failed", e);
+ }
+ }
+
+ BiFunction<String, String, String> or = (val, dflt) -> val == null || val.isEmpty() ? dflt : val;
+
+ String keyStore = or.apply(
+ cfg.getSslClientCertificateKeyStorePath(),
+ System.getProperty("javax.net.ssl.keyStore")
+ );
+
+ String keyStoreType = or.apply(
+ cfg.getSslClientCertificateKeyStoreType(),
+ or.apply(System.getProperty("javax.net.ssl.keyStoreType"), DFLT_STORE_TYPE)
+ );
+
+ String keyStorePwd = or.apply(
+ cfg.getSslClientCertificateKeyStorePassword(),
+ System.getProperty("javax.net.ssl.keyStorePassword")
+ );
+
+ String trustStore = or.apply(
+ cfg.getSslTrustCertificateKeyStorePath(),
+ System.getProperty("javax.net.ssl.trustStore")
+ );
+
+ String trustStoreType = or.apply(
+ cfg.getSslTrustCertificateKeyStoreType(),
+ or.apply(System.getProperty("javax.net.ssl.trustStoreType"), DFLT_STORE_TYPE)
+ );
+
+ String trustStorePwd = or.apply(
+ cfg.getSslTrustCertificateKeyStorePassword(),
+ System.getProperty("javax.net.ssl.trustStorePassword")
+ );
+
+ String algorithm = or.apply(cfg.getSslKeyAlgorithm(), DFLT_KEY_ALGORITHM);
+
+ String proto = toString(cfg.getSslProtocol());
+
+ if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, trustStorePwd, trustStoreType)
+ .allMatch(s -> s == null || s.isEmpty())
+ ) {
+ try {
+ return SSLContext.getDefault();
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError("Default SSL context cryptographic algorithm is not available", e);
+ }
+ }
+
+ KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, keyStoreType, keyStorePwd);
+
+ TrustManager[] trustManagers = cfg.isSslTrustAll() ?
+ new TrustManager[] {ignoreErrorsTrustMgr} :
+ getTrustManagers(algorithm, trustStore, trustStoreType, trustStorePwd);
+
+ try {
+ SSLContext sslCtx = SSLContext.getInstance(proto);
+
+ sslCtx.init(keyManagers, trustManagers, null);
+
+ return sslCtx;
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError("SSL context cryptographic algorithm is not available", e);
+ }
+ catch (KeyManagementException e) {
+ throw new ClientError("Failed to create SSL Context", e);
+ }
+ }
+
+ /**
+ * @return String representation of {@link SslProtocol} as required by {@link SSLContext}.
+ */
+ private static String toString(SslProtocol proto) {
+ switch (proto) {
+ case TLSv1_1:
+ return "TLSv1.1";
+
+ case TLSv1_2:
+ return "TLSv1.2";
+
+ default:
+ return proto.toString();
+ }
+ }
+
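+ /** Creates key managers for the given algorithm; the key store is loaded only when both its path and type are non-empty. */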
+ private static KeyManager[] getKeyManagers(
+ String algorithm,
+ String keyStore,
+ String keyStoreType,
+ String keyStorePwd
+ ) {
+ KeyManagerFactory keyMgrFactory;
+
+ try {
+ keyMgrFactory = KeyManagerFactory.getInstance(algorithm);
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError("Key manager cryptographic algorithm is not available", e);
+ }
+
+ Predicate<String> empty = s -> s == null || s.isEmpty();
+
+ if (!empty.test(keyStore) && !empty.test(keyStoreType)) {
+ char[] pwd = (keyStorePwd == null) ? EMPTY_CHARS : keyStorePwd.toCharArray();
+
+ KeyStore store = loadKeyStore("Client", keyStore, keyStoreType, pwd);
+
+ try {
+ keyMgrFactory.init(store, pwd);
+ }
+ catch (UnrecoverableKeyException e) {
+ throw new ClientError("Could not recover key store key", e);
+ }
+ catch (KeyStoreException e) {
+ throw new ClientError(
+ String.format("Client key store provider of type [%s] is not available", keyStoreType),
+ e
+ );
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError("Client key store integrity check algorithm is not available", e);
+ }
+ }
+
+ return keyMgrFactory.getKeyManagers();
+ }
+
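+ /** Creates trust managers for the given algorithm; the trust store is loaded only when both its path and type are non-empty. */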
+ private static TrustManager[] getTrustManagers(
+ String algorithm,
+ String trustStore,
+ String trustStoreType,
+ String trustStorePwd
+ ) {
+ TrustManagerFactory trustMgrFactory;
+
+ try {
+ trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError("Trust manager cryptographic algorithm is not available", e);
+ }
+
+ Predicate<String> empty = s -> s == null || s.isEmpty();
+
+ if (!empty.test(trustStore) && !empty.test(trustStoreType)) {
+ char[] pwd = (trustStorePwd == null) ? EMPTY_CHARS : trustStorePwd.toCharArray();
+
+ KeyStore store = loadKeyStore("Trust", trustStore, trustStoreType, pwd);
+
+ try {
+ trustMgrFactory.init(store);
+ }
+ catch (KeyStoreException e) {
+ throw new ClientError(
+ String.format("Trust key store provider of type [%s] is not available", trustStoreType),
+ e
+ );
+ }
+ }
+
+ return trustMgrFactory.getTrustManagers();
+ }
+
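+ /** Loads a key store of the given type from a file, rethrowing checked failures as {@link ClientError}; {@code lb} is the label used in error messages. */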
+ private static KeyStore loadKeyStore(String lb, String path, String type, char[] pwd) {
+ KeyStore store;
+
+ try {
+ store = KeyStore.getInstance(type);
+ }
+ catch (KeyStoreException e) {
+ throw new ClientError(
+ String.format("%s key store provider of type [%s] is not available", lb, type),
+ e
+ );
+ }
+
+ try (InputStream in = new FileInputStream(new File(path))) {
+
+ store.load(in, pwd);
+
+ return store;
+ }
+ catch (FileNotFoundException e) {
+ throw new ClientError(String.format("%s key store file [%s] does not exist", lb, path), e);
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new ClientError(
+ String.format("%s key store integrity check algorithm is not available", lb),
+ e
+ );
+ }
+ catch (CertificateException e) {
+ throw new ClientError(String.format("Could not load certificate from %s key store", lb), e);
+ }
+ catch (IOException e) {
+ throw new ClientError(String.format("Could not read %s key store", lb), e);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
index ae1b7fa..3aee483 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.client.thin;
+import java.nio.ByteBuffer;
+
/**
* Server to client notification listener.
*/
@@ -30,5 +32,5 @@ interface NotificationListener {
* @param payload Notification payload or {@code null} if there is no payload.
* @param err Error.
*/
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
+ public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, ByteBuffer payload, Exception err);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
index 76af7f2..f9d5978 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/PayloadInputChannel.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.client.thin;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
/**
@@ -33,8 +35,8 @@ class PayloadInputChannel {
/**
* Constructor.
*/
- PayloadInputChannel(ClientChannel ch, byte[] payload) {
- in = new BinaryHeapInputStream(payload);
+ PayloadInputChannel(ClientChannel ch, ByteBuffer payload) {
+ in = BinaryByteBufferInputStream.create(payload);
this.ch = ch;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
index e7005be..195088d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ReliableChannel.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.client.thin;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -31,13 +32,11 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -51,26 +50,21 @@ import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.gridnioserver.GridNioClientConnectionMultiplexer;
import org.apache.ignite.internal.util.HostAndPortRange;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.NotNull;
/**
* Communication channel with failover and partition awareness.
*/
final class ReliableChannel implements AutoCloseable, NotificationListener {
- /** Timeout to wait for executor service to shutdown (in milliseconds). */
- private static final long EXECUTOR_SHUTDOWN_TIMEOUT = 10_000L;
-
/** Do nothing helper function. */
private static final Consumer<Integer> DO_NOTHING = (v) -> {};
- /** Async runner thread name. */
- static final String ASYNC_RUNNER_THREAD_NAME = "thin-client-channel-async-init";
-
/** Channel factory. */
- private final Function<ClientChannelConfiguration, ClientChannel> chFactory;
+ private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory;
/** Client channel holders for each configured address. */
private volatile List<ClientChannelHolder> channels;
@@ -96,19 +90,6 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/** Listeners of channel close events. */
private final Collection<Consumer<ClientChannel>> channelCloseLsnrs = new CopyOnWriteArrayList<>();
- /** Async tasks thread pool. */
- private final ExecutorService asyncRunner = Executors.newSingleThreadExecutor(
- new ThreadFactory() {
- @Override public Thread newThread(@NotNull Runnable r) {
- Thread thread = new Thread(r, ASYNC_RUNNER_THREAD_NAME);
-
- thread.setDaemon(true);
-
- return thread;
- }
- }
- );
-
/** Channels reinit was scheduled. */
private final AtomicBoolean scheduledChannelsReinit = new AtomicBoolean();
@@ -130,6 +111,9 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/** Guard channels and curChIdx together. */
private final ReadWriteLock curChannelsGuard = new ReentrantReadWriteLock();
+ /** Connection manager. */
+ private final ClientConnectionMultiplexer connMgr;
+
/** Cache addresses returned by {@code ThinClientAddressFinder}. */
private volatile String[] prevHostAddrs;
@@ -137,9 +121,9 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
* Constructor.
*/
ReliableChannel(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration clientCfg,
- IgniteBinary binary
+ BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
+ ClientConfiguration clientCfg,
+ IgniteBinary binary
) {
if (chFactory == null)
throw new NullPointerException("chFactory");
@@ -153,20 +137,16 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
partitionAwarenessEnabled = clientCfg.isPartitionAwarenessEnabled();
affinityCtx = new ClientCacheAffinityContext(binary);
+
+ connMgr = new GridNioClientConnectionMultiplexer(clientCfg);
+ connMgr.start();
}
/** {@inheritDoc} */
@Override public synchronized void close() {
closed = true;
- asyncRunner.shutdown();
-
- try {
- asyncRunner.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- }
- catch (InterruptedException ignore) {
- // No-op.
- }
+ connMgr.stop();
List<ClientChannelHolder> holders = channels;
@@ -430,7 +410,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
ClientChannel ch,
ClientOperation op,
long rsrcId,
- byte[] payload,
+ ByteBuffer payload,
Exception err
) {
for (NotificationListener lsnr : notificationLsnrs) {
@@ -579,7 +559,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
* Asynchronously try to establish a connection to all configured servers.
*/
private void initAllChannelsAsync() {
- asyncRunner.submit(
+ ForkJoinPool.commonPool().submit(
() -> {
List<ClientChannelHolder> holders = channels;
@@ -608,7 +588,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
if (scheduledChannelsReinit.compareAndSet(false, true)) {
// If partition awareness is disabled then only schedule and wait for the default channel to fail.
if (partitionAwarenessEnabled)
- asyncRunner.submit(this::channelsInit);
+ ForkJoinPool.commonPool().submit(this::channelsInit);
}
}
}
@@ -867,6 +847,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/**
* Channels holder.
*/
+ @SuppressWarnings("PackageVisibleInnerClass") // Visible for tests.
class ClientChannelHolder {
/** Channel configuration. */
private final ClientChannelConfiguration chCfg;
@@ -937,7 +918,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
if (!ignoreThrottling && applyReconnectionThrottling())
throw new ClientConnectionException("Reconnect is not allowed due to applied throttling");
- ClientChannel channel = chFactory.apply(chCfg);
+ ClientChannel channel = chFactory.apply(chCfg, connMgr);
if (channel.serverNodeId() != null) {
channel.addTopologyChangeListener(ReliableChannel.this::onTopologyChanged);
@@ -1008,6 +989,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/**
* Get holders reference. For test purposes.
*/
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
List<ClientChannelHolder> getChannelHolders() {
return channels;
}
@@ -1015,6 +997,7 @@ final class ReliableChannel implements AutoCloseable, NotificationListener {
/**
* Get node channels reference. For test purposes.
*/
+ @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") // For tests.
Map<UUID, ClientChannelHolder> getNodeChannels() {
return nodeChannels;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
index 25df909..109c2a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientChannel.java
@@ -17,21 +17,9 @@
package org.apache.ignite.internal.client.thin;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
@@ -44,22 +32,8 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
-import javax.cache.configuration.Factory;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.client.ClientAuthenticationException;
import org.apache.ignite.client.ClientAuthorizationException;
@@ -67,19 +41,20 @@ import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.ClientReconnectedException;
-import org.apache.ignite.client.SslMode;
-import org.apache.ignite.client.SslProtocol;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryCachingMetadataHandler;
import org.apache.ignite.internal.binary.BinaryContext;
-import org.apache.ignite.internal.binary.BinaryPrimitives;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
@@ -103,19 +78,14 @@ import static org.apache.ignite.internal.client.thin.ProtocolVersion.V1_7_0;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.AUTHORIZATION;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.BITMAP_FEATURES;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.PARTITION_AWARENESS;
-import static org.apache.ignite.ssl.SslContextFactory.DFLT_KEY_ALGORITHM;
-import static org.apache.ignite.ssl.SslContextFactory.DFLT_STORE_TYPE;
/**
* Implements {@link ClientChannel} over TCP.
*/
-class TcpClientChannel implements ClientChannel {
+class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientConnectionStateHandler {
/** Protocol version used by default on first connection attempt. */
private static final ProtocolVersion DEFAULT_VERSION = LATEST_VER;
- /** Receiver thread prefix. */
- static final String RECEIVER_THREAD_PREFIX = "thin-client-channel#";
-
/** Supported protocol versions. */
private static final Collection<ProtocolVersion> supportedVers = Arrays.asList(
V1_7_0,
@@ -128,30 +98,24 @@ class TcpClientChannel implements ClientChannel {
V1_0_0
);
+ /** Preallocated empty bytes. */
+ public static final byte[] EMPTY_BYTES = new byte[0];
+
/** Protocol context. */
- private ProtocolContext protocolCtx;
+ private volatile ProtocolContext protocolCtx;
/** Server node ID. */
- private UUID srvNodeId;
+ private volatile UUID srvNodeId;
/** Server topology version. */
- private AffinityTopologyVersion srvTopVer;
+ private volatile AffinityTopologyVersion srvTopVer;
/** Channel. */
- private final Socket sock;
-
- /** Output stream. */
- private final OutputStream out;
-
- /** Data input. */
- private final ByteCountingDataInput dataInput;
+ private final ClientConnection sock;
/** Request id. */
private final AtomicLong reqId = new AtomicLong(1);
- /** Send lock. */
- private final Lock sndLock = new ReentrantLock();
-
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();
@@ -167,14 +131,11 @@ class TcpClientChannel implements ClientChannel {
/** Executor for async operation listeners. */
private final Executor asyncContinuationExecutor;
- /** Receiver thread (processes incoming messages). */
- private Thread receiverThread;
-
/** Send/receive timeout in milliseconds. */
private final int timeout;
/** Constructor. */
- TcpClientChannel(ClientChannelConfiguration cfg)
+ TcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer connMgr)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
validateConfiguration(cfg);
@@ -183,21 +144,9 @@ class TcpClientChannel implements ClientChannel {
timeout = cfg.getTimeout();
- try {
- sock = createSocket(cfg);
-
- out = sock.getOutputStream();
- dataInput = new ByteCountingDataInput(sock.getInputStream());
-
- handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
+ sock = connMgr.open(cfg.getAddress(), this, this);
- // Disable timeout on socket after handshake, instead, get future result with timeout in "receive" method.
- if (timeout > 0)
- sock.setSoTimeout(0);
- }
- catch (IOException e) {
- throw handleIOError("addr=" + cfg.getAddress(), e);
- }
+ handshake(DEFAULT_VERSION, cfg.getUserName(), cfg.getUserPassword(), cfg.getUserAttributes());
}
/** {@inheritDoc} */
@@ -205,28 +154,25 @@ class TcpClientChannel implements ClientChannel {
close(null);
}
+ /** {@inheritDoc} */
+ @Override public void onMessage(ByteBuffer buf) {
+ processNextMessage(buf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(@Nullable Exception e) {
+ close(e);
+ }
+
/**
* Close the channel with cause.
*/
private void close(Throwable cause) {
if (closed.compareAndSet(false, true)) {
- U.closeQuiet(dataInput);
- U.closeQuiet(out);
U.closeQuiet(sock);
- sndLock.lock(); // Lock here to prevent creation of new pending requests.
-
- try {
- for (ClientRequestFuture pendingReq : pendingReqs.values())
- pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
-
- if (receiverThread != null)
- receiverThread.interrupt();
- }
- finally {
- sndLock.unlock();
- }
-
+ for (ClientRequestFuture pendingReq : pendingReqs.values())
+ pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
}
}
@@ -251,7 +197,8 @@ class TcpClientChannel implements ClientChannel {
ClientRequestFuture fut = send(op, payloadWriter);
return receiveAsync(fut, payloadReader);
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
CompletableFuture<T> fut = new CompletableFuture<>();
fut.completeExceptionally(t);
@@ -268,15 +215,10 @@ class TcpClientChannel implements ClientChannel {
throws ClientException {
long id = reqId.getAndIncrement();
- // Only one thread at a time can have access to write to the channel.
- sndLock.lock();
-
try (PayloadOutputChannel payloadCh = new PayloadOutputChannel(this)) {
if (closed())
throw new ClientConnectionException("Channel is closed");
- initReceiverThread(); // Start the receiver thread with the first request.
-
ClientRequestFuture fut = new ClientRequestFuture();
pendingReqs.put(id, fut);
@@ -292,7 +234,8 @@ class TcpClientChannel implements ClientChannel {
req.writeInt(0, req.position() - 4); // Actual size.
- write(req.array(), req.position());
+ // arrayCopy is required, because buffer is pooled, and write is async.
+ write(req.arrayCopy(), req.position());
return fut;
}
@@ -301,9 +244,6 @@ class TcpClientChannel implements ClientChannel {
throw t;
}
- finally {
- sndLock.unlock();
- }
}
/**
@@ -314,7 +254,7 @@ class TcpClientChannel implements ClientChannel {
private <T> T receive(ClientRequestFuture pendingReq, Function<PayloadInputChannel, T> payloadReader)
throws ClientException {
try {
- byte[] payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
+ ByteBuffer payload = timeout > 0 ? pendingReq.get(timeout) : pendingReq.get();
if (payload == null || payloadReader == null)
return null;
@@ -338,7 +278,7 @@ class TcpClientChannel implements ClientChannel {
pendingReq.listen(payloadFut -> asyncContinuationExecutor.execute(() -> {
try {
- byte[] payload = payloadFut.get();
+ ByteBuffer payload = payloadFut.get();
if (payload == null || payloadReader == null)
fut.complete(null);
@@ -346,7 +286,8 @@ class TcpClientChannel implements ClientChannel {
T res = payloadReader.apply(new PayloadInputChannel(this, payload));
fut.complete(res);
}
- } catch (Throwable t) {
+ }
+ catch (Throwable t) {
fut.completeExceptionally(convertException(t));
}
}));
@@ -389,58 +330,29 @@ class TcpClientChannel implements ClientChannel {
}
/**
- * Init and start receiver thread if it wasn't started before.
- *
- * Note: Method should be called only under external synchronization.
- */
- private void initReceiverThread() {
- if (receiverThread == null) {
- Socket sock = this.sock;
-
- String sockInfo = sock == null ? null : sock.getInetAddress().getHostName() + ":" + sock.getPort();
-
- receiverThread = new Thread(() -> {
- try {
- while (!closed())
- processNextMessage();
- }
- catch (Throwable e) {
- close(e);
- }
- }, RECEIVER_THREAD_PREFIX + sockInfo);
-
- receiverThread.setDaemon(true);
-
- receiverThread.start();
- }
- }
-
- /**
* Process next message from the input stream and complete corresponding future.
*/
- private void processNextMessage() throws ClientProtocolError, ClientConnectionException {
- // blocking read a message header not to fall into a busy loop
- int msgSize = dataInput.readInt(2048);
-
- if (msgSize <= 0)
- throw new ClientProtocolError(String.format("Invalid message size: %s", msgSize));
+ private void processNextMessage(ByteBuffer buf) throws ClientProtocolError, ClientConnectionException {
+ BinaryInputStream dataInput = BinaryByteBufferInputStream.create(buf);
- long bytesReadOnStartMsg = dataInput.totalBytesRead();
+ if (protocolCtx == null) {
+ // Process handshake.
+ pendingReqs.remove(-1L).onDone(buf);
+ return;
+ }
- long resId = dataInput.spinReadLong();
+ long resId = dataInput.readLong();
int status = 0;
ClientOperation notificationOp = null;
- BinaryInputStream resIn;
-
if (protocolCtx.isFeatureSupported(PARTITION_AWARENESS)) {
- short flags = dataInput.spinReadShort();
+ short flags = dataInput.readShort();
if ((flags & ClientFlag.AFFINITY_TOPOLOGY_CHANGED) != 0) {
- long topVer = dataInput.spinReadLong();
- int minorTopVer = dataInput.spinReadInt();
+ long topVer = dataInput.readLong();
+ int minorTopVer = dataInput.readInt();
srvTopVer = new AffinityTopologyVersion(topVer, minorTopVer);
@@ -449,7 +361,7 @@ class TcpClientChannel implements ClientChannel {
}
if ((flags & ClientFlag.NOTIFICATION) != 0) {
- short notificationCode = dataInput.spinReadShort();
+ short notificationCode = dataInput.readShort();
notificationOp = ClientOperation.fromCode(notificationCode);
@@ -458,28 +370,25 @@ class TcpClientChannel implements ClientChannel {
}
if ((flags & ClientFlag.ERROR) != 0)
- status = dataInput.spinReadInt();
+ status = dataInput.readInt();
}
else
- status = dataInput.spinReadInt();
+ status = dataInput.readInt();
- int hdrSize = (int)(dataInput.totalBytesRead() - bytesReadOnStartMsg);
+ int hdrSize = dataInput.position();
+ int msgSize = buf.limit();
- byte[] res = null;
+ ByteBuffer res = null;
Exception err = null;
if (status == 0) {
if (msgSize > hdrSize)
- res = dataInput.spinRead(msgSize - hdrSize);
+ res = buf;
}
- else if (status == ClientStatus.SECURITY_VIOLATION) {
- dataInput.spinRead(msgSize - hdrSize); // Read message to the end.
-
+ else if (status == ClientStatus.SECURITY_VIOLATION)
err = new ClientAuthorizationException();
- } else {
- resIn = new BinaryHeapInputStream(dataInput.spinRead(msgSize - hdrSize));
-
- String errMsg = ClientUtils.createBinaryReader(null, resIn).readString();
+ else {
+ String errMsg = ClientUtils.createBinaryReader(null, dataInput).readString();
err = new ClientServerError(errMsg, status, resId);
}
@@ -543,31 +452,21 @@ class TcpClientChannel implements ClientChannel {
throw new IllegalArgumentException(error);
}
- /** Create socket. */
- private static Socket createSocket(ClientChannelConfiguration cfg) throws IOException {
- Socket sock = cfg.getSslMode() == SslMode.REQUIRED ?
- new ClientSslSocketFactory(cfg).create() :
- new Socket(cfg.getAddress().getHostName(), cfg.getAddress().getPort());
-
- sock.setTcpNoDelay(cfg.isTcpNoDelay());
-
- if (cfg.getTimeout() > 0)
- sock.setSoTimeout(cfg.getTimeout());
-
- if (cfg.getSendBufferSize() > 0)
- sock.setSendBufferSize(cfg.getSendBufferSize());
-
- if (cfg.getReceiveBufferSize() > 0)
- sock.setReceiveBufferSize(cfg.getReceiveBufferSize());
-
- return sock;
- }
-
/** Client handshake. */
private void handshake(ProtocolVersion ver, String user, String pwd, Map<String, String> userAttrs)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
+ ClientRequestFuture fut = new ClientRequestFuture();
+ pendingReqs.put(-1L, fut);
+
handshakeReq(ver, user, pwd, userAttrs);
- handshakeRes(ver, user, pwd, userAttrs);
+
+ try {
+ ByteBuffer res = timeout > 0 ? fut.get(timeout) : fut.get();
+ handshakeRes(res, ver, user, pwd, userAttrs);
+ }
+ catch (IgniteCheckedException e) {
+ throw new ClientConnectionException(e.getMessage(), e);
+ }
}
/** Send handshake request. */
@@ -604,7 +503,7 @@ class TcpClientChannel implements ClientChannel {
writer.out().writeInt(0, writer.out().position() - 4);// actual size
- write(writer.array(), writer.out().position());
+ write(writer.out().arrayCopy(), writer.out().position());
}
}
@@ -621,20 +520,15 @@ class TcpClientChannel implements ClientChannel {
}
/** Receive and handle handshake response. */
- private void handshakeRes(ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
+ private void handshakeRes(ByteBuffer buf, ProtocolVersion proposedVer, String user, String pwd, Map<String, String> userAttrs)
throws ClientConnectionException, ClientAuthenticationException, ClientProtocolError {
- int resSize = dataInput.readInt();
-
- if (resSize <= 0)
- throw new ClientProtocolError(String.format("Invalid handshake response size: %s", resSize));
-
- BinaryInputStream res = new BinaryHeapInputStream(dataInput.read(resSize));
+ BinaryInputStream res = BinaryByteBufferInputStream.create(buf);
try (BinaryReaderExImpl reader = ClientUtils.createBinaryReader(null, res)) {
boolean success = res.readBoolean();
if (success) {
- byte[] features = new byte[0];
+ byte[] features = EMPTY_BYTES;
if (ProtocolContext.isFeatureSupported(proposedVer, BITMAP_FEATURES))
features = reader.readByteArray();
@@ -680,12 +574,13 @@ class TcpClientChannel implements ClientChannel {
/** Write bytes to the output stream. */
private void write(byte[] bytes, int len) throws ClientConnectionException {
+ ByteBuffer buf = ByteBuffer.wrap(bytes, 0, len);
+
try {
- out.write(bytes, 0, len);
- out.flush();
+ sock.send(buf);
}
- catch (IOException e) {
- throw handleIOError(e);
+ catch (IgniteCheckedException e) {
+ throw new ClientConnectionException(e.getMessage(), e);
}
}
@@ -705,424 +600,8 @@ class TcpClientChannel implements ClientChannel {
}
/**
- * Auxiliary class to read byte buffers and numeric values, counting total bytes read.
- * Numeric values are read in the little-endian byte order.
- */
- private class ByteCountingDataInput implements AutoCloseable {
- /** Input stream. */
- private final InputStream in;
-
- /** Total bytes read from the input stream. */
- private long totalBytesRead;
-
- /** Temporary buffer to read long, int and short values. */
- private final byte[] tmpBuf = new byte[Long.BYTES];
-
- /**
- * @param in Input stream.
- */
- public ByteCountingDataInput(InputStream in) {
- this.in = in;
- }
-
- /** Read bytes from the input stream. */
- public byte[] read(int len) throws ClientConnectionException {
- byte[] bytes = new byte[len];
-
- read(bytes, len, 0);
-
- return bytes;
- }
-
- /** Read bytes from the input stream. */
- public byte[] spinRead(int len) {
- byte[] bytes = new byte[len];
-
- read(bytes, len, Integer.MAX_VALUE);
-
- return bytes;
- }
-
- /**
- * Read bytes from the input stream to the buffer.
- *
- * @param bytes Bytes buffer.
- * @param len Length.
- * @param tryReadCnt Number of reads before falling into blocking read.
- */
- public void read(byte[] bytes, int len, int tryReadCnt) throws ClientConnectionException {
- int offset = 0;
-
- try {
- while (offset < len) {
- int toRead;
-
- if (tryReadCnt == 0)
- toRead = len - offset;
- else if ((toRead = Math.min(in.available(), len - offset)) == 0) {
- tryReadCnt--;
-
- continue;
- }
-
- int read = in.read(bytes, offset, toRead);
-
- if (read < 0)
- throw handleIOError(null);
-
- offset += read;
- totalBytesRead += read;
- }
- }
- catch (IOException e) {
- throw handleIOError(e);
- }
- }
-
- /**
- * Read long value from the input stream.
- */
- public long readLong() throws ClientConnectionException {
- return readLong(0);
- }
-
- /**
- * Read long value from the input stream.
- */
- public long spinReadLong() throws ClientConnectionException {
- return readLong(Integer.MAX_VALUE);
- }
-
- /**
- * Read long value from the input stream.
- *
- * @param tryReadCnt Number of reads before falling into blocking read.
- */
- private long readLong(int tryReadCnt) throws ClientConnectionException {
- read(tmpBuf, Long.BYTES, tryReadCnt);
-
- return BinaryPrimitives.readLong(tmpBuf, 0);
- }
-
- /**
- * Read int value from the input stream.
- */
- public int readInt() throws ClientConnectionException {
- return readInt(0);
- }
-
- /**
- * Read int value from the input stream.
- */
- public int spinReadInt() throws ClientConnectionException {
- return readInt(Integer.MAX_VALUE);
- }
-
- /**
- * Read int value from the input stream.
- *
- * @param tryReadCnt Number of reads before falling into blocking read.
- */
- private int readInt(int tryReadCnt) throws ClientConnectionException {
- read(tmpBuf, Integer.BYTES, tryReadCnt);
-
- return BinaryPrimitives.readInt(tmpBuf, 0);
- }
-
- /**
- * Read short value from the input stream.
- */
- public short readShort() throws ClientConnectionException {
- return readShort(0);
- }
-
- /**
- * Read short value from the input stream.
- */
- public short spinReadShort() throws ClientConnectionException {
- return readShort(Integer.MAX_VALUE);
- }
-
- /**
- * Read short value from the input stream.
- *
- * @param tryReadCnt Number of reads before falling into blocking read.
- */
- public short readShort(int tryReadCnt) throws ClientConnectionException {
- read(tmpBuf, Short.BYTES, tryReadCnt);
-
- return BinaryPrimitives.readShort(tmpBuf, 0);
- }
-
- /**
- * Gets total bytes read from the input stream.
- */
- public long totalBytesRead() {
- return totalBytesRead;
- }
-
- /**
- * Close input stream.
- */
- @Override public void close() throws IOException {
- in.close();
- }
- }
-
- /**
*
*/
- private static class ClientRequestFuture extends GridFutureAdapter<byte[]> {
- }
-
- /** SSL Socket Factory. */
- private static class ClientSslSocketFactory {
- /** Trust manager ignoring all certificate checks. */
- private static final TrustManager ignoreErrorsTrustMgr = new X509TrustManager() {
- @Override public X509Certificate[] getAcceptedIssuers() {
- return null;
- }
-
- @Override public void checkServerTrusted(X509Certificate[] arg0, String arg1) {
- }
-
- @Override public void checkClientTrusted(X509Certificate[] arg0, String arg1) {
- }
- };
-
- /** Config. */
- private final ClientChannelConfiguration cfg;
-
- /** Constructor. */
- ClientSslSocketFactory(ClientChannelConfiguration cfg) {
- this.cfg = cfg;
- }
-
- /** Create SSL socket. */
- SSLSocket create() throws IOException {
- InetSocketAddress addr = cfg.getAddress();
-
- SSLSocket sock = (SSLSocket)getSslSocketFactory(cfg).createSocket(addr.getHostName(), addr.getPort());
-
- sock.setUseClientMode(true);
-
- sock.startHandshake();
-
- return sock;
- }
-
- /** Create SSL socket factory. */
- private static SSLSocketFactory getSslSocketFactory(ClientChannelConfiguration cfg) {
- Factory<SSLContext> sslCtxFactory = cfg.getSslContextFactory();
-
- if (sslCtxFactory != null) {
- try {
- return sslCtxFactory.create().getSocketFactory();
- }
- catch (Exception e) {
- throw new ClientError("SSL Context Factory failed", e);
- }
- }
-
- BiFunction<String, String, String> or = (val, dflt) -> val == null || val.isEmpty() ? dflt : val;
-
- String keyStore = or.apply(
- cfg.getSslClientCertificateKeyStorePath(),
- System.getProperty("javax.net.ssl.keyStore")
- );
-
- String keyStoreType = or.apply(
- cfg.getSslClientCertificateKeyStoreType(),
- or.apply(System.getProperty("javax.net.ssl.keyStoreType"), DFLT_STORE_TYPE)
- );
-
- String keyStorePwd = or.apply(
- cfg.getSslClientCertificateKeyStorePassword(),
- System.getProperty("javax.net.ssl.keyStorePassword")
- );
-
- String trustStore = or.apply(
- cfg.getSslTrustCertificateKeyStorePath(),
- System.getProperty("javax.net.ssl.trustStore")
- );
-
- String trustStoreType = or.apply(
- cfg.getSslTrustCertificateKeyStoreType(),
- or.apply(System.getProperty("javax.net.ssl.trustStoreType"), DFLT_STORE_TYPE)
- );
-
- String trustStorePwd = or.apply(
- cfg.getSslTrustCertificateKeyStorePassword(),
- System.getProperty("javax.net.ssl.trustStorePassword")
- );
-
- String algorithm = or.apply(cfg.getSslKeyAlgorithm(), DFLT_KEY_ALGORITHM);
-
- String proto = toString(cfg.getSslProtocol());
-
- if (Stream.of(keyStore, keyStorePwd, keyStoreType, trustStore, trustStorePwd, trustStoreType)
- .allMatch(s -> s == null || s.isEmpty())
- ) {
- try {
- return SSLContext.getDefault().getSocketFactory();
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError("Default SSL context cryptographic algorithm is not available", e);
- }
- }
-
- KeyManager[] keyManagers = getKeyManagers(algorithm, keyStore, keyStoreType, keyStorePwd);
-
- TrustManager[] trustManagers = cfg.isSslTrustAll() ?
- new TrustManager[] {ignoreErrorsTrustMgr} :
- getTrustManagers(algorithm, trustStore, trustStoreType, trustStorePwd);
-
- try {
- SSLContext sslCtx = SSLContext.getInstance(proto);
-
- sslCtx.init(keyManagers, trustManagers, null);
-
- return sslCtx.getSocketFactory();
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError("SSL context cryptographic algorithm is not available", e);
- }
- catch (KeyManagementException e) {
- throw new ClientError("Failed to create SSL Context", e);
- }
- }
-
- /**
- * @return String representation of {@link SslProtocol} as required by {@link SSLContext}.
- */
- private static String toString(SslProtocol proto) {
- switch (proto) {
- case TLSv1_1:
- return "TLSv1.1";
-
- case TLSv1_2:
- return "TLSv1.2";
-
- default:
- return proto.toString();
- }
- }
-
- /** */
- private static KeyManager[] getKeyManagers(
- String algorithm,
- String keyStore,
- String keyStoreType,
- String keyStorePwd
- ) {
- KeyManagerFactory keyMgrFactory;
-
- try {
- keyMgrFactory = KeyManagerFactory.getInstance(algorithm);
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError("Key manager cryptographic algorithm is not available", e);
- }
-
- Predicate<String> empty = s -> s == null || s.isEmpty();
-
- if (!empty.test(keyStore) && !empty.test(keyStoreType)) {
- char[] pwd = (keyStorePwd == null) ? new char[0] : keyStorePwd.toCharArray();
-
- KeyStore store = loadKeyStore("Client", keyStore, keyStoreType, pwd);
-
- try {
- keyMgrFactory.init(store, pwd);
- }
- catch (UnrecoverableKeyException e) {
- throw new ClientError("Could not recover key store key", e);
- }
- catch (KeyStoreException e) {
- throw new ClientError(
- String.format("Client key store provider of type [%s] is not available", keyStoreType),
- e
- );
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError("Client key store integrity check algorithm is not available", e);
- }
- }
-
- return keyMgrFactory.getKeyManagers();
- }
-
- /** */
- private static TrustManager[] getTrustManagers(
- String algorithm,
- String trustStore,
- String trustStoreType,
- String trustStorePwd
- ) {
- TrustManagerFactory trustMgrFactory;
-
- try {
- trustMgrFactory = TrustManagerFactory.getInstance(algorithm);
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError("Trust manager cryptographic algorithm is not available", e);
- }
-
- Predicate<String> empty = s -> s == null || s.isEmpty();
-
- if (!empty.test(trustStore) && !empty.test(trustStoreType)) {
- char[] pwd = (trustStorePwd == null) ? new char[0] : trustStorePwd.toCharArray();
-
- KeyStore store = loadKeyStore("Trust", trustStore, trustStoreType, pwd);
-
- try {
- trustMgrFactory.init(store);
- }
- catch (KeyStoreException e) {
- throw new ClientError(
- String.format("Trust key store provider of type [%s] is not available", trustStoreType),
- e
- );
- }
- }
-
- return trustMgrFactory.getTrustManagers();
- }
-
- /** */
- private static KeyStore loadKeyStore(String lb, String path, String type, char[] pwd) {
- KeyStore store;
-
- try {
- store = KeyStore.getInstance(type);
- }
- catch (KeyStoreException e) {
- throw new ClientError(
- String.format("%s key store provider of type [%s] is not available", lb, type),
- e
- );
- }
-
- try (InputStream in = new FileInputStream(new File(path))) {
-
- store.load(in, pwd);
-
- return store;
- }
- catch (FileNotFoundException e) {
- throw new ClientError(String.format("%s key store file [%s] does not exist", lb, path), e);
- }
- catch (NoSuchAlgorithmException e) {
- throw new ClientError(
- String.format("%s key store integrity check algorithm is not available", lb),
- e
- );
- }
- catch (CertificateException e) {
- throw new ClientError(String.format("Could not load certificate from %s key store", lb), e);
- }
- catch (IOException e) {
- throw new ClientError(String.format("Could not read %s key store", lb), e);
- }
- }
+ private static class ClientRequestFuture extends GridFutureAdapter<ByteBuffer> {
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 9cea6a4..c67184a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -24,8 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
-import java.util.function.Function;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryObjectException;
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.MarshallerContext;
@@ -101,8 +102,8 @@ public class TcpIgniteClient implements IgniteClient {
* Constructor with custom channel factory.
*/
TcpIgniteClient(
- Function<ClientChannelConfiguration, ClientChannel> chFactory,
- ClientConfiguration cfg
+ BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
+ ClientConfiguration cfg
) throws ClientException {
final ClientBinaryMetadataHandler metadataHandler = new ClientBinaryMetadataHandler();
@@ -116,18 +117,24 @@ public class TcpIgniteClient implements IgniteClient {
ch = new ReliableChannel(chFactory, cfg, binary);
- ch.channelsInit();
+ try {
+ ch.channelsInit();
- ch.addChannelFailListener(() -> metadataHandler.onReconnect());
+ ch.addChannelFailListener(() -> metadataHandler.onReconnect());
- transactions = new TcpClientTransactions(ch, marsh,
- new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
+ transactions = new TcpClientTransactions(ch, marsh,
+ new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
- cluster = new ClientClusterImpl(ch, marsh);
+ cluster = new ClientClusterImpl(ch, marsh);
- compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
+ compute = new ClientComputeImpl(ch, marsh, cluster.defaultClusterGroup());
- services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+ services = new ClientServicesImpl(ch, marsh, cluster.defaultClusterGroup());
+ }
+ catch (Exception e) {
+ ch.close();
+ throw e;
+ }
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
index ae1b7fa..eed90b6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnection.java
@@ -15,20 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteCheckedException;
/**
- * Server to client notification listener.
+ * Client connection: abstracts away sending and receiving messages.
*/
-interface NotificationListener {
+public interface ClientConnection extends AutoCloseable {
/**
- * Accept notification.
+ * Sends a message.
*
- * @param ch Client channel which was notified.
- * @param op Client operation.
- * @param rsrcId Resource id.
- * @param payload Notification payload or {@code null} if there is no payload.
- * @param err Error.
+ * @param msg Message buffer.
+ */
+ void send(ByteBuffer msg) throws IgniteCheckedException;
+
+ /**
+ * Closes the connection.
*/
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
+ @Override void close();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java
new file mode 100644
index 0000000..891e2b3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionMultiplexer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.net.InetSocketAddress;
+
+import org.apache.ignite.client.ClientConnectionException;
+
+/**
+ * Client connection multiplexer: manages multiple connections with a shared resource pool (worker threads, etc).
+ */
+public interface ClientConnectionMultiplexer {
+ /**
+ * Initializes this instance.
+ */
+ void start();
+
+ /**
+ * Stops this instance.
+ */
+ void stop();
+
+ /**
+ * Opens a new connection to the given address.
+ *
+ * @param addr Address to connect to.
+ * @param msgHnd Incoming message handler for the created connection.
+ * @param stateHnd Connection state handler for the created connection (see {@link ClientConnectionStateHandler}).
+ * @return Created connection.
+ * @throws ClientConnectionException when connection can't be established.
+ */
+ ClientConnection open(
+ InetSocketAddress addr,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd)
+ throws ClientConnectionException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
index ae1b7fa..3f9481e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientConnectionStateHandler.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import org.jetbrains.annotations.Nullable;
/**
- * Server to client notification listener.
+ * Handles thin client connection state.
*/
-interface NotificationListener {
+public interface ClientConnectionStateHandler {
/**
- * Accept notification.
- *
- * @param ch Client channel which was notified.
- * @param op Client operation.
- * @param rsrcId Resource id.
- * @param payload Notification payload or {@code null} if there is no payload.
- * @param err Error.
+ * Handles connection loss.
+ * @param e Exception that caused the disconnect, can be null.
*/
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
+ void onDisconnected(@Nullable Exception e);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
new file mode 100644
index 0000000..06ab441
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageDecoder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Stateful decoder that assembles thin client messages from a sequence of partial buffers.
+ * Each message is read as a 4-byte little-endian length header followed by that many payload bytes.
+ */
+public class ClientMessageDecoder {
+ /** Payload of the message being assembled; allocated once the length header has been fully read. */
+ private byte[] data;
+
+ /** Read progress: negative while header bytes are still missing (starts at -4), otherwise payload bytes received so far. */
+ private int cnt = -4;
+
+ /** Payload size from the length header; accumulated byte by byte while the header is being read. */
+ private int msgSize;
+
+ /**
+ * Applies the next partial buffer. Consumes at most one message worth of bytes; bytes past the end of the
+ * current message are left unread in the buffer.
+ *
+ * @param buf Buffer.
+ * @return Decoded message, or null when not yet complete.
+ */
+ public byte[] apply(ByteBuffer buf) {
+ boolean msgReady = read(buf);
+
+ return msgReady ? data : null;
+ }
+
+ /**
+ * Reads the buffer: first the header bytes that are still missing (if any), then as much of the payload
+ * as the buffer currently holds.
+ *
+ * @param buf Buffer.
+ * @return True when a complete message has been received; false otherwise.
+ */
+ @SuppressWarnings("DuplicatedCode") // A little duplication is better than a little dependency.
+ private boolean read(ByteBuffer buf) {
+ if (cnt < 0) {
+ for (; cnt < 0 && buf.hasRemaining(); cnt++)
+ msgSize |= (buf.get() & 0xFF) << (8 * (4 + cnt)); // Little-endian: the first header byte is the lowest 8 bits.
+
+ if (cnt < 0)
+ return false; // Header is still incomplete.
+
+ data = new byte[msgSize]; // NOTE(review): msgSize is taken from the received header and is not validated before this allocation.
+ }
+
+ assert data != null;
+ assert cnt >= 0;
+ assert msgSize > 0;
+
+ int remaining = buf.remaining();
+
+ if (remaining > 0) {
+ int missing = msgSize - cnt;
+
+ if (missing > 0) {
+ int len = Math.min(missing, remaining);
+
+ buf.get(data, cnt, len);
+
+ cnt += len;
+ }
+ }
+
+ if (cnt == msgSize) {
+ cnt = -4; // Reset state so the next call starts with a new header.
+ msgSize = 0;
+
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
similarity index 62%
copy from modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
copy to modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
index ae1b7fa..a52859f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/NotificationListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/ClientMessageHandler.java
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client.thin;
+package org.apache.ignite.internal.client.thin.io;
+
+import java.nio.ByteBuffer;
/**
- * Server to client notification listener.
+ * Handles thin client responses and server -> client notifications.
*/
-interface NotificationListener {
+public interface ClientMessageHandler {
/**
- * Accept notification.
- *
- * @param ch Client channel which was notified.
- * @param op Client operation.
- * @param rsrcId Resource id.
- * @param payload Notification payload or {@code null} if there is no payload.
- * @param err Error.
+ * Handles messages from the server.
+ * @param buf Buffer.
*/
- public void acceptNotification(ClientChannel ch, ClientOperation op, long rsrcId, byte[] payload, Exception err);
+ void onMessage(ByteBuffer buf);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
new file mode 100644
index 0000000..e81d6f4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnection.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+
+/**
+ * Client connection backed by a {@link GridNioSession}.
+ */
+class GridNioClientConnection implements ClientConnection {
+ /** Session meta key under which a connection registers itself in its session (see constructor). */
+ static final int SES_META_CONN = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Underlying NIO session used for sending and closing. */
+ private final GridNioSession ses;
+
+ /** Handler that receives incoming messages. */
+ private final ClientMessageHandler msgHnd;
+
+ /** Handler that is notified about disconnects. */
+ private final ClientConnectionStateHandler stateHnd;
+
+ /**
+ * Ctor. Stores this connection in the session meta under {@link #SES_META_CONN}.
+ *
+ * @param ses Session.
+ * @param msgHnd Incoming message handler.
+ * @param stateHnd Connection state handler.
+ */
+ public GridNioClientConnection(GridNioSession ses,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd) {
+ assert ses != null;
+ assert msgHnd != null;
+ assert stateHnd != null;
+
+ this.ses = ses;
+ this.msgHnd = msgHnd;
+ this.stateHnd = stateHnd;
+
+ ses.addMeta(SES_META_CONN, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void send(ByteBuffer msg) throws IgniteCheckedException {
+ ses.sendNoFuture(msg, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ ses.close();
+ }
+
+ /**
+ * Handles incoming message by passing it to the message handler.
+ *
+ * @param msg Message.
+ */
+ void onMessage(ByteBuffer msg) {
+ assert msg != null;
+
+ msgHnd.onMessage(msg);
+ }
+
+ /**
+ * Handles disconnect by passing it to the state handler.
+ *
+ * @param e Exception that caused the disconnect.
+ */
+ void onDisconnected(Exception e) {
+ stateHnd.onDisconnected(e);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
new file mode 100644
index 0000000..74a7025
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientConnectionMultiplexer.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLContext;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.client.ClientConnectionException;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.ClientSslUtils;
+import org.apache.ignite.internal.client.thin.io.ClientConnection;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionStateHandler;
+import org.apache.ignite.internal.client.thin.io.ClientMessageHandler;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioFuture;
+import org.apache.ignite.internal.util.nio.GridNioFutureImpl;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.ssl.GridNioSslFilter;
+import org.apache.ignite.logger.NullLogger;
+
+/**
+ * Client connection multiplexer based on {@link org.apache.ignite.internal.util.nio.GridNioServer}.
+ */
+public class GridNioClientConnectionMultiplexer implements ClientConnectionMultiplexer {
+ /** Worker thread prefix; passed to the NIO server builder as the server name. */
+ private static final String THREAD_PREFIX = "thin-client-channel";
+
+ /** Port value passed to the NIO server builder; presumably -1 means no listening socket (client mode) - TODO confirm. */
+ private static final int CLIENT_MODE_PORT = -1;
+
+ /** NIO server through which all sessions of this multiplexer are created. */
+ private final GridNioServer<ByteBuffer> srv;
+
+ /** SSL context obtained from the client configuration; when {@code null}, no SSL filter is installed. */
+ private final SSLContext sslCtx;
+
+ /**
+ * Constructor. Builds the filter chain (codec filter, plus an SSL filter when an SSL context is available)
+ * and the NIO server. The server is not started here, see {@link #start()}.
+ *
+ * @param cfg Client config; supplies the SSL settings and the socket buffer sizes.
+ */
+ public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {
+ IgniteLogger gridLog = new NullLogger();
+
+ GridNioFilter[] filters;
+
+ GridNioFilter codecFilter = new GridNioCodecFilter(new GridNioClientParser(), gridLog, false);
+
+ sslCtx = ClientSslUtils.getSslContext(cfg);
+
+ if (sslCtx != null) {
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, true, ByteOrder.nativeOrder(), gridLog);
+ sslFilter.directMode(false);
+ filters = new GridNioFilter[] {codecFilter, sslFilter};
+ }
+ else
+ filters = new GridNioFilter[] {codecFilter};
+
+ try {
+ srv = GridNioServer.<ByteBuffer>builder()
+ .port(CLIENT_MODE_PORT)
+ .listener(new GridNioClientListener())
+ .filters(filters)
+ .logger(gridLog)
+ .selectorCount(1) // Using more selectors does not seem to improve performance.
+ .byteOrder(ByteOrder.nativeOrder())
+ .directBuffer(true)
+ .directMode(false)
+ .igniteInstanceName("thinClient")
+ .serverName(THREAD_PREFIX)
+ .idleTimeout(Long.MAX_VALUE)
+ .socketReceiveBufferSize(cfg.getReceiveBufferSize())
+ .socketSendBufferSize(cfg.getSendBufferSize())
+ .tcpNoDelay(true)
+ .build();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() {
+ srv.start();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ srv.stop();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Any exception raised while connecting, creating the session or waiting for the SSL handshake is rethrown
+ * as {@link ClientConnectionException} with the original exception as the cause.
+ * <p>
+ * NOTE(review): {@code ch} is never explicitly closed on the failure paths below - confirm that the socket
+ * channel cannot leak when connecting or session creation throws.
+ */
+ @Override public ClientConnection open(InetSocketAddress addr,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd)
+ throws ClientConnectionException {
+ try {
+ SocketChannel ch = SocketChannel.open();
+ ch.socket().connect(new InetSocketAddress(addr.getHostName(), addr.getPort()), Integer.MAX_VALUE); // Connect timeout is Integer.MAX_VALUE ms.
+
+ Map<Integer, Object> meta = new HashMap<>();
+ GridNioFuture<?> sslHandshakeFut = null;
+
+ if (sslCtx != null) {
+ sslHandshakeFut = new GridNioFutureImpl<>(null);
+
+ meta.put(GridNioSslFilter.HANDSHAKE_FUT_META_KEY, sslHandshakeFut);
+ }
+
+ GridNioSession ses = srv.createSession(ch, meta, false, null).get();
+
+ if (sslHandshakeFut != null)
+ sslHandshakeFut.get(); // Wait on the handshake future registered in the session meta above.
+
+ return new GridNioClientConnection(ses, msgHnd, stateHnd);
+ }
+ catch (Exception e) {
+ throw new ClientConnectionException(e.getMessage(), e);
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java
new file mode 100644
index 0000000..f33835d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientListener.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.util.nio.GridNioServerListener;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client event listener: forwards incoming messages and disconnects to the {@link GridNioClientConnection}
+ * stored in the session meta; all other events are ignored.
+ */
+class GridNioClientListener implements GridNioServerListener<ByteBuffer> {
+ /** {@inheritDoc} */
+ @Override public void onConnected(GridNioSession ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ GridNioClientConnection conn = ses.meta(GridNioClientConnection.SES_META_CONN);
+
+ // Conn can be null when connection fails during initialization in open method.
+ if (conn != null)
+ conn.onDisconnected(e);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageSent(GridNioSession ses, ByteBuffer msg) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(GridNioSession ses, ByteBuffer msg) {
+ GridNioClientConnection conn = ses.meta(GridNioClientConnection.SES_META_CONN);
+
+ assert conn != null : "Session must have an associated connection";
+
+ conn.onMessage(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionWriteTimeout(GridNioSession ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionIdleTimeout(GridNioSession ses) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+ // No-op.
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
new file mode 100644
index 0000000..439c78a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/io/gridnioserver/GridNioClientParser.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin.io.gridnioserver;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import org.apache.ignite.internal.client.thin.io.ClientMessageDecoder;
+import org.apache.ignite.internal.util.nio.GridNioParser;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client message parser: decodes incoming bytes with a per-session {@link ClientMessageDecoder} and passes
+ * outgoing buffers through unchanged.
+ */
+class GridNioClientParser implements GridNioParser {
+ /** Session meta key under which the session's {@link ClientMessageDecoder} is stored. */
+ private static final int SES_META_DECODER = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Object decode(GridNioSession ses, ByteBuffer buf) {
+ ClientMessageDecoder decoder = ses.meta(SES_META_DECODER);
+
+ if (decoder == null) { // No decoder stored for this session yet: create one and keep it in the session meta.
+ decoder = new ClientMessageDecoder();
+
+ ses.addMeta(SES_META_DECODER, decoder);
+ }
+
+ byte[] bytes = decoder.apply(buf);
+
+ if (bytes == null)
+ return null; // Message is not yet completely received.
+
+ // Thin client protocol is little-endian. ByteBuffer will handle conversion as necessary on big-endian systems.
+ return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) {
+ return (ByteBuffer)msg; // Outgoing messages are expected to be ByteBuffers already; returned as is.
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
index bbb2c87..2d75d5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ConnectToStartingNodeTest.java
@@ -71,14 +71,20 @@ public class ConnectToStartingNodeTest extends AbstractThinClientTest {
IgniteInternalFuture<IgniteClient> futStartClient = GridTestUtils.runAsync(
() -> startClient(grid()));
- // Server doesn't accept connection before discovery SPI started.
- assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+ try {
+ // Server doesn't accept connection before discovery SPI started.
+ assertFalse(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
- barrier.await();
+ barrier.await();
- futStartGrid.get();
+ futStartGrid.get();
- // Server accept connection after discovery SPI started.
- assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+ // Server accept connection after discovery SPI started.
+ assertTrue(GridTestUtils.waitForCondition(futStartClient::isDone, 500L));
+ }
+ finally {
+ if (futStartClient.isDone())
+ futStartClient.get().close();
+ }
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
index 0f0791b..c6def06 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/SslParametersTest.java
@@ -288,7 +288,7 @@ public class SslParametersTest extends GridCommonAbstractTest {
cipherSuites,
protocols,
ClientConnectionException.class,
- "Ignite cluster is unavailable"
+ "SSL handshake failed"
);
}
@@ -307,7 +307,7 @@ public class SslParametersTest extends GridCommonAbstractTest {
this.cipherSuites = F.isEmpty(cipherSuites) ? null : cipherSuites;
this.protocols = F.isEmpty(protocols) ? null : protocols;
- GridTestUtils.assertThrows(
+ GridTestUtils.assertThrowsAnyCause(
null,
new Callable<Object>() {
@Override public Object call() {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
index 61adf66..686a193 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ReliableChannelTest.java
@@ -26,6 +26,7 @@ import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -35,6 +36,7 @@ import org.apache.ignite.client.ClientAuthorizationException;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.testframework.GridTestUtils;
@@ -51,7 +53,8 @@ import static org.mockito.Mockito.mock;
*/
public class ReliableChannelTest {
/** Mock factory for creating new channels. */
- private final Function<ClientChannelConfiguration, ClientChannel> chFactory = cfg -> new TestClientChannel();
+ private final BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory =
+ (cfg, hnd) -> new TestClientChannel();
/** */
private final String[] dfltAddrs = new String[]{"127.0.0.1:10800", "127.0.0.1:10801", "127.0.0.1:10802"};
@@ -259,7 +262,7 @@ public class ReliableChannelTest {
.setAddresses(dfltAddrs)
.setPartitionAwarenessEnabled(true);
- ReliableChannel rc = new ReliableChannel(cfg -> new TestFailureClientChannel(), ccfg, null);
+ ReliableChannel rc = new ReliableChannel((cfg, hnd) -> new TestFailureClientChannel(), ccfg, null);
rc.channelsInit();
}
@@ -302,7 +305,7 @@ public class ReliableChannelTest {
// Emulate cluster is down after TcpClientChannel#send operation.
AtomicInteger step = new AtomicInteger();
- ReliableChannel rc = new ReliableChannel(cfg -> {
+ ReliableChannel rc = new ReliableChannel((cfg, hnd) -> {
if (step.getAndIncrement() == 0)
return new TestAsyncServiceFailureClientChannel();
else
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
index dd716d6..7eda71f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientAbstractPartitionAwarenessTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.client.thin.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -185,11 +186,11 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
* @param chIdxs Channels to wait for initialization.
*/
protected void initClient(ClientConfiguration clientCfg, int... chIdxs) throws IgniteInterruptedCheckedException {
- client = new TcpIgniteClient(cfg -> {
+ client = new TcpIgniteClient((cfg, hnd) -> {
try {
log.info("Establishing connection to " + cfg.getAddress());
- TcpClientChannel ch = new TestTcpClientChannel(cfg);
+ TcpClientChannel ch = new TestTcpClientChannel(cfg, hnd);
log.info("Channel initialized: " + ch);
@@ -323,8 +324,8 @@ public abstract class ThinClientAbstractPartitionAwarenessTest extends GridCommo
/**
* @param cfg Config.
*/
- public TestTcpClientChannel(ClientChannelConfiguration cfg) {
- super(cfg);
+ public TestTcpClientChannel(ClientChannelConfiguration cfg, ClientConnectionMultiplexer hnd) {
+ super(cfg, hnd);
this.cfg = cfg;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
index 7dc6222..2909c4e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/ThinClientPartitionAwarenessResourceReleaseTest.java
@@ -23,13 +23,13 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
-import static org.apache.ignite.internal.client.thin.ReliableChannel.ASYNC_RUNNER_THREAD_NAME;
-import static org.apache.ignite.internal.client.thin.TcpClientChannel.RECEIVER_THREAD_PREFIX;
-
/**
* Test resource releasing by thin client.
*/
public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientAbstractPartitionAwarenessTest {
+ /** Worker thread prefix. */
+ private static final String THREAD_PREFIX = "thin-client-channel";
+
/**
* Test that resources are correctly released after closing client with partition awareness.
*/
@@ -46,15 +46,13 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
assertFalse(channels[0].isClosed());
assertFalse(channels[1].isClosed());
- assertEquals(1, threadsCount(ASYNC_RUNNER_THREAD_NAME));
- assertEquals(2, threadsCount(RECEIVER_THREAD_PREFIX));
+ assertEquals(1, threadsCount(THREAD_PREFIX));
client.close();
assertTrue(channels[0].isClosed());
assertTrue(channels[1].isClosed());
- assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(ASYNC_RUNNER_THREAD_NAME) == 0, 1_000L));
- assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(RECEIVER_THREAD_PREFIX) == 0, 1_000L));
+ assertTrue(GridTestUtils.waitForCondition(() -> threadsCount(THREAD_PREFIX) == 0, 1_000L));
}
/**
@@ -68,7 +66,7 @@ public class ThinClientPartitionAwarenessResourceReleaseTest extends ThinClientA
for (long id : threadIds) {
ThreadInfo info = U.getThreadMx().getThreadInfo(id);
- if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().startsWith(name))
+ if (info != null && info.getThreadState() != Thread.State.TERMINATED && info.getThreadName().contains(name))
cnt++;
}