Posted to jira@kafka.apache.org by "vcrfxia (via GitHub)" <gi...@apache.org> on 2023/02/15 02:33:55 UTC

[GitHub] [kafka] vcrfxia opened a new pull request, #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

vcrfxia opened a new pull request, #13249:
URL: https://github.com/apache/kafka/pull/13249

   As part of introducing versioned key-value stores in [KIP-889](https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores), we'd like a way to represent a versioned key-value store (`VersionedKeyValueStore<Bytes, byte[]>`) as a regular key-value store (`KeyValueStore<Bytes, byte[]>`) so that it stays compatible with existing DSL methods that accept key-value stores, e.g., [StreamsBuilder#table()](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java#L260) and [KTable methods](https://github.com/apache/kafka/blob/3012332e3d82947e434933efd4ab4e9366ab429d/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java#L153), which are explicitly typed to accept `Materialized<K, V, KeyValueStore<Bytes, byte[]>>`. This way, we do not need to add new overloads of all relevant StreamsBuilder and KTable methods just to relax the `Materialized` type so that it accepts versioned stores.
   
   As part of representing a versioned key-value store as a regular key-value bytes store, we need a way to serialize a value and timestamp as a single byte array, where the value may be null (in order to represent putting a tombstone with timestamp into the versioned store). This PR introduces the serdes for doing so. See <> for its usage.
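For readers new to the format, here is a minimal, self-contained sketch of the byte layout this PR describes: an 8-byte big-endian timestamp, a 1-byte tombstone flag, then the raw value bytes. It is not the PR's code. The class and method names (`NullableValueAndTimestampLayout`, `encode`, `decodeTimestamp`, `decodeValue`) are hypothetical, and the sketch works on already-serialized value bytes instead of going through Kafka's `Serializer`/`Deserializer` interfaces.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the layout <timestamp><isTombstone><raw value>;
// not the Kafka implementation.
final class NullableValueAndTimestampLayout {
    static final int TIMESTAMP_SIZE = Long.BYTES; // 8 bytes, big-endian (ByteBuffer's default order)
    static final int FLAG_SIZE = 1;
    static final byte TRUE = 0x01;
    static final byte FALSE = 0x00;

    private NullableValueAndTimestampLayout() {}

    // rawValue == null marks a tombstone; an empty array is a regular (non-null) value.
    static byte[] encode(final long timestamp, final byte[] rawValue) {
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(TIMESTAMP_SIZE + FLAG_SIZE + nonNullRawValue.length)
            .putLong(timestamp)
            .put(rawValue == null ? TRUE : FALSE)
            .put(nonNullRawValue)
            .array();
    }

    // Reads the leading 8-byte timestamp.
    static long decodeTimestamp(final byte[] encoded) {
        return ByteBuffer.wrap(encoded).getLong(0);
    }

    // Returns null for a tombstone, otherwise the (possibly empty) raw value bytes.
    static byte[] decodeValue(final byte[] encoded) {
        if (encoded[TIMESTAMP_SIZE] == TRUE) {
            return null;
        }
        final byte[] rawValue = new byte[encoded.length - TIMESTAMP_SIZE - FLAG_SIZE];
        System.arraycopy(encoded, TIMESTAMP_SIZE + FLAG_SIZE, rawValue, 0, rawValue.length);
        return rawValue;
    }
}
```

A tombstone and an empty value encode to the same length (9 bytes); only the flag byte tells them apart, which is why the flag is part of the format.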
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107868286


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Well, I would rather let it crash with an exception right here to surface a potential bug instead of trying to be resilient and mask a potential bug (that we might hit on de-serialization later, making it harder to figure out where it came from).
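The fail-fast approach described above could look roughly like the following sketch. It assumes fixed component sizes (8-byte timestamp, 1-byte flag) and throws as soon as a component serializer returns an unexpected size, instead of sizing the buffer from whatever lengths came back. The class name and exception messages are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: combine pre-serialized parts, failing fast on size mismatches.
final class FailFastCombiner {
    static final int TIMESTAMP_SIZE = Long.BYTES; // a serialized long is 8 bytes
    static final int FLAG_SIZE = 1;               // the boolean flag is a single byte

    private FailFastCombiner() {}

    static byte[] combine(final byte[] rawTimestamp, final byte[] rawIsTombstone, final byte[] nonNullRawValue) {
        // Surface a bug at serialization time rather than writing bytes that would
        // only fail (or be misread) later during deserialization.
        if (rawTimestamp.length != TIMESTAMP_SIZE) {
            throw new IllegalStateException(
                "Expected " + TIMESTAMP_SIZE + " timestamp bytes but got " + rawTimestamp.length);
        }
        if (rawIsTombstone.length != FLAG_SIZE) {
            throw new IllegalStateException(
                "Expected " + FLAG_SIZE + " flag byte but got " + rawIsTombstone.length);
        }
        return ByteBuffer
            .allocate(TIMESTAMP_SIZE + FLAG_SIZE + nonNullRawValue.length)
            .put(rawTimestamp)
            .put(rawIsTombstone)
            .put(nonNullRawValue)
            .array();
    }
}
```

With a fixed-size `allocate(...)` and no explicit checks, an oversized component would already fail with `BufferOverflowException`, but an undersized one would silently leave trailing zero bytes; the explicit checks catch both cases.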





[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107869144


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   Yes, that was the question. I was just not sure if it would do this. SG.





[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107522774


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so

Review Comment:
   Not sure I follow. We should return `null` and just make sure SpotBugs does not get in our way. What does it complain about?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   Should we call `make()` here? `value` should never be `null` for this case?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   The test does not really verify that we get "empty bytes", does it? I am also not sure whether `""` is actually empty, as it might encode the string length as zero?
   
   I believe you want to test the boolean `isTombstone` flag here? Maybe we need a custom `Serde` for the value to force an empty byte array as serialization output?
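One way to follow this suggestion is to plug in a value serializer that always emits `byte[0]` and then inspect the flag byte directly. The sketch below is stdlib-only and hypothetical: it models the value serializer as a `java.util.function.Function<V, byte[]>` rather than Kafka's `Serializer`, and re-implements the layout locally (`EmptyBytesFlagCheck`, `ALWAYS_EMPTY`, and `isTombstone` are illustrative names). As a side note, Kafka's `StringSerializer` returns `data.getBytes(encoding)` with no length prefix, so `""` does serialize to zero bytes; a dedicated serializer just makes the test's intent explicit.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

// Hypothetical test helper: encodes <timestamp><isTombstone><raw value> with a pluggable value serializer.
final class EmptyBytesFlagCheck {
    private EmptyBytesFlagCheck() {}

    // A value serializer that forces an empty byte array for every non-null value.
    static final Function<String, byte[]> ALWAYS_EMPTY = v -> new byte[0];

    static <V> byte[] encode(final Function<V, byte[]> valueSerializer, final V value, final long timestamp) {
        // A null value is treated as a tombstone without calling the serializer.
        final byte[] rawValue = value == null ? null : valueSerializer.apply(value);
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(Long.BYTES + 1 + nonNullRawValue.length)
            .putLong(timestamp)
            .put((byte) (rawValue == null ? 1 : 0))
            .put(nonNullRawValue)
            .array();
    }

    // Reads the tombstone flag at offset 8, right after the timestamp.
    static boolean isTombstone(final byte[] encoded) {
        return encoded[Long.BYTES] == 1;
    }
}
```

Both encodings are 9 bytes long, so a test that only checks for a non-null result cannot distinguish them; asserting on the flag byte can.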



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so
+                    // we fail here during serialization too for consistency
+                    throw new SerializationException("BooleanSerializer does not support null");
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)

Review Comment:
   Ah. Here is the error. We should just add an exclusion for this error (cf. `gradle/spotbugs-exclude.xml`).
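A minimal sketch of what the reviewer suggests: a one-byte boolean codec that passes `null` through in both directions instead of throwing. This is hypothetical stdlib-only code (`NullableBooleanCodec` is an illustrative name), not Kafka's `Serializer`/`Deserializer` interfaces. In the real build, returning `null` from a method typed `Boolean` is what SpotBugs reports as `NP_BOOLEAN_RETURN_NULL`, so it would need a matching entry in `gradle/spotbugs-exclude.xml` (not shown here).

```java
// Hypothetical null-tolerant one-byte boolean codec.
final class NullableBooleanCodec {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    private NullableBooleanCodec() {}

    static byte[] serialize(final Boolean data) {
        if (data == null) {
            return null; // pass null through instead of throwing
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    static Boolean deserialize(final byte[] data) {
        if (data == null) {
            return null; // the pattern SpotBugs flags as NP_BOOLEAN_RETURN_NULL
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected 1 byte but got " + data.length);
        }
        if (data[0] == TRUE) {
            return true;
        }
        if (data[0] == FALSE) {
            return false;
        }
        throw new IllegalArgumentException("Unexpected boolean byte: " + data[0]);
    }
}
```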



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Both `rawTimestamp.length` and `rawIsTombstone.length` are constants, right? Should we do `allocate(8 + 1 + nonNullRawValue.length)`?
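A note on this suggestion: `ByteBuffer.allocate` takes an `int`, so the constants should be `int` values (e.g., `Long.BYTES + 1`) rather than `long` literals like `8L`, which would make the argument a `long` and fail to compile. The hypothetical sketch below (`FixedHeaderAllocation` and its methods are illustrative names) also writes the timestamp with `ByteBuffer.putLong`, whose default big-endian output matches a manual most-significant-byte-first encoding of the kind a long serializer produces.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: fixed header size computed from int constants.
final class FixedHeaderAllocation {
    static final int HEADER_SIZE = Long.BYTES + 1; // 8-byte timestamp + 1-byte tombstone flag

    private FixedHeaderAllocation() {}

    static byte[] serialize(final long timestamp, final boolean isTombstone, final byte[] nonNullRawValue) {
        // allocate(int): using long literals such as 8L here would not compile
        return ByteBuffer
            .allocate(HEADER_SIZE + nonNullRawValue.length)
            .putLong(timestamp) // big-endian, ByteBuffer's default order
            .put((byte) (isTombstone ? 0x01 : 0x00))
            .put(nonNullRawValue)
            .array();
    }

    // Manual big-endian encoding, for comparison with putLong.
    static byte[] bigEndian(final long value) {
        final byte[] out = new byte[Long.BYTES];
        for (int i = 0; i < Long.BYTES; i++) {
            out[i] = (byte) (value >>> (56 - 8 * i));
        }
        return out;
    }
}
```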





[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107868729


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   I see. Guess it's fine as-is, using `makeAllowNullable`
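One plausible reason `makeAllowNullable` is the safer choice even in the non-tombstone branch: as we read the current `ValueAndTimestamp` source, `make(value, timestamp)` returns `null` when `value` is `null`, so if a value deserializer ever maps non-null bytes to `null`, calling `make()` would silently drop the whole record, timestamp included. The stdlib-only sketch below uses a hypothetical stand-in class (`ValueAndTimestampStandIn`) that mimics the two factory methods, plus a hypothetical deserializer that maps empty bytes to `null`; neither is Kafka code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical stand-in mimicking ValueAndTimestamp's two factories.
final class ValueAndTimestampStandIn<V> {
    final V value;
    final long timestamp;

    private ValueAndTimestampStandIn(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    // Like make(): a null value collapses the whole result to null.
    static <V> ValueAndTimestampStandIn<V> make(final V value, final long timestamp) {
        return value == null ? null : new ValueAndTimestampStandIn<>(value, timestamp);
    }

    // Like makeAllowNullable(): the timestamp is kept even if the value is null.
    static <V> ValueAndTimestampStandIn<V> makeAllowNullable(final V value, final long timestamp) {
        return new ValueAndTimestampStandIn<>(value, timestamp);
    }
}

final class NonTombstoneBranch {
    private NonTombstoneBranch() {}

    // A hypothetical value deserializer that maps empty (non-null) bytes to null.
    static final Function<byte[], String> EMPTY_AS_NULL =
        bytes -> bytes.length == 0 ? null : new String(bytes, StandardCharsets.UTF_8);
}
```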



+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   I see. I guess it's fine as-is, using `makeAllowNullable`.




[GitHub] [kafka] mjsax merged pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13249:
URL: https://github.com/apache/kafka/pull/13249



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "vcrfxia (via GitHub)" <gi...@apache.org>.
vcrfxia commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107815109


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so

Review Comment:
   Resolved below.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##########
@@ -0,0 +1,87 @@
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}.
+ * <p>
+ * The serialized format is:
+ * <pre>
+ *     <timestamp> + <bool indicating whether value is null> + <raw value>
+ * </pre>
+ * where the boolean is needed in order to distinguish between null and empty values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde<V> extends WrappingNullableSerde<ValueAndTimestamp<V>, Void, V> {
+    public NullableValueAndTimestampSerde(final Serde<V> valueSerde) {
+        super(
+            new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+            new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+        );
+    }
+
+    static final class BooleanSerde {
+        private static final byte TRUE = 0x01;
+        private static final byte FALSE = 0x00;
+
+        static class BooleanSerializer implements Serializer<Boolean> {
+            @Override
+            public byte[] serialize(final String topic, final Boolean data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow deserialization so
+                    // we fail here during serialization too for consistency
+                    throw new SerializationException("BooleanSerializer does not support null");
+                }
+
+                return new byte[] {
+                    data ? TRUE : FALSE
+                };
+            }
+        }
+
+        static class BooleanDeserializer implements Deserializer<Boolean> {
+            @Override
+            public Boolean deserialize(final String topic, final byte[] data) {
+                if (data == null) {
+                    // actually want to return null here but spotbugs won't allow it (NP_BOOLEAN_RETURN_NULL)

Review Comment:
   OK, I added an exception for this particular class.
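   The `BooleanSerde` quoted above encodes the tombstone flag as a single byte (`0x01` for true, `0x00` for false) and refuses `null`. The sketch below reproduces that encoding without dependencies. `BooleanCodecSketch` is a hypothetical name, and the sketch throws `IllegalArgumentException` where the serializer in the diff throws Kafka's `SerializationException`. Its deserializer returns a primitive `boolean`, which is one way to avoid the spotbugs `NP_BOOLEAN_RETURN_NULL` warning. That is not necessarily how the PR resolved it.

```java
// Hypothetical, dependency-free version of the one-byte boolean encoding quoted above
// (TRUE = 0x01, FALSE = 0x00). Like the serializer in the diff, it rejects null; here the
// deserializer does too, so both directions behave consistently.
final class BooleanCodecSketch {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    static byte[] serialize(final Boolean data) {
        if (data == null) {
            throw new IllegalArgumentException("BooleanCodecSketch does not support null");
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    // Returns a primitive boolean, so there is no boxed Boolean that could be null.
    static boolean deserialize(final byte[] data) {
        if (data == null) {
            throw new IllegalArgumentException("BooleanCodecSketch does not support null");
        }
        if (data.length != 1) {
            throw new IllegalArgumentException("Expected 1 byte but got " + data.length);
        }
        if (data[0] == TRUE) {
            return true;
        }
        if (data[0] == FALSE) {
            return false;
        }
        throw new IllegalArgumentException("Unexpected byte value: " + data[0]);
    }
}
```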



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##########
@@ -0,0 +1,99 @@
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer<V> implements WrappingNullableDeserializer<ValueAndTimestamp<V>, Void, V> {
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final Deserializer<Boolean> booleanDeserializer;
+
+    NullableValueAndTimestampDeserializer(final Deserializer<V> valueDeserializer) {
+        this.valueDeserializer = Objects.requireNonNull(valueDeserializer);
+        timestampDeserializer = new LongDeserializer();
+        booleanDeserializer = new BooleanDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+        booleanDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueAndTimestamp<V> deserialize(final String topic, final byte[] rawValueAndTimestamp) {
+        if (rawValueAndTimestamp == null) {
+            return null;
+        }
+
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp(rawValueAndTimestamp));
+        final boolean isTombstone = booleanDeserializer.deserialize(topic, rawIsTombstone(rawValueAndTimestamp));
+        if (isTombstone) {
+            return ValueAndTimestamp.makeAllowNullable(null, timestamp);
+        } else {
+            final V value = valueDeserializer.deserialize(topic, rawValue(rawValueAndTimestamp));
+            return ValueAndTimestamp.makeAllowNullable(value, timestamp);

Review Comment:
   You're correct that `value` should never be null here, assuming valid deserializer implementations which never deserialize non-null into null. 
   
   Regarding updating this call to `make()`: `make()` does not throw if `value == null`; instead, it returns `null`. In the event of a buggy deserializer implementation which returns null when it shouldn't, calling `make()` here and returning null will cause cascading failures elsewhere. For example, the changelogging layer uses this deserializer to obtain the value to write to the changelog (see https://github.com/apache/kafka/pull/13251) and will throw an exception if it gets a null ValueAndTimestamp from the deserializer.
   
   I suppose I can update this to `make()` and throw an explicit exception if `value == null`. I'll do that.
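   The sketch below shows the change described here. Tombstones keep their timestamp with a `null` value. If the value deserializer returns `null` for non-tombstone bytes, the sketch throws immediately. `NonTombstoneBranchSketch`, its `Pair` holder, and the use of `IllegalStateException` are hypothetical choices for illustration, not the PR's final code.

```java
import java.util.function.Function;

// Hypothetical sketch of the branch discussed above; Pair stands in for ValueAndTimestamp.
final class NonTombstoneBranchSketch {
    static final class Pair<V> {
        final V value;
        final long timestamp;

        Pair(final V value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    static <V> Pair<V> toPair(final boolean isTombstone,
                              final long timestamp,
                              final byte[] rawValue,
                              final Function<byte[], V> valueDeserializer) {
        if (isTombstone) {
            // A tombstone keeps its timestamp with a null value (the makeAllowNullable() case).
            return new Pair<>(null, timestamp);
        }
        final V value = valueDeserializer.apply(rawValue);
        if (value == null) {
            // A well-behaved deserializer never maps non-null bytes to null. Failing here points
            // at the real culprit instead of handing null to a later layer such as changelogging.
            throw new IllegalStateException("Value deserializer returned null for non-tombstone bytes");
        }
        return new Pair<>(value, timestamp);
    }
}
```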



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerdeTest.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes.StringSerde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class NullableValueAndTimestampSerdeTest {
+
+    private final static NullableValueAndTimestampSerde<String> SERDE = new NullableValueAndTimestampSerde<>(new StringSerde());
+    private final static Serializer<ValueAndTimestamp<String>> SERIALIZER = SERDE.serializer();
+    private final static Deserializer<ValueAndTimestamp<String>> DESERIALIZER = SERDE.deserializer();
+
+    @Test
+    public void shouldSerdeNull() {
+        assertThat(SERIALIZER.serialize(null, null), is(nullValue()));
+        assertThat(DESERIALIZER.deserialize(null, null), is(nullValue()));
+    }
+
+    @Test
+    public void shouldSerdeNonNull() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("foo", 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerdeNonNullWithNullValue() {
+        final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.makeAllowNullable(null, 10L);
+
+        final byte[] rawValueAndTimestamp = SERIALIZER.serialize(null, valueAndTimestamp);
+        assertThat(rawValueAndTimestamp, is(notNullValue()));
+
+        assertThat(DESERIALIZER.deserialize(null, rawValueAndTimestamp), is(valueAndTimestamp));
+    }
+
+    @Test
+    public void shouldSerializeNonNullWithEmptyBytes() {

Review Comment:
   StringSerializer serializes an empty string as `byte[0]`, which is exactly what this test case is for (`isTombstone` will be set to `false`, which is how `byte[0]` is distinguished from a null value). 
   
   Did I misunderstand your question?
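   The hypothetical encoder below (`LayoutSketch`) makes the `byte[0]` versus `null` distinction concrete. It follows the documented `<timestamp> + <bool indicating whether value is null> + <raw value>` layout. It assumes an 8-byte big-endian timestamp, as written by `ByteBuffer.putLong`, and UTF-8 string values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical encoder for the documented layout
// <timestamp> + <bool indicating whether value is null> + <raw value>.
final class LayoutSketch {
    static byte[] encode(final String value, final long timestamp) {
        // A null value has no raw bytes; an empty string has zero raw bytes.
        final byte[] rawValue = value == null ? null : value.getBytes(StandardCharsets.UTF_8);
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(Long.BYTES + 1 + nonNullRawValue.length)
            .putLong(timestamp)                      // 8 bytes, big-endian
            .put((byte) (rawValue == null ? 1 : 0))  // tombstone flag
            .put(nonNullRawValue)
            .array();
    }
}
```

   `encode(null, 10L)` and `encode("", 10L)` both produce 9 bytes, and they differ only in the flag byte at index 8. Without that byte, the two would be indistinguishable.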



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Yes, that's correct. I extracted the expected lengths into static variables and added the relevant checks in order to be defensive against serialization surprises.
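   The sketch below shows one way to extract "the expected lengths into static variables" and add checks on the serializer side. The constant names and the choice of `IllegalStateException` are assumptions, not taken from the PR. With these checks, a malformed piece fails at write time instead of producing a buffer whose offsets no longer line up.

```java
import java.nio.ByteBuffer;

// Hypothetical serializer-side assembly with explicit length checks.
final class LengthCheckedSerializerSketch {
    static final int RAW_TIMESTAMP_LENGTH = 8;     // LongSerializer output size
    static final int RAW_IS_TOMBSTONE_LENGTH = 1;  // one-byte boolean flag

    static byte[] combine(final byte[] rawTimestamp, final byte[] rawIsTombstone, final byte[] rawValue) {
        if (rawTimestamp.length != RAW_TIMESTAMP_LENGTH) {
            throw new IllegalStateException("Unexpected timestamp length: " + rawTimestamp.length);
        }
        if (rawIsTombstone.length != RAW_IS_TOMBSTONE_LENGTH) {
            throw new IllegalStateException("Unexpected tombstone flag length: " + rawIsTombstone.length);
        }
        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
        return ByteBuffer
            .allocate(RAW_TIMESTAMP_LENGTH + RAW_IS_TOMBSTONE_LENGTH + nonNullRawValue.length)
            .put(rawTimestamp)
            .put(rawIsTombstone)
            .put(nonNullRawValue)
            .array();
    }
}
```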




[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107868286


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerializer.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanSerializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampSerializer<V> implements WrappingNullableSerializer<ValueAndTimestamp<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final Serializer<Boolean> booleanSerializer;
+
+    NullableValueAndTimestampSerializer(final Serializer<V> valueSerializer) {
+        this.valueSerializer = Objects.requireNonNull(valueSerializer);
+        timestampSerializer = new LongSerializer();
+        booleanSerializer = new BooleanSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+        booleanSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueAndTimestamp<V> data) {
+        if (data == null) {
+            return null;
+        }
+        final byte[] rawValue = valueSerializer.serialize(topic, data.value());
+        final byte[] rawIsTombstone = booleanSerializer.serialize(topic, rawValue == null);
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, data.timestamp());
+
+        final byte[] nonNullRawValue = rawValue == null ? new byte[0] : rawValue;
+        return ByteBuffer
+            .allocate(rawTimestamp.length + rawIsTombstone.length + nonNullRawValue.length)

Review Comment:
   Well, I would rather let it crash with an exception right here to surface a potential bug, instead of trying to be resilient and masking it (we might then hit it only later, during deserialization, which would make it harder to figure out where it came from).
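   The read side shows why failing early matters. The deserializer splits the bytes at fixed offsets, so anything malformed that slipped through serialization may only surface there, far from its cause. The sketch below gives hypothetical versions of the `rawTimestamp`, `rawIsTombstone`, and `rawValue` helpers called in the deserializer quoted earlier, each with a minimum-length check. The real bodies of those helpers are not shown in this thread.

```java
import java.util.Arrays;

// Hypothetical read-side helpers that split
// <8-byte timestamp> + <1-byte tombstone flag> + <raw value>
// at fixed offsets and fail fast on input too short to contain the header.
final class RawSplitSketch {
    private static final int TIMESTAMP_LENGTH = 8;
    private static final int FLAG_LENGTH = 1;

    private static void checkHeader(final byte[] raw) {
        if (raw.length < TIMESTAMP_LENGTH + FLAG_LENGTH) {
            throw new IllegalArgumentException(
                "Expected at least " + (TIMESTAMP_LENGTH + FLAG_LENGTH) + " bytes but got " + raw.length);
        }
    }

    static byte[] rawTimestamp(final byte[] raw) {
        checkHeader(raw);
        return Arrays.copyOfRange(raw, 0, TIMESTAMP_LENGTH);
    }

    static byte[] rawIsTombstone(final byte[] raw) {
        checkHeader(raw);
        return Arrays.copyOfRange(raw, TIMESTAMP_LENGTH, TIMESTAMP_LENGTH + FLAG_LENGTH);
    }

    static byte[] rawValue(final byte[] raw) {
        checkHeader(raw);
        // Everything after the header; empty for both tombstones and byte[0] values.
        return Arrays.copyOfRange(raw, TIMESTAMP_LENGTH + FLAG_LENGTH, raw.length);
    }
}
```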


