You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2022/02/01 08:35:15 UTC
[ignite-3] branch main updated: IGNITE-14698 Improved configuration values serialization. (#605)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 40905cf IGNITE-14698 Improved configuration values serialization. (#605)
40905cf is described below
commit 40905cf17736e4d6270659620860bebc86865d87
Author: ibessonov <be...@gmail.com>
AuthorDate: Tue Feb 1 11:35:08 2022 +0300
IGNITE-14698 Improved configuration values serialization. (#605)
---
.../util/ConfigurationSerializationUtil.java | 426 +++++++++++++++++++++
.../util/ConfigurationSerializationUtilTest.java | 87 +++++
.../storage/DistributedConfigurationStorage.java | 13 +-
.../storage/LocalConfigurationStorage.java | 8 +-
4 files changed, 523 insertions(+), 11 deletions(-)
diff --git a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtil.java b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtil.java
new file mode 100644
index 0000000..dcc2d61
--- /dev/null
+++ b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtil.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.util;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utility class to help serializing deserializing configuration values - primitives, strings, or arrays of primitives or strings.
+ */
+public class ConfigurationSerializationUtil {
+ private static final byte BOOLEAN = 1;
+
+ private static final byte BYTE = 2;
+
+ private static final byte SHORT = 3;
+
+ private static final byte INT = 4;
+
+ private static final byte LONG = 5;
+
+ private static final byte CHAR = 6;
+
+ private static final byte FLOAT = 7;
+
+ private static final byte DOUBLE = 8;
+
+ private static final byte STRING = 9;
+
+ private static final byte ARRAY = (byte) 0x80;
+
+ private static final String[] EMPTY_STRING_ARRAY = new String[0];
+
+ /**
+ * Converts a configuration value into a byte array.
+ *
+ * @param value Configuration values.
+ * @return Serialized value.
+ */
+ public static byte[] toBytes(Object value) {
+ Objects.requireNonNull(value);
+
+ byte header = header(value.getClass());
+
+ switch (header) {
+ case BOOLEAN:
+ return new byte[]{header, (byte) ((boolean) value ? 1 : 0)};
+
+ case BYTE:
+ return new byte[]{header, (byte) value};
+
+ case SHORT:
+ return allocateBuffer(Short.BYTES + 1).put(header).putShort((short) value).array();
+
+ case INT:
+ return allocateBuffer(Integer.BYTES + 1).put(header).putInt((int) value).array();
+
+ case LONG:
+ return allocateBuffer(Long.BYTES + 1).put(header).putLong((long) value).array();
+
+ case CHAR:
+ return allocateBuffer(Character.BYTES + 1).put(header).putChar((char) value).array();
+
+ case FLOAT:
+ return allocateBuffer(Float.BYTES + 1).put(header).putFloat((float) value).array();
+
+ case DOUBLE:
+ return allocateBuffer(Double.BYTES + 1).put(header).putDouble((double) value).array();
+
+ case STRING: {
+ byte[] strBytes = ((String) value).getBytes(StandardCharsets.UTF_8);
+
+ return allocateBuffer(1 + strBytes.length).put(header).put(strBytes).array();
+ }
+
+ case BOOLEAN | ARRAY: {
+ return compressBooleanArray((boolean[]) value);
+ }
+
+ case BYTE | ARRAY: {
+ byte[] bytes = (byte[]) value;
+
+ return allocateBuffer(1 + bytes.length).put(header).put(bytes).array();
+ }
+
+ case SHORT | ARRAY: {
+ short[] shorts = (short[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Short.BYTES * shorts.length);
+
+ buf.put(header);
+
+ for (short s : shorts) {
+ buf.putShort(s);
+ }
+
+ return buf.array();
+ }
+
+ case INT | ARRAY: {
+ int[] ints = (int[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Integer.BYTES * ints.length);
+
+ buf.put(header);
+
+ for (int n : ints) {
+ buf.putInt(n);
+ }
+
+ return buf.array();
+ }
+
+ case LONG | ARRAY: {
+ long[] longs = (long[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Long.BYTES * longs.length);
+
+ buf.put(header);
+
+ for (long n : longs) {
+ buf.putLong(n);
+ }
+
+ return buf.array();
+ }
+
+ case CHAR | ARRAY: {
+ char[] chars = (char[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Character.BYTES * chars.length);
+
+ buf.put(header);
+
+ for (char c : chars) {
+ buf.putChar(c);
+ }
+
+ return buf.array();
+ }
+
+ case FLOAT | ARRAY: {
+ float[] floats = (float[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Float.BYTES * floats.length);
+
+ buf.put(header);
+
+ for (float f : floats) {
+ buf.putFloat(f);
+ }
+
+ return buf.array();
+ }
+
+ case DOUBLE | ARRAY: {
+ double[] doubles = (double[]) value;
+
+ ByteBuffer buf = allocateBuffer(1 + Double.BYTES * doubles.length);
+
+ buf.put(header);
+
+ for (double d : doubles) {
+ buf.putDouble(d);
+ }
+
+ return buf.array();
+ }
+
+ case STRING | ARRAY: {
+ String[] strings = (String[]) value;
+
+ byte[][] strBytes = Arrays.stream(strings).map(s -> s.getBytes(StandardCharsets.UTF_8)).toArray(byte[][]::new);
+
+ int totalSize = Arrays.stream(strBytes).mapToInt(bytes -> bytes.length).sum();
+
+ ByteBuffer buf = allocateBuffer(1 + Integer.BYTES * strBytes.length + totalSize);
+
+ buf.put(header);
+
+ for (int i = 0; i < strings.length; i++) {
+ buf.putInt(strings[i].length());
+
+ buf.put(strBytes[i]);
+ }
+
+ return buf.array();
+ }
+
+ default:
+ throw new IllegalArgumentException(value.getClass().getName());
+ }
+ }
+
+ /**
+ * Converts a byte array into a configuration value.
+ *
+ * @param bytes Serialized value.
+ * @return Deserialized configuration value.
+ */
+ public static Serializable fromBytes(byte[] bytes) {
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+
+ byte header = buf.get();
+
+ switch (header) {
+ case BOOLEAN:
+ return buf.get() == 1;
+
+ case BYTE:
+ return buf.get();
+
+ case SHORT:
+ return buf.getShort();
+
+ case INT:
+ return buf.getInt();
+
+ case LONG:
+ return buf.getLong();
+
+ case CHAR:
+ return buf.getChar();
+
+ case FLOAT:
+ return buf.getFloat();
+
+ case DOUBLE:
+ return buf.getDouble();
+
+ case STRING:
+ return new String(bytes, 1, bytes.length - 1, StandardCharsets.UTF_8);
+
+ case BOOLEAN | ARRAY: {
+ return decompressBooleanArray(bytes);
+ }
+
+ case BYTE | ARRAY:
+ return Arrays.copyOfRange(bytes, 1, bytes.length);
+
+ case SHORT | ARRAY: {
+ short[] shorts = new short[bytes.length / Short.BYTES];
+
+ for (int i = 0; i < shorts.length; i++) {
+ shorts[i] = buf.getShort();
+ }
+
+ return shorts;
+ }
+
+ case INT | ARRAY: {
+ int[] ints = new int[bytes.length / Integer.BYTES];
+
+ for (int i = 0; i < ints.length; i++) {
+ ints[i] = buf.getInt();
+ }
+
+ return ints;
+ }
+
+ case LONG | ARRAY: {
+ long[] longs = new long[bytes.length / Long.BYTES];
+
+ for (int i = 0; i < longs.length; i++) {
+ longs[i] = buf.getLong();
+ }
+
+ return longs;
+ }
+
+ case CHAR | ARRAY: {
+ char[] chars = new char[bytes.length / Character.BYTES];
+
+ for (int i = 0; i < chars.length; i++) {
+ chars[i] = buf.getChar();
+ }
+
+ return chars;
+ }
+
+ case FLOAT | ARRAY: {
+ float[] floats = new float[bytes.length / Float.BYTES];
+
+ for (int i = 0; i < floats.length; i++) {
+ floats[i] = buf.getFloat();
+ }
+
+ return floats;
+ }
+
+ case DOUBLE | ARRAY: {
+ double[] doubles = new double[bytes.length / Double.BYTES];
+
+ for (int i = 0; i < doubles.length; i++) {
+ doubles[i] = buf.getDouble();
+ }
+
+ return doubles;
+ }
+
+ case STRING | ARRAY: {
+ List<String> res = new ArrayList<>();
+
+ int offset = 1;
+
+ while (offset != bytes.length) {
+ int size = buf.getInt(offset);
+
+ res.add(new String(bytes, offset + Integer.BYTES, size, StandardCharsets.UTF_8));
+
+ offset += Integer.BYTES + size;
+ }
+
+ return res.toArray(EMPTY_STRING_ARRAY);
+ }
+
+ default:
+ throw new IllegalArgumentException(Arrays.toString(bytes));
+ }
+ }
+
+ private static ByteBuffer allocateBuffer(int size) {
+ return ByteBuffer.allocate(size).order(ByteOrder.LITTLE_ENDIAN);
+ }
+
+ private static byte header(Class<?> clazz) {
+ if (clazz == Boolean.class) {
+ return BOOLEAN;
+ } else if (clazz == Byte.class) {
+ return BYTE;
+ } else if (clazz == Short.class) {
+ return SHORT;
+ } else if (clazz == Integer.class) {
+ return INT;
+ } else if (clazz == Long.class) {
+ return LONG;
+ } else if (clazz == Character.class) {
+ return CHAR;
+ } else if (clazz == Float.class) {
+ return FLOAT;
+ } else if (clazz == Double.class) {
+ return DOUBLE;
+ } else if (clazz == String.class) {
+ return STRING;
+ } else if (clazz == boolean[].class) {
+ return BOOLEAN | ARRAY;
+ } else if (clazz == byte[].class) {
+ return BYTE | ARRAY;
+ } else if (clazz == short[].class) {
+ return SHORT | ARRAY;
+ } else if (clazz == int[].class) {
+ return INT | ARRAY;
+ } else if (clazz == long[].class) {
+ return LONG | ARRAY;
+ } else if (clazz == char[].class) {
+ return CHAR | ARRAY;
+ } else if (clazz == float[].class) {
+ return FLOAT | ARRAY;
+ } else if (clazz == double[].class) {
+ return DOUBLE | ARRAY;
+ } else if (clazz == String[].class) {
+ return STRING | ARRAY;
+ } else {
+ throw new IllegalArgumentException(clazz.getName());
+ }
+ }
+
+ private static byte[] compressBooleanArray(boolean[] value) {
+ boolean[] booleans = value;
+
+ int payloadSize = (booleans.length + Byte.SIZE - 1) / Byte.SIZE;
+
+ ByteBuffer buf = allocateBuffer(2 + payloadSize);
+
+ buf.put((byte) (BOOLEAN | ARRAY));
+
+ byte paddingLength = (byte) (payloadSize * Byte.SIZE - booleans.length);
+
+ buf.put(paddingLength);
+
+ byte[] res = buf.array();
+
+ for (int i = 0; i < booleans.length; i++) {
+ if (booleans[i]) {
+ res[2 + (i >>> 3)] |= (1 << (i & 7));
+ }
+ }
+
+ return res;
+ }
+
+ private static boolean[] decompressBooleanArray(byte[] bytes) {
+ byte paddingLength = bytes[1];
+
+ boolean[] booleans = new boolean[Byte.SIZE * (bytes.length - 2) - paddingLength];
+
+ for (int i = 0; i < booleans.length; i++) {
+ booleans[i] = (bytes[2 + (i >>> 3)] & (1 << (i & 7))) != 0;
+ }
+
+ return booleans;
+ }
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtilTest.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtilTest.java
new file mode 100644
index 0000000..74be203
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationSerializationUtilTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration.util;
+
+import static org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.fromBytes;
+import static org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil.toBytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import org.junit.jupiter.api.Test;
+
+class ConfigurationSerializationUtilTest {
+ @Test
+ void testSingleValuesConsistency() {
+ assertSingleValue(Boolean.FALSE);
+ assertSingleValue(Boolean.TRUE);
+
+ assertSingleValue((byte) 123);
+
+ assertSingleValue((short) 0x1234);
+
+ assertSingleValue(0x12345678);
+
+ assertSingleValue(0x123456789ABCDEF0L);
+
+ assertSingleValue('F');
+
+ assertSingleValue(0.3f);
+
+ assertSingleValue(0.3d);
+
+ assertSingleValue("foo");
+ }
+
+ @Test
+ void testArraysConsistency() {
+ assertArray(new boolean[]{false, true});
+
+ assertArray(new byte[]{10, -10});
+
+ assertArray(new short[]{1000, -1000});
+
+ assertArray(new int[]{1000_000, -1000_000});
+
+ assertArray(new long[]{1000_000_000_000L, -1000_000_000_000L});
+
+ assertArray(new char[]{'X', 'Y'});
+
+ assertArray(new float[]{0.1f, -0.1f});
+
+ assertArray(new double[]{0.1d, -0.1d});
+
+ assertArray(new String[]{"foo", "bar"});
+ }
+
+ private void assertSingleValue(Object value) {
+ assertEquals(value, fromBytes(toBytes(value)));
+ }
+
+ private void assertArray(Object value) {
+ Serializable res = fromBytes(toBytes(value));
+
+ assertEquals(value.getClass(), res.getClass());
+
+ assertEquals(Array.getLength(value), Array.getLength(res));
+
+ for (int i = 0; i < Array.getLength(value); i++) {
+ assertEquals(Array.get(value, i), Array.get(res, i));
+ }
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index 634825b..683f597 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Conditions;
@@ -137,7 +138,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
- data.put(dataKey, (Serializable) ByteUtils.fromBytes(value));
+ data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
}
} catch (Exception e) {
throw new StorageException("Exception when closing a Meta Storage cursor", e);
@@ -152,7 +153,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
try {
Entry entry = metaStorageMgr.get(new ByteArray(DISTRIBUTED_PREFIX + key)).join();
- return entry.value() == null ? null : (Serializable) ByteUtils.fromBytes(entry.value());
+ return entry.value() == null ? null : ConfigurationSerializationUtil.fromBytes(entry.value());
} catch (Exception e) {
throw new StorageException("Exception while reading data from Meta Storage", e);
}
@@ -181,7 +182,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
String dataKey = key.toString().substring(DISTRIBUTED_PREFIX.length());
- data.put(dataKey, (Serializable) ByteUtils.fromBytes(value));
+ data.put(dataKey, ConfigurationSerializationUtil.fromBytes(value));
}
} catch (Exception e) {
throw new StorageException("Exception when closing a Vault cursor", e);
@@ -213,9 +214,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
ByteArray key = new ByteArray(DISTRIBUTED_PREFIX + entry.getKey());
if (entry.getValue() != null) {
- // TODO: investigate overhead when serialize int, long, double, boolean, string, arrays of above
- // TODO: https://issues.apache.org/jira/browse/IGNITE-14698
- operations.add(Operations.put(key, ByteUtils.toBytes(entry.getValue())));
+ operations.add(Operations.put(key, ConfigurationSerializationUtil.toBytes(entry.getValue())));
} else {
operations.add(Operations.remove(key));
}
@@ -270,7 +269,7 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
} else {
String key = e.key().toString().substring(DISTRIBUTED_PREFIX.length());
- Serializable value = e.value() == null ? null : (Serializable) ByteUtils.fromBytes(e.value());
+ Serializable value = e.value() == null ? null : ConfigurationSerializationUtil.fromBytes(e.value());
data.put(key, value);
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
index 3be6bb6..43a6062 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/LocalConfigurationStorage.java
@@ -23,7 +23,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.configuration.annotation.ConfigurationType;
-import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.configuration.util.ConfigurationSerializationUtil;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
@@ -81,7 +81,7 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
try {
VaultEntry vaultEntry = vaultMgr.get(new ByteArray(LOC_PREFIX + key)).join();
- return vaultEntry.empty() ? null : (Serializable) ByteUtils.fromBytes(vaultEntry.value());
+ return vaultEntry.empty() ? null : ConfigurationSerializationUtil.fromBytes(vaultEntry.value());
} catch (Exception e) {
throw new StorageException("Exception while reading vault entry", e);
}
@@ -108,7 +108,7 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
// vault iterator should not return nulls as values
assert value != null;
- data.put(key, (Serializable) ByteUtils.fromBytes(value));
+ data.put(key, ConfigurationSerializationUtil.fromBytes(value));
}
} catch (Exception e) {
throw new StorageException("Exception when closing a Vault cursor", e);
@@ -135,7 +135,7 @@ public class LocalConfigurationStorage implements ConfigurationStorage {
for (Map.Entry<String, ? extends Serializable> e : newValues.entrySet()) {
ByteArray key = ByteArray.fromString(LOC_PREFIX + e.getKey());
- data.put(key, e.getValue() == null ? null : ByteUtils.toBytes(e.getValue()));
+ data.put(key, e.getValue() == null ? null : ConfigurationSerializationUtil.toBytes(e.getValue()));
}
Data entries = new Data(newValues, ver.incrementAndGet());