Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/02/15 18:47:21 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107522774


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so

Review Comment:
   Not sure I follow. We should return `null` and just make sure spotbugs does not get in our way. What does it complain about?
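A minimal sketch of what the reviewer suggests: a boolean encoding that maps `null` to `null` in both directions instead of throwing. It is written as plain static helpers rather than as Kafka `Serializer`/`Deserializer` implementations so it compiles without kafka-clients. The class and method names are hypothetical, and `IllegalArgumentException` stands in for Kafka's `SerializationException`. Returning `null` from a method declared to return `Boolean` is the pattern spotbugs reports as `NP_BOOLEAN_RETURN_NULL`, which is why the real serde would need a suppression.

```java
import java.util.Arrays;

public class NullableBooleanCodec {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    // Hypothetical serializer logic: null in, null out; otherwise a single flag byte.
    static byte[] serialize(final Boolean data) {
        if (data == null) {
            return null;
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    // Hypothetical deserializer logic: null in, null out.
    // Returning null from a Boolean-typed method is what spotbugs flags as NP_BOOLEAN_RETURN_NULL.
    static Boolean deserialize(final byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected 1 byte but got " + data.length);
        }
        if (data[0] == TRUE) {
            return true;
        }
        if (data[0] == FALSE) {
            return false;
        }
        throw new IllegalArgumentException("Unexpected boolean byte: " + data[0]);
    }

    public static void main(final String[] args) {
        System.out.println(serialize(null) == null);          // true
        System.out.println(deserialize(null) == null);        // true
        System.out.println(Arrays.toString(serialize(true))); // [1]
        System.out.println(deserialize(serialize(false)));    // false
    }
}
```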



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   Should we call `make()` here? `value` should never be `null` in this case, right?
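The hunk calls `rawTimestamp`, `rawIsTombstone` and `rawValue` without showing their bodies. Below is a minimal, standalone sketch of how such helpers could slice the documented `<timestamp> + <bool> + <raw value>` layout, assuming an 8-byte big-endian timestamp (the layout Kafka's `LongSerializer` writes) followed by a one-byte flag. The helper bodies and the `RawSlicing` class are assumptions, not the PR's code. Once the flag says "not a tombstone", the value bytes are non-null (possibly empty), so a `null` value in that branch can only come from the inner deserializer mapping non-null bytes to `null`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RawSlicing {
    private static final int TIMESTAMP_SIZE = Long.BYTES; // 8 bytes
    private static final int FLAG_SIZE = 1;

    // Hypothetical helper: the first 8 bytes hold the timestamp.
    static byte[] rawTimestamp(final byte[] raw) {
        return Arrays.copyOfRange(raw, 0, TIMESTAMP_SIZE);
    }

    // Hypothetical helper: the byte after the timestamp is the tombstone flag.
    static byte[] rawIsTombstone(final byte[] raw) {
        return Arrays.copyOfRange(raw, TIMESTAMP_SIZE, TIMESTAMP_SIZE + FLAG_SIZE);
    }

    // Hypothetical helper: everything after the flag is the serialized value (possibly empty).
    static byte[] rawValue(final byte[] raw) {
        return Arrays.copyOfRange(raw, TIMESTAMP_SIZE + FLAG_SIZE, raw.length);
    }

    public static void main(final String[] args) {
        final byte[] value = "foo".getBytes(StandardCharsets.UTF_8);
        final byte[] raw = ByteBuffer.allocate(TIMESTAMP_SIZE + FLAG_SIZE + value.length)
            .putLong(10L)
            .put((byte) 0x00) // not a tombstone
            .put(value)
            .array();

        final long timestamp = ByteBuffer.wrap(rawTimestamp(raw)).getLong();
        final boolean isTombstone = rawIsTombstone(raw)[0] == 0x01;
        // Non-tombstone branch: the value bytes are non-null, so decoding yields a non-null String here.
        final String decoded = isTombstone ? null : new String(rawValue(raw), StandardCharsets.UTF_8);
        System.out.println(timestamp + " " + isTombstone + " " + decoded); // 10 false foo
    }
}
```

If the branch did switch to `make()`, keep in mind that `ValueAndTimestamp.make()` returns `null` (not an instance) when the value is `null`, so a value deserializer that maps non-null bytes to `null` would surface as a missing record rather than as a tombstone.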



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   The test does not really verify that we get "empty bytes", does it? I am also not sure if `""` is actually empty, as it might encode "string length as zero"?
   
   I believe you want to test the boolean `isTombstone` flag here? Maybe we need a custom `Serde` for the value that forces an empty byte array as serialization output?
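A standalone sketch of the test the reviewer is asking for: a value "serializer" that always emits `byte[0]`, so the encodings of an empty value and a `null` value differ only in the tombstone flag. The `encode` helper and `EmptyVsNullSketch` class are hypothetical; `encode` mirrors the layout used by the `NullableValueAndTimestampSerializer` hunk (timestamp, then flag, then raw value). On the `""` question: in Java, `"".getBytes(StandardCharsets.UTF_8)` is already a zero-length array; whether a particular value serializer adds extra bytes such as a length prefix depends on that serializer (Kafka's `StringSerializer` is assumed here to be a plain `getBytes` call).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class EmptyVsNullSketch {
    // Hypothetical encoder mirroring the PR's layout: <8-byte timestamp><1-byte isTombstone><raw value>.
    static <V> byte[] encode(final Function<V, byte[]> valueSerializer, final V value, final long timestamp) {
        // Kafka serializers conventionally map null to null; this guard stands in for that behavior.
        final byte[] rawValue = value == null ? null : valueSerializer.apply(value);
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer.allocate(Long.BYTES + 1 + nonNullRawValue.length)
            .putLong(timestamp)
            .put(rawValue == null ? (byte) 0x01 : (byte) 0x00)
            .put(nonNullRawValue)
            .array();
    }

    public static void main(final String[] args) {
        // Forces byte[0] for any non-null value, as the reviewer suggests.
        final Function<String, byte[]> alwaysEmpty = v -> new byte[0];
        final byte[] empty = encode(alwaysEmpty, "ignored", 10L);
        final byte[] tombstone = encode(alwaysEmpty, null, 10L);
        System.out.println(empty.length + " " + empty[8]);             // 9 0
        System.out.println(tombstone.length + " " + tombstone[8]);     // 9 1
        System.out.println("".getBytes(StandardCharsets.UTF_8).length); // 0
    }
}
```

A real test would assert on the flag byte (index 8) of both outputs, which is exactly what distinguishes `byte[0]` values from tombstones.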



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so
+                    // we fail here during serialization too for consistency
+                    throw new SerializationException("BooleanSerializer does not support null");
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)

Review Comment:
   Ah. Here is the error. We should just add an exclusion for this bug pattern (cf. `gradle/spotbugs-exclude.xml`).



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Both `rawTimestamp.length` and `rawIsTombstone.length` are constants, right? Should we do `allocate(8 + 1 + nonNullRawValue.length)`?
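A minimal sketch of the suggested allocation, assuming the timestamp is always 8 bytes (`Long.BYTES`) and the flag is always a single byte. `ByteBuffer.allocate` takes an `int`, so the sizes are kept as `int` constants. `FixedHeaderSerializerSketch` and its `serialize(long, byte[])` signature are hypothetical; `rawValue` stands for whatever the wrapped value serializer returned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FixedHeaderSerializerSketch {
    private static final int TIMESTAMP_SIZE = Long.BYTES; // always 8
    private static final int IS_TOMBSTONE_SIZE = 1;       // single flag byte
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    // Hypothetical stand-in for serialize(): header sizes are constants, only the value length varies.
    static byte[] serialize(final long timestamp, final byte[] rawValue) {
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(TIMESTAMP_SIZE + IS_TOMBSTONE_SIZE + nonNullRawValue.length)
            .putLong(timestamp)
            .put(rawValue == null ? TRUE : FALSE)
            .put(nonNullRawValue)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] raw = serialize(10L, "foo".getBytes(StandardCharsets.UTF_8));
        System.out.println(raw.length); // 12 (8 + 1 + 3)
    }
}
```

This also avoids calling the `Long` and `Boolean` serializers just to learn sizes that never change; whether to keep those helper serializers at all is a separate design choice.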



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.