You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by al...@apache.org on 2021/07/29 09:19:00 UTC
[ignite] branch master updated: IGNITE-15138 Java thin client: Fix
issue with explicit binary type configuration - Fixes #9278.
This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cc9162b IGNITE-15138 Java thin client: Fix issue with explicit binary type configuration - Fixes #9278.
cc9162b is described below
commit cc9162b08c75dca012693904776f2c9f611bf9be
Author: Aleksey Plekhanov <pl...@gmail.com>
AuthorDate: Thu Jul 29 12:12:30 2021 +0300
IGNITE-15138 Java thin client: Fix issue with explicit binary type configuration - Fixes #9278.
Signed-off-by: Aleksey Plekhanov <pl...@gmail.com>
---
.../internal/client/thin/TcpIgniteClient.java | 38 +++++++++--
.../org/apache/ignite/client/IgniteBinaryTest.java | 78 ++++++++++++++++++++++
.../test/java/org/apache/ignite/client/Person.java | 14 +++-
3 files changed, 121 insertions(+), 9 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
index 617948d..a3b4403 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpIgniteClient.java
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -108,9 +109,9 @@ public class TcpIgniteClient implements IgniteClient {
BiFunction<ClientChannelConfiguration, ClientConnectionMultiplexer, ClientChannel> chFactory,
ClientConfiguration cfg
) throws ClientException {
- final ClientBinaryMetadataHandler metadataHandler = new ClientBinaryMetadataHandler();
+ final ClientBinaryMetadataHandler metadataHnd = new ClientBinaryMetadataHandler();
- marsh = new ClientBinaryMarshaller(metadataHandler, new ClientMarshallerContext());
+ marsh = new ClientBinaryMarshaller(metadataHnd, new ClientMarshallerContext());
marsh.setBinaryConfiguration(cfg.getBinaryConfiguration());
@@ -123,7 +124,10 @@ public class TcpIgniteClient implements IgniteClient {
try {
ch.channelsInit();
- ch.addChannelFailListener(() -> metadataHandler.onReconnect());
+ ch.addChannelFailListener(metadataHnd::onReconnect);
+
+ // Send postponed metadata after channel init.
+ metadataHnd.sendAllMeta();
transactions = new TcpClientTransactions(ch, marsh,
new ClientTransactionConfiguration(cfg.getTransactionConfiguration()));
@@ -377,10 +381,12 @@ public class TcpIgniteClient implements IgniteClient {
// If type wasn't registered before or metadata changed, send registration request.
if (oldType == null || BinaryUtils.mergeMetadata(oldMeta, newMeta) != oldMeta) {
try {
- ch.request(
- ClientOperation.PUT_BINARY_TYPE,
- req -> serDes.binaryMetadata(newMeta, req.out())
- );
+ if (ch != null) { // Postpone binary type registration requests to server before channels initiated.
+ ch.request(
+ ClientOperation.PUT_BINARY_TYPE,
+ req -> serDes.binaryMetadata(newMeta, req.out())
+ );
+ }
}
catch (ClientException e) {
throw new BinaryObjectException(e);
@@ -390,6 +396,24 @@ public class TcpIgniteClient implements IgniteClient {
cache.addMeta(typeId, meta, failIfUnregistered); // merge
}
+ /** Send registration requests to the server for all collected metadata. */
+ public void sendAllMeta() {
+ try {
+ CompletableFuture.allOf(cache.metadata().stream()
+ .map(type -> sendMetaAsync(((BinaryTypeImpl)type).metadata()).toCompletableFuture())
+ .toArray(CompletableFuture[]::new)
+ ).get();
+ }
+ catch (Exception e) {
+ throw new ClientException(e);
+ }
+ }
+
+ /** Send metadata registration request to the server. */
+ private IgniteClientFuture<Void> sendMetaAsync(BinaryMetadata meta) {
+ return ch.requestAsync(ClientOperation.PUT_BINARY_TYPE, req -> serDes.binaryMetadata(meta, req.out()));
+ }
+
/** {@inheritDoc} */
@Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered)
throws BinaryObjectException {
diff --git a/modules/core/src/test/java/org/apache/ignite/client/IgniteBinaryTest.java b/modules/core/src/test/java/org/apache/ignite/client/IgniteBinaryTest.java
index 370e889..dc62e28 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/IgniteBinaryTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/IgniteBinaryTest.java
@@ -19,15 +19,22 @@ package org.apache.ignite.client;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
+import org.apache.ignite.binary.BinaryIdMapper;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinarySerializer;
import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
+import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
@@ -39,6 +46,7 @@ import org.junit.rules.Timeout;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
/**
* Ignite {@link BinaryObject} API system tests.
@@ -188,6 +196,76 @@ public class IgniteBinaryTest {
}
/**
+ * Test custom binary type serializer.
+ */
+ @Test
+ public void testBinarySerializer() throws Exception {
+ BinarySerializer binSer = new BinarySerializer() {
+ @Override public void writeBinary(Object obj, BinaryWriter writer) throws BinaryObjectException {
+ writer.writeInt("f1", ((Person)obj).getId());
+ }
+
+ @Override public void readBinary(Object obj, BinaryReader reader) throws BinaryObjectException {
+ ((Person)obj).setId(reader.readInt("f1"));
+ }
+ };
+
+ BinaryTypeConfiguration typeCfg = new BinaryTypeConfiguration(Person.class.getName()).setSerializer(binSer);
+ BinaryConfiguration binCfg = new BinaryConfiguration().setTypeConfigurations(Collections.singleton(typeCfg));
+
+ try (Ignite ignite = Ignition.start(Config.getServerConfiguration().setBinaryConfiguration(binCfg))) {
+ try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER)
+ .setBinaryConfiguration(binCfg))) {
+ IgniteCache<Integer, Person> igniteCache = ignite.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
+ ClientCache<Integer, Person> clientCache = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
+
+ Person val = new Person(123, "Joe");
+
+ clientCache.put(1, val);
+
+ assertEquals(val.getId(), clientCache.get(1).getId());
+ assertNull(clientCache.get(1).getName());
+ assertEquals(val.getId(), igniteCache.get(1).getId());
+ assertNull(igniteCache.get(1).getName());
+ }
+ }
+ }
+
+ /**
+ * Test custom binary type ID mapper.
+ */
+ @Test
+ public void testBinaryIdMapper() throws Exception {
+ BinaryIdMapper idMapper = new BinaryIdMapper() {
+ @Override public int typeId(String typeName) {
+ return typeName.hashCode() % 1000 + 1000;
+ }
+
+ @Override public int fieldId(int typeId, String fieldName) {
+ return fieldName.hashCode();
+ }
+ };
+
+ BinaryTypeConfiguration typeCfg = new BinaryTypeConfiguration(Person.class.getName()).setIdMapper(idMapper);
+ BinaryConfiguration binCfg = new BinaryConfiguration().setTypeConfigurations(Collections.singleton(typeCfg));
+
+ try (Ignite ignite = Ignition.start(Config.getServerConfiguration().setBinaryConfiguration(binCfg))) {
+ try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER)
+ .setBinaryConfiguration(binCfg))) {
+ IgniteCache<Integer, Person> igniteCache = ignite.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
+ ClientCache<Integer, Person> clientCache = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);
+
+ Person val = new Person(123, "Joe");
+
+ clientCache.put(1, val);
+
+ assertEquals(val, clientCache.get(1));
+ assertEquals(val, igniteCache.get(1));
+ }
+ }
+ }
+
+ /**
* Binary Object API:
* {@link IgniteBinary#typeId(String)}
* {@link IgniteBinary#toBinary(Object)}
diff --git a/modules/core/src/test/java/org/apache/ignite/client/Person.java b/modules/core/src/test/java/org/apache/ignite/client/Person.java
index 27510a7..5e29ec2 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/Person.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/Person.java
@@ -27,11 +27,11 @@ import org.apache.ignite.internal.util.typedef.internal.S;
public class Person {
/** Id. */
@QuerySqlField(index = true)
- private final Integer id;
+ private Integer id;
/** Name. */
@QuerySqlField
- private final String name;
+ private String name;
/** Constructor. */
public Person(Integer id, String name) {
@@ -49,6 +49,16 @@ public class Person {
return name;
}
+ /** */
+ public void setId(Integer id) {
+ this.id = id;
+ }
+
+ /** */
+ public void setName(String name) {
+ this.name = name;
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(id, name);