You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/29 13:25:27 UTC

[kafka] branch trunk updated: MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new da6063b  MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825)
da6063b is described below

commit da6063be4ff2499ea2c65fec2a7975e0cf48e7b7
Author: cadonna <br...@confluent.io>
AuthorDate: Wed May 29 15:25:07 2019 +0200

    MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825)
    
    Guozhang Wang <wa...@gmail.com>,  Bill Bejeck <bb...@gmail.com>
---
 .../MeteredTimestampedKeyValueStoreTest.java       | 52 ++++++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f60d24..606428a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -57,6 +59,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @RunWith(EasyMockRunner.class)
 public class MeteredTimestampedKeyValueStoreTest {
@@ -254,4 +257,53 @@ public class MeteredTimestampedKeyValueStoreTest {
         return this.metrics.metric(metricName);
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
+        expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+        expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+        final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+        replay(inner, context);
+        store.init(context, inner);
+
+        try {
+            store.put("key", ValueAndTimestamp.make(42L, 60000));
+        } catch (final StreamsException exception) {
+            if (exception.getCause() instanceof ClassCastException) {
+                fail("Serdes are not correctly set from processor context.");
+            }
+            throw exception;
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
+        expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+        expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+        final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            Serdes.String(),
+            new ValueAndTimestampSerde<>(Serdes.Long())
+        );
+        replay(inner, context);
+        store.init(context, inner);
+
+        try {
+            store.put("key", ValueAndTimestamp.make(42L, 60000));
+        } catch (final StreamsException exception) {
+            if (exception.getCause() instanceof ClassCastException) {
+                fail("Serdes are not correctly set from constructor parameters.");
+            }
+            throw exception;
+        }
+    }
 }
\ No newline at end of file