You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/19 04:41:40 UTC
[kafka] 01/01: Add unit tests
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch kafka-3729-pr-from-ted
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 07ef6497789c28b19c3ba2e77c08afd57cd5f19b
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Apr 18 21:40:02 2019 -0700
Add unit tests
---
.../org/apache/kafka/streams/KafkaStreams.java | 8 +-
.../streams/processor/internals/SourceNode.java | 4 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 137 +++++++++++++++++++++
.../state/internals/MeteredKeyValueStoreTest.java | 51 +++++++-
.../state/internals/MeteredSessionStoreTest.java | 49 ++++++++
.../internals/MeteredTimestampWindowStoreTest.java | 77 +++++++++++-
.../MeteredTimestampedKeyValueStoreTest.java | 52 +++++++-
.../state/internals/MeteredWindowStoreTest.java | 75 ++++++++++-
8 files changed, 439 insertions(+), 14 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb7ca8a..34c8589 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -647,11 +647,11 @@ public class KafkaStreams implements AutoCloseable {
}
}
for (final SourceNode sn : sources) {
- if (sn.getKeyDeSerializer() != null) {
- sn.getKeyDeSerializer().configure(config.originals(), true);
+ if (sn.getKeyDeserializer() != null) {
+ sn.getKeyDeserializer().configure(config.originals(), true);
}
- if (sn.getValueDeSerializer() != null) {
- sn.getValueDeSerializer().configure(config.originals(), false);
+ if (sn.getValueDeserializer() != null) {
+ sn.getValueDeserializer().configure(config.originals(), false);
}
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index bcd6475..f80749b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,11 +52,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
this(name, topics, null, keyDeserializer, valDeserializer);
}
- public Deserializer getKeyDeSerializer() {
+ public Deserializer getKeyDeserializer() {
return keyDeserializer;
}
- public Deserializer getValueDeSerializer() {
+ public Deserializer getValueDeserializer() {
return valDeserializer;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 6b8b5b5..80e268d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -25,7 +25,10 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
@@ -33,7 +36,9 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -74,6 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -662,6 +674,131 @@ public class KafkaStreamsTest {
}
@SuppressWarnings("unchecked")
+ @Test
+ public void shouldInitializedUserSerdes() { // every user-supplied (de)serializer in the topology is expected to receive configure(...)
+ final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
+ mockSourceKeyDeserialzer.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+ final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
+ mockSourceValueDeserialzer.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+ // Source serdes: only deserializer() is stubbed, handing out the two mocks above.
+ final Serde mockSourceKeySerde = mock(Serde.class);
+ final Serde mockSourceValueSerde = mock(Serde.class);
+ expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
+ expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();
+
+
+ // Serdes for the through() topic: serializers and deserializers are all expected to be configured.
+ final Serializer mockThroughKeySerializer = mock(Serializer.class);
+ mockThroughKeySerializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Serializer mockThroughValueSerializer = mock(Serializer.class);
+ mockThroughValueSerializer.configure(anyObject(), eq(false));
+ expectLastCall();
+ final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
+ mockThroughKeyDeserializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
+ mockThroughValueDeserializer.configure(anyObject(), eq(false));
+ expectLastCall();
+
+ final Serde mockThroughKeySerde = mock(Serde.class);
+ final Serde mockThroughValueSerde = mock(Serde.class);
+ expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
+ expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
+ expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
+ expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
+
+
+ // Serdes passed to groupByKey(): serializers and deserializers are all expected to be configured.
+ final Serializer mockGroupedKeySerializer = mock(Serializer.class);
+ mockGroupedKeySerializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Serializer mockGroupedValueSerializer = mock(Serializer.class);
+ mockGroupedValueSerializer.configure(anyObject(), eq(false));
+ expectLastCall();
+ final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
+ mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
+ mockGroupedValueDeserializer.configure(anyObject(), eq(false));
+ expectLastCall();
+
+ final Serde mockGroupedKeySerde = mock(Serde.class);
+ final Serde mockGroupedValueSerde = mock(Serde.class);
+ expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
+ expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
+ expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
+ expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
+
+
+ // Serdes for the output topic: only serializers are created and stubbed.
+ final Serializer mockOutputKeySerializer = mock(Serializer.class);
+ mockOutputKeySerializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Serializer mockOutputValueSerializer = mock(Serializer.class);
+ mockOutputValueSerializer.configure(anyObject(), eq(false));
+ expectLastCall();
+
+ final Serde mockOutputKeySerde = mock(Serde.class);
+ final Serde mockOutputValueSerde = mock(Serde.class);
+ expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
+ expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
+
+
+ // Serdes for the global table: only deserializers are created and stubbed.
+ final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
+ mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
+ expectLastCall();
+ final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
+ mockGlobalValueDeserializer.configure(anyObject(), eq(false));
+ expectLastCall();
+
+ final Serde mockGlobalKeySerde = mock(Serde.class);
+ final Serde mockGlobalValueSerde = mock(Serde.class);
+ expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
+ expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
+
+
+ // Wire every mock serde into a single topology: source -> through -> groupByKey/count -> output, plus a global table.
+ builder
+ .stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
+ .through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
+ .selectKey(KeyValue::pair)
+ .groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
+ .count()
+ .toStream()
+ .to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
+ builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
+ // All expectations are recorded; switch every mock to replay state.
+ replay(
+ mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
+ mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
+ mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
+ mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
+ mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
+ mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
+ mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
+ // Constructing KafkaStreams is the only action exercised; the instance is closed again if construction succeeded.
+ KafkaStreams kafkaStreams = null;
+ try {
+ kafkaStreams = new KafkaStreams(builder.build(), props);
+ } finally {
+ if (kafkaStreams != null) {
+ kafkaStreams.close();
+ }
+ }
+ // Only the serializer/deserializer mocks are verified; the Serde mocks carry anyTimes() stubs only.
+ verify(
+ mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
+ mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
+ mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
+ mockOutputKeySerializer, mockOutputValueSerializer,
+ mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
+ }
+
+ @SuppressWarnings("unchecked")
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 5ef7ce7..07ea6da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
@@ -39,8 +40,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
-import java.util.List;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -52,6 +53,7 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -101,6 +103,53 @@ public class MeteredKeyValueStoreTest {
metered.init(context, metered);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldGetSerdesFromConfigIfNoUserSerdes() { // with null user serdes, the context is expected to be asked for both serdes
+ metered = new MeteredKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ null, // no user key serde
+ null // no user value serde
+ );
+ final Serde mockSerde = mock(Serde.class);
+ replay(mockSerde); // replayed without any recorded expectations
+ expect(context.keySerde()).andReturn(mockSerde);
+ expect(context.valueSerde()).andReturn(mockSerde);
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockSerde);
+ }
+
+ @Test
+ public void shouldConfigureUserSerdes() { // user-supplied serdes are expected to receive configure(...) during init
+ final Serde<String> mockKeySerde = mock(Serde.class);
+ mockKeySerde.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+
+ final Serde<String> mockValueSerde = mock(Serde.class);
+ mockValueSerde.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+
+ replay(mockKeySerde, mockValueSerde);
+
+ metered = new MeteredKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ mockKeySerde,
+ mockValueSerde
+ );
+ // Discard whatever was recorded on `context` so far and stub only metrics() and taskId().
+ reset(context);
+ expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+ expect(context.taskId()).andReturn(taskId).anyTimes();
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockKeySerde, mockValueSerde);
+ }
+
@Test
public void testMetrics() {
init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 9a74ed7..72c84be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
@@ -54,6 +55,7 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -100,6 +102,53 @@ public class MeteredSessionStoreTest {
metered.init(context, metered);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldGetSerdesFromConfigIfNoUserSerdes() { // with null user serdes, the context is expected to be asked for both serdes
+ metered = new MeteredSessionStore<>(
+ inner,
+ "scope",
+ null, // no user key serde
+ null, // no user value serde
+ new MockTime()
+ );
+ final Serde mockSerde = mock(Serde.class);
+ replay(mockSerde); // replayed without any recorded expectations
+ expect(context.keySerde()).andReturn(mockSerde);
+ expect(context.valueSerde()).andReturn(mockSerde);
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockSerde);
+ }
+
+ @Test
+ public void shouldConfigureUserSerdes() { // user-supplied serdes are expected to receive configure(...) during init
+ final Serde<String> mockKeySerde = mock(Serde.class);
+ mockKeySerde.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+
+ final Serde<String> mockValueSerde = mock(Serde.class);
+ mockValueSerde.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+
+ replay(mockKeySerde, mockValueSerde);
+
+ metered = new MeteredSessionStore<>(
+ inner,
+ "scope",
+ mockKeySerde,
+ mockValueSerde,
+ new MockTime()
+ );
+ // Discard whatever was recorded on `context` so far and stub only metrics() and taskId().
+ reset(context);
+ expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+ expect(context.taskId()).andReturn(taskId).anyTimes();
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockKeySerde, mockValueSerde);
+ }
+
@Test
public void testMetrics() {
init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
index a3522f3..5e9f274 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
@@ -34,13 +36,19 @@ import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertNull;
public class MeteredTimestampWindowStoreTest {
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
- private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
+ private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
innerStoreMock,
10L, // any size
"scope",
@@ -70,6 +78,73 @@ public class MeteredTimestampWindowStoreTest {
}
@Test
+ public void shouldGetSerdesFromConfigIfNoUserSerdes() { // with null user serdes, init() must take its serdes from the context
+ store = new MeteredTimestampedWindowStore<>(
+ innerStoreMock,
+ 10L, // any size
+ "scope",
+ new MockTime(),
+ null, // no user key serde
+ null // no user value serde
+ );
+ // One mock stands in for both the key and the value serde exposed by the context.
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+ final Serde mockSerde = mock(Serde.class);
+ // Store the context in the field: init() below reads `context`, so a discarded instance would never reach the store.
+ context = new InternalMockProcessorContext(
+ TestUtils.tempDirectory(),
+ mockSerde,
+ mockSerde,
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ NoOpRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+ );
+ replay(mockSerde, innerStoreMock);
+ // NOTE(review): if init() wraps the context value serde and calls serializer()/deserializer() on it, stub those on mockSerde — confirm.
+ store.init(context, store);
+
+ verify(mockSerde);
+ }
+
+ @Test
+ public void shouldConfigureUserSerdes() { // user-supplied serdes are expected to receive configure(...) during init
+ final Serde<String> mockKeySerde = mock(Serde.class);
+ mockKeySerde.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+
+ final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+ mockValueSerde.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+
+ store = new MeteredTimestampedWindowStore<>(
+ innerStoreMock,
+ 10L, // any size
+ "scope",
+ new MockTime(),
+ mockKeySerde,
+ mockValueSerde
+ );
+ // The context gets its own serde mock, distinct from the user serdes under test.
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+ final Serde mockSerde = mock(Serde.class);
+ // Store the context in the field: init() below reads `context`, so a discarded instance would never reach the store.
+ context = new InternalMockProcessorContext(
+ TestUtils.tempDirectory(),
+ mockSerde,
+ mockSerde,
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ NoOpRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+ );
+ replay(mockKeySerde, mockValueSerde, mockSerde, innerStoreMock); // mockSerde has no expectations, so any call on it fails
+
+ store.init(context, store);
+ verify(mockKeySerde, mockValueSerde);
+ }
+
+ @Test
public void shouldCloseUnderlyingStore() {
innerStoreMock.close();
EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 587d369..a17b03a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
@@ -40,8 +41,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Collections;
-import java.util.List;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -53,6 +54,7 @@ import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -76,7 +78,6 @@ public class MeteredTimestampedKeyValueStoreTest {
private MeteredTimestampedKeyValueStore<String, String> metered;
private final String key = "key";
private final Bytes keyBytes = Bytes.wrap(key.getBytes());
- private final String value = "value";
private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
// the timestamp is 97, which is the ASCII code of 'a'
private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
@@ -104,6 +105,53 @@ public class MeteredTimestampedKeyValueStoreTest {
metered.init(context, metered);
}
+ @SuppressWarnings("unchecked")
+ @Test
+ public void shouldGetSerdesFromConfigIfNoUserSerdes() { // with null user serdes, the context is expected to be asked for both serdes
+ metered = new MeteredTimestampedKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ null, // no user key serde
+ null // no user value serde
+ );
+ final Serde mockSerde = mock(Serde.class);
+ replay(mockSerde); // replayed without any recorded expectations
+ expect(context.keySerde()).andReturn(mockSerde);
+ expect(context.valueSerde()).andReturn(mockSerde);
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockSerde);
+ }
+
+ @Test
+ public void shouldConfigureUserSerdes() { // user-supplied serdes are expected to receive configure(...) during init
+ final Serde<String> mockKeySerde = mock(Serde.class);
+ mockKeySerde.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+
+ final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+ mockValueSerde.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+
+ replay(mockKeySerde, mockValueSerde);
+
+ metered = new MeteredTimestampedKeyValueStore<>(
+ inner,
+ "scope",
+ new MockTime(),
+ mockKeySerde,
+ mockValueSerde
+ );
+ // Discard whatever was recorded on `context` so far and stub only metrics() and taskId().
+ reset(context);
+ expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+ expect(context.taskId()).andReturn(taskId).anyTimes();
+ // init() is a helper of this test class defined outside this hunk; presumably it replays `context` before initializing the store — confirm.
+ init();
+ verify(context, mockKeySerde, mockValueSerde);
+ }
+
@Test
public void testMetrics() {
init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 962888a..96279b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
@@ -59,7 +59,7 @@ public class MeteredWindowStoreTest {
private InternalMockProcessorContext context;
@SuppressWarnings("unchecked")
private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
- private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+ private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
innerStoreMock,
10L, // any size
"scope",
@@ -89,6 +89,73 @@ public class MeteredWindowStoreTest {
}
@Test
+ public void shouldGetSerdesFromConfigIfNoUserSerdes() { // with null user serdes, init() must take its serdes from the context
+ store = new MeteredWindowStore<>(
+ innerStoreMock,
+ 10L, // any size
+ "scope",
+ new MockTime(),
+ null, // no user key serde
+ null // no user value serde
+ );
+ // One mock stands in for both the key and the value serde exposed by the context.
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+ final Serde mockSerde = mock(Serde.class);
+ // Store the context in the field: init() below reads `context`, so a discarded instance would never reach the store.
+ context = new InternalMockProcessorContext(
+ TestUtils.tempDirectory(),
+ mockSerde,
+ mockSerde,
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ NoOpRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+ );
+ replay(mockSerde, innerStoreMock); // mockSerde has no expectations, so any call on it fails
+
+ store.init(context, store);
+
+ verify(mockSerde);
+ }
+
+ @Test
+ public void shouldConfigureUserSerdes() { // user-supplied serdes are expected to receive configure(...) during init
+ final Serde<String> mockKeySerde = mock(Serde.class);
+ mockKeySerde.configure(anyObject(), eq(true)); // records the expected call; `true` marks the key side
+ expectLastCall();
+
+ final Serde<String> mockValueSerde = mock(Serde.class);
+ mockValueSerde.configure(anyObject(), eq(false)); // `false` marks the value side
+ expectLastCall();
+
+ store = new MeteredWindowStore<>(
+ innerStoreMock,
+ 10L, // any size
+ "scope",
+ new MockTime(),
+ mockKeySerde,
+ mockValueSerde
+ );
+ // The context gets its own serde mock, distinct from the user serdes under test.
+ final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+ final Serde mockSerde = mock(Serde.class);
+ // Store the context in the field: init() below reads `context`, so a discarded instance would never reach the store.
+ context = new InternalMockProcessorContext(
+ TestUtils.tempDirectory(),
+ mockSerde,
+ mockSerde,
+ streamsMetrics,
+ new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+ NoOpRecordCollector::new,
+ new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+ );
+ replay(mockKeySerde, mockValueSerde, mockSerde, innerStoreMock); // mockSerde has no expectations, so any call on it fails
+
+ store.init(context, store);
+ verify(mockKeySerde, mockValueSerde);
+ }
+
+ @Test
public void testMetrics() {
replay(innerStoreMock);
store.init(context, store);
@@ -128,7 +195,7 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordFetchLatency() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+ expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
replay(innerStoreMock);
store.init(context, store);
@@ -141,7 +208,7 @@ public class MeteredWindowStoreTest {
@Test
public void shouldRecordFetchRangeLatency() {
- expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+ expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyIterator());
replay(innerStoreMock);
store.init(context, store);