You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2019/05/29 13:38:29 UTC
[kafka] branch 2.3 updated: MINOR: Add unit tests to verify setting
of serdes in timestamped key-value store (#6825)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 0e153ac MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825)
0e153ac is described below
commit 0e153ac734a63fd484673eb79f76dd71d2807a5d
Author: cadonna <br...@confluent.io>
AuthorDate: Wed May 29 15:25:07 2019 +0200
MINOR: Add unit tests to verify setting of serdes in timestamped key-value store (#6825)
Guozhang Wang <wa...@gmail.com>, Bill Bejeck <bb...@gmail.com>
---
.../MeteredTimestampedKeyValueStoreTest.java | 52 ++++++++++++++++++++++
1 file changed, 52 insertions(+)
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 0f60d24..606428a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,10 +21,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -57,6 +59,7 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
@RunWith(EasyMockRunner.class)
public class MeteredTimestampedKeyValueStoreTest {
@@ -254,4 +257,53 @@ public class MeteredTimestampedKeyValueStoreTest {
return this.metrics.metric(metricName);
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
+ expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+ expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+ final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ null,
+ null
+ );
+ replay(inner, context);
+ store.init(context, inner);
+
+ try {
+ store.put("key", ValueAndTimestamp.make(42L, 60000));
+ } catch (final StreamsException exception) {
+ if (exception.getCause() instanceof ClassCastException) {
+ fail("Serdes are not correctly set from processor context.");
+ }
+ throw exception;
+ }
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
+ expect(context.keySerde()).andStubReturn((Serde) Serdes.String());
+ expect(context.valueSerde()).andStubReturn((Serde) Serdes.Long());
+ final MeteredTimestampedKeyValueStore<String, Long> store = new MeteredTimestampedKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ Serdes.String(),
+ new ValueAndTimestampSerde<>(Serdes.Long())
+ );
+ replay(inner, context);
+ store.init(context, inner);
+
+ try {
+ store.put("key", ValueAndTimestamp.make(42L, 60000));
+ } catch (final StreamsException exception) {
+ if (exception.getCause() instanceof ClassCastException) {
+ fail("Serdes are not correctly set from constructor parameters.");
+ }
+ throw exception;
+ }
+ }
}
\ No newline at end of file