Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/21 01:10:47 UTC
[1/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Repository: kafka
Updated Branches:
refs/heads/trunk e4ef8e664 -> 959cf09e8
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
new file mode 100644
index 0000000..80ad67f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -0,0 +1,676 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+import org.apache.kafka.test.MockProcessorContext;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class RocksDBWindowStoreTest {
+
+ private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
+ private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
+ private final int numSegments = 3;
+ private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
+ private final long retentionPeriod = segmentSize * (numSegments - 1);
+ private final long windowSize = 3;
+ private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
+
+ protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
+ StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
+ WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
+ store.init(context);
+ return store;
+ }
+
+ @Test
+ public void testPutAndFetch() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ try {
+ long startTime = segmentSize - 4L;
+
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testPutAndFetchBefore() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ try {
+ long startTime = segmentSize - 4L;
+
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
+
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testPutAndFetchAfter() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ try {
+ long startTime = segmentSize - 4L;
+
+ context.setTime(startTime + 0L);
+ store.put(0, "zero");
+ context.setTime(startTime + 1L);
+ store.put(1, "one");
+ context.setTime(startTime + 2L);
+ store.put(2, "two");
+ context.setTime(startTime + 3L);
+ // (3, "three") is not put
+ context.setTime(startTime + 4L);
+ store.put(4, "four");
+ context.setTime(startTime + 5L);
+ store.put(5, "five");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
+
+ context.setTime(startTime + 3L);
+ store.put(2, "two+1");
+ context.setTime(startTime + 4L);
+ store.put(2, "two+2");
+ context.setTime(startTime + 5L);
+ store.put(2, "two+3");
+ context.setTime(startTime + 6L);
+ store.put(2, "two+4");
+ context.setTime(startTime + 7L);
+ store.put(2, "two+5");
+ context.setTime(startTime + 8L);
+ store.put(2, "two+6");
+
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
+ assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
+ assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
+ assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
+ assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
+ assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
+ assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
+ assertNull(entriesByKey.get(3));
+ assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
+ assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
+ assertNull(entriesByKey.get(6));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testPutSameKeyTimestamp() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ try {
+ long startTime = segmentSize - 4L;
+
+ context.setTime(startTime);
+ store.put(0, "zero");
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+
+ context.setTime(startTime);
+ store.put(0, "zero");
+ context.setTime(startTime);
+ store.put(0, "zero+");
+ context.setTime(startTime);
+ store.put(0, "zero++");
+
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
+ assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+
+ Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
+
+ assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
+
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testRolling() throws IOException {
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ RocksDBWindowStore<Integer, String> inner =
+ (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+ try {
+ long startTime = segmentSize * 2;
+ long incr = segmentSize / 2;
+
+ context.setTime(startTime);
+ store.put(0, "zero");
+ assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+ context.setTime(startTime + incr);
+ store.put(1, "one");
+ assertEquals(Utils.mkSet(2L), inner.segmentIds());
+
+ context.setTime(startTime + incr * 2);
+ store.put(2, "two");
+ assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+ context.setTime(startTime + incr * 3);
+ // (3, "three") is not put
+ assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
+
+ context.setTime(startTime + incr * 4);
+ store.put(4, "four");
+ assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+ context.setTime(startTime + incr * 5);
+ store.put(5, "five");
+ assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
+
+ assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+
+ context.setTime(startTime + incr * 6);
+ store.put(6, "six");
+ assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+ assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+
+
+ context.setTime(startTime + incr * 7);
+ store.put(7, "seven");
+ assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
+
+ assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+
+ context.setTime(startTime + incr * 8);
+ store.put(8, "eight");
+ assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+ assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+ // check segment directories
+ store.flush();
+ assertEquals(
+ Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
+ segmentDirs(baseDir)
+ );
+ } finally {
+ store.close();
+ }
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+ }
+
+ @Test
+ public void testRestore() throws IOException {
+ final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
+ long startTime = segmentSize * 2;
+ long incr = segmentSize / 2;
+
+ File baseDir = Files.createTempDirectory("test").toFile();
+ try {
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ try {
+ context.setTime(startTime);
+ store.put(0, "zero");
+ context.setTime(startTime + incr);
+ store.put(1, "one");
+ context.setTime(startTime + incr * 2);
+ store.put(2, "two");
+ context.setTime(startTime + incr * 3);
+ store.put(3, "three");
+ context.setTime(startTime + incr * 4);
+ store.put(4, "four");
+ context.setTime(startTime + incr * 5);
+ store.put(5, "five");
+ context.setTime(startTime + incr * 6);
+ store.put(6, "six");
+ context.setTime(startTime + incr * 7);
+ store.put(7, "seven");
+ context.setTime(startTime + incr * 8);
+ store.put(8, "eight");
+ store.flush();
+
+ } finally {
+ store.close();
+ }
+
+
+ } finally {
+ Utils.delete(baseDir);
+ }
+
+ File baseDir2 = Files.createTempDirectory("test").toFile();
+ try {
+ Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
+ RecordCollector recordCollector = new RecordCollector(producer) {
+ @Override
+ public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
+ changeLog.add(new Entry<>(
+ keySerializer.serialize(record.topic(), record.key()),
+ valueSerializer.serialize(record.topic(), record.value()))
+ );
+ }
+ };
+
+ MockProcessorContext context = new MockProcessorContext(
+ null, baseDir2,
+ byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
+ recordCollector);
+
+ WindowStore<Integer, String> store = createWindowStore(context, serdes);
+ RocksDBWindowStore<Integer, String> inner =
+ (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
+
+ try {
+ context.restore("window", changeLog);
+
+ assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
+
+ assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
+ assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
+ assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
+ assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
+ assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
+ assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
+ assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
+
+ // check segment directories
+ store.flush();
+ assertEquals(
+ Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
segmentDirs(baseDir2)
+ );
+ } finally {
+ store.close();
+ }
+
+
+ } finally {
+ Utils.delete(baseDir2);
+ }
+ }
+
+ private <E> List<E> toList(WindowStoreIterator<E> iterator) {
+ ArrayList<E> list = new ArrayList<>();
+ while (iterator.hasNext()) {
+ list.add(iterator.next().value);
+ }
+ return list;
+ }
+
+ private Set<String> segmentDirs(File baseDir) {
+ File rocksDbDir = new File(baseDir, "rocksdb");
+ String[] subdirs = rocksDbDir.list();
+
+ HashSet<String> set = new HashSet<>();
+
+ for (String subdir : subdirs) {
+ if (subdir.startsWith("window-"))
+ set.add(subdir.substring(7));
+ }
+ return set;
+ }
+
+ private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
+ HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
+
+ for (Entry<byte[], byte[]> entry : changeLog) {
+ long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
+ Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
+ String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
+
+ Set<String> entries = entriesByKey.get(key);
+ if (entries == null) {
+ entries = new HashSet<>();
+ entriesByKey.put(key, entries);
+ }
+ entries.add(value + "@" + (timestamp - startTime));
+ }
+
+ return entriesByKey;
+ }
+}
[5/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Posted by gu...@apache.org.
KAFKA-3121: Remove aggregatorSupplier and add Reduce functions
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #795 from guozhangwang/K3121s1
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/959cf09e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/959cf09e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/959cf09e
Branch: refs/heads/trunk
Commit: 959cf09e8653f4b8255f49c6f4c258ed1a5ec38e
Parents: e4ef8e6
Author: Guozhang Wang <wa...@gmail.com>
Authored: Wed Jan 20 16:10:43 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jan 20 16:10:43 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/examples/KTableJob.java | 111 ---
.../kafka/streams/kstream/Aggregator.java | 7 +-
.../streams/kstream/AggregatorSupplier.java | 23 -
.../org/apache/kafka/streams/kstream/Count.java | 36 +
.../apache/kafka/streams/kstream/KStream.java | 38 +-
.../apache/kafka/streams/kstream/KTable.java | 59 +-
.../apache/kafka/streams/kstream/Reducer.java | 23 +
.../apache/kafka/streams/kstream/SumAsLong.java | 36 +
.../kstream/internals/CountSupplier.java | 52 --
.../kstream/internals/KStreamAggregate.java | 4 +-
.../streams/kstream/internals/KStreamImpl.java | 122 ++--
.../kstream/internals/KStreamReduce.java | 167 +++++
.../kstream/internals/KTableAggregate.java | 4 +-
.../streams/kstream/internals/KTableImpl.java | 128 ++--
.../streams/kstream/internals/KTableReduce.java | 120 ++++
.../kstream/internals/KTableStoreSupplier.java | 4 +-
.../kstream/internals/LongSumSupplier.java | 52 --
.../streams/kstream/internals/TopKSupplier.java | 106 ---
.../internals/ProcessorStateManager.java | 2 +-
.../state/InMemoryKeyValueStoreSupplier.java | 155 -----
.../state/InMemoryLRUCacheStoreSupplier.java | 195 ------
.../streams/state/MeteredKeyValueStore.java | 250 -------
.../kafka/streams/state/MeteredWindowStore.java | 206 ------
.../kafka/streams/state/OffsetCheckpoint.java | 162 -----
.../state/RocksDBKeyValueStoreSupplier.java | 52 --
.../kafka/streams/state/RocksDBStore.java | 265 --------
.../kafka/streams/state/RocksDBWindowStore.java | 289 --------
.../state/RocksDBWindowStoreSupplier.java | 58 --
.../kafka/streams/state/StoreChangeLogger.java | 91 ---
.../org/apache/kafka/streams/state/Stores.java | 3 +
.../InMemoryKeyValueStoreSupplier.java | 159 +++++
.../InMemoryLRUCacheStoreSupplier.java | 199 ++++++
.../state/internals/MeteredKeyValueStore.java | 254 +++++++
.../state/internals/MeteredWindowStore.java | 209 ++++++
.../state/internals/OffsetCheckpoint.java | 162 +++++
.../internals/RocksDBKeyValueStoreSupplier.java | 53 ++
.../streams/state/internals/RocksDBStore.java | 269 ++++++++
.../state/internals/RocksDBWindowStore.java | 295 ++++++++
.../internals/RocksDBWindowStoreSupplier.java | 59 ++
.../state/internals/StoreChangeLogger.java | 92 +++
.../kstream/internals/KStreamAggregateTest.java | 36 +-
.../kstream/internals/KTableAggregateTest.java | 36 +-
.../internals/ProcessorStateManagerTest.java | 2 +-
.../processor/internals/StandbyTaskTest.java | 2 +-
.../state/AbstractKeyValueStoreTest.java | 191 ------
.../state/InMemoryKeyValueStoreTest.java | 48 --
.../state/InMemoryLRUCacheStoreTest.java | 156 -----
.../streams/state/KeyValueStoreTestDriver.java | 4 +-
.../streams/state/RocksDBKeyValueStoreTest.java | 50 --
.../streams/state/RocksDBWindowStoreTest.java | 671 ------------------
.../internals/AbstractKeyValueStoreTest.java | 195 ++++++
.../internals/InMemoryKeyValueStoreTest.java | 50 ++
.../internals/InMemoryLRUCacheStoreTest.java | 159 +++++
.../internals/RocksDBKeyValueStoreTest.java | 52 ++
.../state/internals/RocksDBWindowStoreTest.java | 676 +++++++++++++++++++
55 files changed, 3468 insertions(+), 3431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
deleted file mode 100644
index 45ff58e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KTableJob.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.HoppingWindows;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.kstream.Windowed;
-
-import java.util.Properties;
-
-public class KTableJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-ktable");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
-
- Serializer<String> stringSerializer = new StringSerializer();
- Deserializer<String> stringDeserializer = new StringDeserializer();
-
- KStreamBuilder builder = new KStreamBuilder();
-
- // stream aggregate
- KStream<String, Long> stream1 = builder.stream("topic1");
-
- @SuppressWarnings("unchecked")
- KTable<Windowed<String>, Long> wtable1 = stream1.sumByKey(new KeyValueToLongMapper<String, Long>() {
- @Override
- public long apply(String key, Long value) {
- return value;
- }
- }, HoppingWindows.of("window1").with(500L).every(500L).emit(1000L).until(1000L * 60 * 60 * 24 /* one day */), stringSerializer, stringDeserializer);
-
- // table aggregation
- KTable<String, String> table1 = builder.table("topic2");
-
- KTable<String, Long> table2 = table1.sum(new KeyValueMapper<String, String, String>() {
- @Override
- public String apply(String key, String value) {
- return value;
- }
- }, new KeyValueToLongMapper<String, String>() {
- @Override
- public long apply(String key, String value) {
- return Long.parseLong(value);
- }
- }, stringSerializer, stringDeserializer, "table2");
-
- // stream-table join
- KStream<String, Long> stream2 = stream1.leftJoin(table2, new ValueJoiner<Long, Long, Long>() {
- @Override
- public Long apply(Long value1, Long value2) {
- if (value2 == null)
- return 0L;
- else
- return value1 * value2;
- }
- });
-
- // table-table join
- KTable<String, String> table3 = table1.outerJoin(table2, new ValueJoiner<String, Long, String>() {
- @Override
- public String apply(String value1, Long value2) {
- if (value2 == null)
- return value1 + "-null";
- else if (value1 == null)
- return "null-" + value2;
- else
- return value1 + "-" + value2;
- }
- });
-
- wtable1.to("topic3");
-
- KafkaStreaming kstream = new KafkaStreaming(builder, config);
- kstream.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
index d715fbd..c601024 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Aggregator.java
@@ -21,7 +21,7 @@ public interface Aggregator<K, V, T> {
/**
* Set the initial aggregate value
*/
- T initialValue();
+ T initialValue(K aggKey);
/**
* When a new record with the aggregate key is added,
@@ -34,9 +34,4 @@ public interface Aggregator<K, V, T> {
* updating the aggregate value for this key
*/
T remove(K aggKey, V value, T aggregate);
-
- /**
- * Merge two aggregate values
- */
- T merge(T aggr1, T aggr2);
}
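For illustration only (a minimal sketch, not part of this commit): with merge() removed and initialValue() now receiving the aggregate key, an initial value can depend on the key itself, which the old zero-argument initialValue() could not express. The SeededSum class name below is hypothetical.

    import org.apache.kafka.streams.kstream.Aggregator;

    // Hypothetical aggregator whose starting value depends on the key.
    public class SeededSum implements Aggregator<String, Long, Long> {

        @Override
        public Long initialValue(String aggKey) {
            return (long) aggKey.length(); // key-dependent seed
        }

        @Override
        public Long add(String aggKey, Long value, Long aggregate) {
            return aggregate + value;
        }

        @Override
        public Long remove(String aggKey, Long value, Long aggregate) {
            return aggregate - value;
        }
    }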
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
deleted file mode 100644
index 6ed9125..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/AggregatorSupplier.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface AggregatorSupplier<K, V, T> {
-
- Aggregator<K, V, T> get();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
new file mode 100644
index 0000000..3c1ed46
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class Count<K> implements Aggregator<K, Long, Long> {
+
+ @Override
+ public Long initialValue(K aggKey) {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, Long value, Long aggregate) {
+ return aggregate + 1L;
+ }
+
+ @Override
+ public Long remove(K aggKey, Long value, Long aggregate) {
+ return aggregate - 1L;
+ }
+}
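A minimal sketch of wiring Count into a windowed aggregation through the new aggregateByKey (editor's illustration, not from the commit; the topic name, window parameters, and the serializers from org.apache.kafka.common.serialization are assumptions). Note that this Count fixes the record value type to Long, so the input stream must carry Long values.

    // Inside a topology-building method; builder is a KStreamBuilder.
    KStream<String, Long> clicks = builder.stream("clicks");
    KTable<Windowed<String>, Long> counts = clicks.aggregateByKey(
            new Count<String>(),
            HoppingWindows.of("click-counts").with(1000L).every(1000L),
            new StringSerializer(),   // key serializer
            new LongSerializer(),     // aggregate value serializer
            new StringDeserializer(), // key deserializer
            new LongDeserializer());  // aggregate value deserializer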
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 36741a8..dfed661 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorSupplier;
+
/**
* KStream is an abstraction of a stream of key-value pairs.
*
@@ -268,35 +269,28 @@ public interface KStream<K, V> {
/**
* Aggregate values of this stream by key on a window basis.
*
- * @param aggregatorSupplier the class of aggregatorSupplier
+ * @param reducer the Reducer used to combine two values of the same key
+ * @param windows the specification of the aggregation window
+ */
+ <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer);
+
+ /**
+ * Aggregate values of this stream by key on a window basis.
+ *
+ * @param aggregator the class of Aggregator
* @param windows the specification of the aggregation window
* @param <T> the value type of the aggregated table
*/
- <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
Deserializer<K> keyDeserializer,
Deserializer<T> aggValueDeserializer);
- /**
- * Sum extracted long integer values of this stream by key on a window basis.
- *
- * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Long> sumByKey(KeyValueToLongMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
- /**
- * Count number of records of this stream by key on a window basis.
- *
- * @param windows the specification of the aggregation window
- */
- <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer);
-
}
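To sketch the new entry point (editor's illustration with assumed topic, window name, and serializers; not part of the commit): the removed sumByKey/countByKey calls collapse into reduceByKey with an explicit Reducer, or into aggregateByKey with Count/SumAsLong.

    KStream<String, Long> amounts = builder.stream("amounts");
    KTable<Windowed<String>, Long> totals = amounts.reduceByKey(
            new Reducer<Long>() {
                @Override
                public Long apply(Long value1, Long value2) {
                    return value1 + value2; // combine two values for the same key
                }
            },
            HoppingWindows.of("totals").with(1000L).every(1000L),
            new StringSerializer(), new LongSerializer(),
            new StringDeserializer(), new LongDeserializer());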
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 93eceec..87298d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -139,49 +139,42 @@ public interface KTable<K, V> {
<V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner);
/**
- * Aggregate values of this table by the selected key.
+ * Reduce values of this table by the selected key.
*
- * @param aggregatorSupplier the class of AggregatorSupplier
+ * @param addReducer the Reducer applied when a new value is added for a key
+ * @param removeReducer the Reducer applied when an old value is removed for a key
* @param selector the KeyValue mapper that select the aggregate key
* @param name the name of the resulted table
* @param <K1> the key type of the aggregated table
* @param <V1> the value type of the aggregated table
* @return the instance of KTable
*/
- <K1, V1, T> KTable<K1, T> aggregate(AggregatorSupplier<K1, V1, T> aggregatorSupplier,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<T> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<T> aggValueDeserializer,
- String name);
-
- /**
- * Sum extracted long integer values of this table by the selected aggregation key
- *
- * @param keySelector the class of KeyValueMapper to select the aggregation key
- * @param valueSelector the class of KeyValueToLongMapper to extract the long integer from value
- * @param name the name of the resulted table
- */
- <K1> KTable<K1, Long> sum(KeyValueMapper<K, V, K1> keySelector,
- KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K1> keySerializer,
- Deserializer<K1> keyDeserializer,
- String name);
+ <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+ Reducer<V1> removeReducer,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ String name);
/**
- * Count number of records of this table by the selected aggregation key
+ * Aggregate values of this table by the selected key.
*
- * @param keySelector the class of KeyValueMapper to select the aggregation key
+ * @param aggregator the class of Aggregator
+ * @param selector the KeyValue mapper that selects the aggregate key
* @param name the name of the resulted table
+ * @param <K1> the key type of the aggregated table
+ * @param <V1> the value type of the aggregated table
+ * @return the instance of KTable
*/
- <K1> KTable<K1, Long> count(KeyValueMapper<K, V, K1> keySelector,
- Serializer<K1> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V> valueDeserializer,
- String name);
-
+ <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name);
}
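The table variant takes an add/remove Reducer pair because an updated row is handled as a retraction of the old value followed by an application of the new one (cf. KTableReduce added in this commit). A hedged sketch, assuming a table of type KTable<String, Long>, a selector of type KeyValueMapper<String, Long, KeyValue<String, Long>> defined elsewhere, and the usual string/long serializers:

    KTable<String, Long> sums = table.reduce(
            new Reducer<Long>() {         // addReducer: apply the new value
                @Override
                public Long apply(Long aggregate, Long value) {
                    return aggregate + value;
                }
            },
            new Reducer<Long>() {         // removeReducer: retract the old value
                @Override
                public Long apply(Long aggregate, Long value) {
                    return aggregate - value;
                }
            },
            selector,                     // chooses the aggregate key; assumed defined
            new StringSerializer(), new LongSerializer(),
            new StringDeserializer(), new LongDeserializer(),
            "sums");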
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
new file mode 100644
index 0000000..418f442
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Reducer.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public interface Reducer<V> {
+
+ V apply(V value1, V value2);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
new file mode 100644
index 0000000..1f8df04
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SumAsLong.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream;
+
+public class SumAsLong<K> implements Aggregator<K, Long, Long> {
+
+ @Override
+ public Long initialValue(K aggKey) {
+ return 0L;
+ }
+
+ @Override
+ public Long add(K aggKey, Long value, Long aggregate) {
+ return aggregate + value;
+ }
+
+ @Override
+ public Long remove(K aggKey, Long value, Long aggregate) {
+ return aggregate - value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
deleted file mode 100644
index b7dc5aa..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CountSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class CountSupplier<K, V> implements AggregatorSupplier<K, V, Long> {
-
- private class Count implements Aggregator<K, V, Long> {
-
- @Override
- public Long initialValue() {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, V value, Long aggregate) {
- return aggregate + 1;
- }
-
- @Override
- public Long remove(K aggKey, V value, Long aggregate) {
- return aggregate - 1;
- }
-
- @Override
- public Long merge(Long aggr1, Long aggr2) {
- return aggr1 + aggr2;
- }
- }
-
- @Override
- public Aggregator<K, V, Long> get() {
- return new Count();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 5745a03..91bfa9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -97,7 +97,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
T oldAgg = entry.value;
if (oldAgg == null)
- oldAgg = aggregator.initialValue();
+ oldAgg = aggregator.initialValue(key);
// try to add the new value (there will never be an old value)
T newAgg = aggregator.add(key, value, oldAgg);
@@ -119,7 +119,7 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces
// create new windows for the remaining unmatched windows that do not exist yet
for (long windowStartMs : matchedWindows.keySet()) {
- T oldAgg = aggregator.initialValue();
+ T oldAgg = aggregator.initialValue(key);
T newAgg = aggregator.add(key, value, oldAgg);
windowStore.put(key, newAgg, windowStartMs);
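The point of threading the key through initialValue(...) is that the seed can now depend on the aggregation key. A hypothetical sketch (the "vip-" prefix and the seed of 100 are invented purely for illustration):

    // Hypothetical: an aggregator whose seed varies by key, which the old
    // zero-argument initialValue() could not express.
    public class SeededSum implements Aggregator<String, Long, Long> {

        @Override
        public Long initialValue(String aggKey) {
            return aggKey.startsWith("vip-") ? 100L : 0L;  // per-key seed
        }

        @Override
        public Long add(String aggKey, Long value, Long aggregate) {
            return aggregate + value;
        }

        @Override
        public Long remove(String aggKey, Long value, Long aggregate) {
            return aggregate - value;
        }
    }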
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 691910b..ce89220 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -18,15 +18,13 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -39,7 +37,7 @@ import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.RocksDBWindowStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
import java.lang.reflect.Array;
@@ -48,47 +46,50 @@ import java.util.Set;
public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V> {
- private static final String FILTER_NAME = "KSTREAM-FILTER-";
+ private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
- private static final String MAP_NAME = "KSTREAM-MAP-";
+ private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
- private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
+ private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+
+ private static final String FILTER_NAME = "KSTREAM-FILTER-";
private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
- private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
+ public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
- private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+ public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
- private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
+ public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
- private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
+ private static final String MAP_NAME = "KSTREAM-MAP-";
- private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
+ private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
- private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
+ public static final String MERGE_NAME = "KSTREAM-MERGE-";
- private static final String SELECT_NAME = "KSTREAM-SELECT-";
+ public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
- private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
+ public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
- public static final String SINK_NAME = "KSTREAM-SINK-";
+ private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
- public static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
+ private static final String REDUCE_NAME = "KSTREAM-REDUCE-";
- public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
+ private static final String SELECT_NAME = "KSTREAM-SELECT-";
- public static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
+ public static final String SINK_NAME = "KSTREAM-SINK-";
- public static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
+ public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
- public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
+ private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
- public static final String MERGE_NAME = "KSTREAM-MERGE-";
+ private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
+
+ private static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
- public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
public KStreamImpl(KStreamBuilder topology, String name, Set<String> sourceNodes) {
super(topology, name, sourceNodes);
@@ -394,7 +395,41 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
}
@Override
- public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(AggregatorSupplier<K, V, T> aggregatorSupplier,
+ public <W extends Window> KTable<Windowed<K>, V> reduceByKey(Reducer<V> reducer,
+ Windows<W> windows,
+ Serializer<K> keySerializer,
+ Serializer<V> aggValueSerializer,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> aggValueDeserializer) {
+
+ // TODO: this agg window operator is only used to cast K to Windowed<K> for
+ // KTableProcessorSupplier, which is a bit awkward and should be removed in the future
+ String reduceName = topology.newName(REDUCE_NAME);
+ String selectName = topology.newName(SELECT_NAME);
+
+ ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
+
+ RocksDBWindowStoreSupplier<K, V> aggregateStore =
+ new RocksDBWindowStoreSupplier<>(
+ windows.name(),
+ windows.maintainMs(),
+ windows.segments,
+ false,
+ new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
+ null);
+
+ // reduce the values with the reducer and the local window store
+ topology.addProcessor(selectName, aggWindowSupplier, this.name);
+ topology.addProcessor(reduceName, aggregateSupplier, selectName);
+ topology.addStateStore(aggregateStore, reduceName);
+
+ // return the KTable representation of the reduce result
+ return new KTableImpl<>(topology, reduceName, aggregateSupplier, sourceNodes);
+ }
+
+ @Override
+ public <T, W extends Window> KTable<Windowed<K>, T> aggregateByKey(Aggregator<K, V, T> aggregator,
Windows<W> windows,
Serializer<K> keySerializer,
Serializer<T> aggValueSerializer,
@@ -407,7 +442,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
String selectName = topology.newName(SELECT_NAME);
ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
- ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregatorSupplier.get());
+ ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
RocksDBWindowStoreSupplier<K, T> aggregateStore =
new RocksDBWindowStoreSupplier<>(
@@ -426,39 +461,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
// return the KTable representation with the intermediate topic as the sources
return new KTableImpl<>(topology, aggregateName, aggregateSupplier, sourceNodes);
}
-
- @Override
- public <W extends Window> KTable<Windowed<K>, Long> sumByKey(final KeyValueToLongMapper<K, V> valueSelector,
- Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
-
- KStream<K, Long> selected = this.map(new KeyValueMapper<K, V, KeyValue<K, Long>>() {
- @Override
- public KeyValue<K, Long> apply(K key, V value) {
- return new KeyValue<>(key, valueSelector.apply(key, value));
- }
- });
-
- return selected.<Long, W>aggregateByKey(new LongSumSupplier<K>(),
- windows,
- keySerializer,
- new LongSerializer(),
- keyDeserializer,
- new LongDeserializer());
- }
-
-
- @Override
- public <W extends Window> KTable<Windowed<K>, Long> countByKey(Windows<W> windows,
- Serializer<K> keySerializer,
- Deserializer<K> keyDeserializer) {
-
- return this.<Long, W>aggregateByKey(new CountSupplier<K, V>(),
- windows,
- keySerializer,
- new LongSerializer(),
- keyDeserializer,
- new LongDeserializer());
- }
}
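A usage sketch of the reduceByKey(...) overload added above (hypothetical; it assumes a KStream<String, Long> named stream, that Reducer declares the two-argument apply(...) invoked by KStreamReduce, and that a named Windows implementation such as UnlimitedWindows.of("sums") is available; the serializers come from org.apache.kafka.common.serialization):

    KTable<Windowed<String>, Long> sums = stream.reduceByKey(
        new Reducer<Long>() {
            @Override
            public Long apply(Long value1, Long value2) {
                return value1 + value2;        // running sum per key and window
            }
        },
        UnlimitedWindows.of("sums"),           // assumption: any named Windows works here
        new StringSerializer(),
        new LongSerializer(),
        new StringDeserializer(),
        new LongDeserializer());

Combined with a map(...) that extracts a Long from each record, this covers what the removed sumByKey(...) and countByKey(...) helpers used to do.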
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
new file mode 100644
index 0000000..7d6eb27
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.Windows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+import java.util.Iterator;
+import java.util.Map;
+
+public class KStreamReduce<K, V, W extends Window> implements KTableProcessorSupplier<Windowed<K>, V, V> {
+
+ private final String storeName;
+ private final Windows<W> windows;
+ private final Reducer<V> reducer;
+
+ private boolean sendOldValues = false;
+
+ public KStreamReduce(Windows<W> windows, String storeName, Reducer<V> reducer) {
+ this.windows = windows;
+ this.storeName = storeName;
+ this.reducer = reducer;
+ }
+
+ @Override
+ public Processor<Windowed<K>, Change<V>> get() {
+ return new KStreamAggregateProcessor();
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ private class KStreamAggregateProcessor extends AbstractProcessor<Windowed<K>, Change<V>> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(Windowed<K> windowedKey, Change<V> change) {
+ // first get the matching windows
+ long timestamp = windowedKey.window().start();
+ K key = windowedKey.value();
+ V value = change.newValue;
+
+ Map<Long, W> matchedWindows = windows.windowsFor(timestamp);
+
+ long timeFrom = Long.MAX_VALUE;
+ long timeTo = Long.MIN_VALUE;
+
+ // use range query on window store for efficient reads
+ for (long windowStartMs : matchedWindows.keySet()) {
+ timeFrom = windowStartMs < timeFrom ? windowStartMs : timeFrom;
+ timeTo = windowStartMs > timeTo ? windowStartMs : timeTo;
+ }
+
+ WindowStoreIterator<V> iter = windowStore.fetch(key, timeFrom, timeTo);
+
+ // for each matching window, try to update the corresponding aggregate and forward it downstream
+ while (iter.hasNext()) {
+ KeyValue<Long, V> entry = iter.next();
+ W window = matchedWindows.get(entry.key);
+
+ if (window != null) {
+
+ V oldAgg = entry.value;
+ V newAgg = oldAgg;
+
+ // try to add the new value (there will never be an old value)
+ if (newAgg == null) {
+ newAgg = value;
+ } else {
+ newAgg = reducer.apply(newAgg, value);
+ }
+
+ // update the store with the new value
+ windowStore.put(key, newAgg, window.start());
+
+ // forward the aggregated change pair
+ if (sendOldValues)
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg));
+ else
+ context().forward(new Windowed<>(key, window), new Change<>(newAgg, null));
+
+ matchedWindows.remove(entry.key);
+ }
+ }
+
+ iter.close();
+
+ // create new windows for the remaining unmatched windows that do not exist yet
+ for (long windowStartMs : matchedWindows.keySet()) {
+ windowStore.put(key, value, windowStartMs);
+
+ // send the new aggregate pair (there will be no old value)
+ context().forward(new Windowed<>(key, matchedWindows.get(windowStartMs)), new Change<>(value, null));
+ }
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<Windowed<K>, V> view() {
+
+ return new KTableValueGetterSupplier<Windowed<K>, V>() {
+
+ public KTableValueGetter<Windowed<K>, V> get() {
+ return new KStreamAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KStreamAggregateValueGetter implements KTableValueGetter<Windowed<K>, V> {
+
+ private WindowStore<K, V> windowStore;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public V get(Windowed<K> windowedKey) {
+ K key = windowedKey.value();
+ W window = (W) windowedKey.window();
+
+ // this iterator should only contain one element
+ Iterator<KeyValue<Long, V>> iter = windowStore.fetch(key, window.start(), window.start());
+
+ return iter.next().value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
index a5948f8..1730a8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
@@ -30,7 +30,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
private boolean sendOldValues = false;
- KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
+ public KTableAggregate(String storeName, Aggregator<K, V, T> aggregator) {
this.storeName = storeName;
this.aggregator = aggregator;
}
@@ -62,7 +62,7 @@ public class KTableAggregate<K, V, T> implements KTableProcessorSupplier<K, V, T
T oldAgg = store.get(key);
if (oldAgg == null)
- oldAgg = aggregator.initialValue();
+ oldAgg = aggregator.initialValue(key);
T newAgg = oldAgg;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 9888dff..8ee557c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -18,17 +18,15 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
+import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.KeyValueToLongMapper;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -48,31 +46,34 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
private static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
- private static final String FILTER_NAME = "KTABLE-FILTER-";
+ private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
- private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
+ private static final String FILTER_NAME = "KTABLE-FILTER-";
- private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
+ public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
- private static final String SELECT_NAME = "KTABLE-SELECT-";
+ public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
- private static final String AGGREGATE_NAME = "KTABLE-AGGREGATE-";
+ public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
- public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+ public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
- public static final String JOINTHIS_NAME = "KTABLE-JOINTHIS-";
+ private static final String MAPVALUES_NAME = "KTABLE-MAPVALUES-";
- public static final String JOINOTHER_NAME = "KTABLE-JOINOTHER-";
+ public static final String MERGE_NAME = "KTABLE-MERGE-";
public static final String OUTERTHIS_NAME = "KTABLE-OUTERTHIS-";
public static final String OUTEROTHER_NAME = "KTABLE-OUTEROTHER-";
- public static final String LEFTTHIS_NAME = "KTABLE-LEFTTHIS-";
+ private static final String REDUCE_NAME = "KTABLE-REDUCE-";
- public static final String LEFTOTHER_NAME = "KTABLE-LEFTOTHER-";
+ private static final String SELECT_NAME = "KTABLE-SELECT-";
+
+ public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+
+ private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
- public static final String MERGE_NAME = "KTABLE-MERGE-";
public final ProcessorSupplier<K, ?> processorSupplier;
@@ -245,15 +246,15 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1, V1, V2> KTable<K1, V2> aggregate(AggregatorSupplier<K1, V1, V2> aggregatorSupplier,
- KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
- Serializer<K1> keySerializer,
- Serializer<V1> valueSerializer,
- Serializer<V2> aggValueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V1> valueDeserializer,
- Deserializer<V2> aggValueDeserializer,
- String name) {
+ public <K1, V1, T> KTable<K1, T> aggregate(Aggregator<K1, V1, T> aggregator,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Serializer<T> aggValueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ Deserializer<T> aggValueDeserializer,
+ String name) {
String selectName = topology.newName(SELECT_NAME);
String sinkName = topology.newName(KStreamImpl.SINK_NAME);
@@ -267,7 +268,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregatorSupplier.get());
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableAggregate<>(name, aggregator);
StateStoreSupplier aggregateStore = Stores.create(name)
.withKeys(keySerializer, keyDeserializer)
@@ -295,55 +296,52 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
}
@Override
- public <K1> KTable<K1, Long> sum(final KeyValueMapper<K, V, K1> keySelector,
- final KeyValueToLongMapper<K, V> valueSelector,
- Serializer<K1> keySerializer,
- Deserializer<K1> keyDeserializer,
- String name) {
+ public <K1, V1> KTable<K1, V1> reduce(Reducer<V1> addReducer,
+ Reducer<V1> removeReducer,
+ KeyValueMapper<K, V, KeyValue<K1, V1>> selector,
+ Serializer<K1> keySerializer,
+ Serializer<V1> valueSerializer,
+ Deserializer<K1> keyDeserializer,
+ Deserializer<V1> valueDeserializer,
+ String name) {
- Serializer<Long> longSerializer = new LongSerializer();
- Deserializer<Long> longDeserializer = new LongDeserializer();
+ String selectName = topology.newName(SELECT_NAME);
+ String sinkName = topology.newName(KStreamImpl.SINK_NAME);
+ String sourceName = topology.newName(KStreamImpl.SOURCE_NAME);
+ String reduceName = topology.newName(REDUCE_NAME);
- KeyValueMapper<K, V, KeyValue<K1, Long>> mapper = new KeyValueMapper<K, V, KeyValue<K1, Long>>() {
- @Override
- public KeyValue<K1, Long> apply(K key, V value) {
- K1 aggKey = keySelector.apply(key, value);
- Long aggValue = valueSelector.apply(key, value);
+ String topic = name + REPARTITION_TOPIC_SUFFIX;
- return new KeyValue<>(aggKey, aggValue);
- }
- };
+ ChangedSerializer<V1> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+ ChangedDeserializer<V1> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
- return this.<K1, Long, Long>aggregate(new LongSumSupplier<K1>(), mapper,
- keySerializer, longSerializer, longSerializer,
- keyDeserializer, longDeserializer, longDeserializer,
- name);
- }
+ KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector);
- @Override
- public <K1> KTable<K1, Long> count(final KeyValueMapper<K, V, K1> keySelector,
- Serializer<K1> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K1> keyDeserializer,
- Deserializer<V> valueDeserializer,
- String name) {
+ ProcessorSupplier<K1, Change<V1>> aggregateSupplier = new KTableReduce<>(name, addReducer, removeReducer);
- Serializer<Long> longSerializer = new LongSerializer();
- Deserializer<Long> longDeserializer = new LongDeserializer();
+ StateStoreSupplier aggregateStore = Stores.create(name)
+ .withKeys(keySerializer, keyDeserializer)
+ .withValues(valueSerializer, valueDeserializer)
+ .localDatabase()
+ .build();
- KeyValueMapper<K, V, KeyValue<K1, V>> mapper = new KeyValueMapper<K, V, KeyValue<K1, V>>() {
- @Override
- public KeyValue<K1, V> apply(K key, V value) {
- K1 aggKey = keySelector.apply(key, value);
+ // select the aggregate key and values (old and new); this requires the parent to send old values
+ topology.addProcessor(selectName, selectSupplier, this.name);
+ this.enableSendingOldValues();
- return new KeyValue<>(aggKey, value);
- }
- };
+ // send the aggregate key-value pairs to the intermediate topic for partitioning
+ topology.addInternalTopic(topic);
+ topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, selectName);
- return this.<K1, V, Long>aggregate(new CountSupplier<K1, V>(), mapper,
- keySerializer, valueSerializer, longSerializer,
- keyDeserializer, valueDeserializer, longDeserializer,
- name);
+ // read the intermediate topic
+ topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
+
+ // reduce the values with the add/remove reducers and the local store
+ topology.addProcessor(reduceName, aggregateSupplier, sourceName);
+ topology.addStateStore(aggregateStore, reduceName);
+
+ // return the KTable representation with the intermediate topic as its source
+ return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
new file mode 100644
index 0000000..0d1b55a
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableReduce.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+public class KTableReduce<K, V> implements KTableProcessorSupplier<K, V, V> {
+
+ private final String storeName;
+ private final Reducer<V> addReducer;
+ private final Reducer<V> removeReducer;
+
+ private boolean sendOldValues = false;
+
+ public KTableReduce(String storeName, Reducer<V> addReducer, Reducer<V> removeReducer) {
+ this.storeName = storeName;
+ this.addReducer = addReducer;
+ this.removeReducer = removeReducer;
+ }
+
+ @Override
+ public void enableSendingOldValues() {
+ sendOldValues = true;
+ }
+
+ @Override
+ public Processor<K, Change<V>> get() {
+ return new KTableAggregateProcessor();
+ }
+
+ private class KTableAggregateProcessor extends AbstractProcessor<K, Change<V>> {
+
+ private KeyValueStore<K, V> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ super.init(context);
+
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public void process(K key, Change<V> value) {
+ V oldAgg = store.get(key);
+ V newAgg = oldAgg;
+
+ // first try to add the new value
+ if (value.newValue != null) {
+ if (newAgg == null) {
+ newAgg = value.newValue;
+ } else {
+ newAgg = addReducer.apply(newAgg, value.newValue);
+ }
+ }
+
+ // then try to remove the old value
+ if (value.oldValue != null) {
+ newAgg = removeReducer.apply(newAgg, value.oldValue);
+ }
+
+ // update the store with the new value
+ store.put(key, newAgg);
+
+ // send the old / new pair
+ if (sendOldValues)
+ context().forward(key, new Change<>(newAgg, oldAgg));
+ else
+ context().forward(key, new Change<>(newAgg, null));
+ }
+ }
+
+ @Override
+ public KTableValueGetterSupplier<K, V> view() {
+
+ return new KTableValueGetterSupplier<K, V>() {
+
+ public KTableValueGetter<K, V> get() {
+ return new KTableAggregateValueGetter();
+ }
+
+ };
+ }
+
+ private class KTableAggregateValueGetter implements KTableValueGetter<K, V> {
+
+ private KeyValueStore<K, V> store;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(ProcessorContext context) {
+ store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+ }
+
+ @Override
+ public V get(K key) {
+ return store.get(key);
+ }
+
+ }
+}
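KTableReduce takes a pair of reducers because table updates arrive as a Change carrying both old and new values: process() first folds in change.newValue with the add reducer, then folds out change.oldValue with the remove reducer, so the stored aggregate stays consistent across updates. A hypothetical add/remove pair for a running sum:

    Reducer<Long> addReducer = new Reducer<Long>() {
        @Override
        public Long apply(Long aggregate, Long newValue) {
            return aggregate + newValue;    // applied to change.newValue
        }
    };

    Reducer<Long> removeReducer = new Reducer<Long>() {
        @Override
        public Long apply(Long aggregate, Long oldValue) {
            return aggregate - oldValue;    // applied to change.oldValue
        }
    };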
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index d07fc5d..c993512 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.MeteredKeyValueStore;
-import org.apache.kafka.streams.state.RocksDBStore;
+import org.apache.kafka.streams.state.internals.MeteredKeyValueStore;
+import org.apache.kafka.streams.state.internals.RocksDBStore;
import org.apache.kafka.streams.state.Serdes;
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
deleted file mode 100644
index b66590e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/LongSumSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-
-public class LongSumSupplier<K> implements AggregatorSupplier<K, Long, Long> {
-
- private class LongSum implements Aggregator<K, Long, Long> {
-
- @Override
- public Long initialValue() {
- return 0L;
- }
-
- @Override
- public Long add(K aggKey, Long value, Long aggregate) {
- return aggregate + value;
- }
-
- @Override
- public Long remove(K aggKey, Long value, Long aggregate) {
- return aggregate - value;
- }
-
- @Override
- public Long merge(Long aggr1, Long aggr2) {
- return aggr1 + aggr2;
- }
- }
-
- @Override
- public Aggregator<K, Long, Long> get() {
- return new LongSum();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
deleted file mode 100644
index 00f4b55..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TopKSupplier.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
-import org.apache.kafka.streams.kstream.Aggregator;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-
-/**
- * NOTE: This is just a demo aggregate supplier illustrating how users could implement their own aggregates.
- * It is highly inefficient and is not supposed to be merged in.
- */
-public class TopKSupplier<K, V extends Comparable<V>> implements AggregatorSupplier<K, V, Collection<V>> {
-
- private final int k;
-
- public TopKSupplier(int k) {
- this.k = k;
- }
-
- private class TopK implements Aggregator<K, V, Collection<V>> {
-
- private final Map<K, PriorityQueue<V>> sorted = new HashMap<>();
-
- @Override
- public Collection<V> initialValue() {
- return Collections.<V>emptySet();
- }
-
- @Override
- public Collection<V> add(K aggKey, V value, Collection<V> aggregate) {
- PriorityQueue<V> queue = sorted.get(aggKey);
- if (queue == null) {
- queue = new PriorityQueue<>();
- sorted.put(aggKey, queue);
- }
-
- queue.add(value);
-
- PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
-
- @Override
- public Collection<V> remove(K aggKey, V value, Collection<V> aggregate) {
- PriorityQueue<V> queue = sorted.get(aggKey);
-
- if (queue == null)
- throw new IllegalStateException("This should not happen.");
-
- queue.remove(value);
-
- PriorityQueue<V> copy = new PriorityQueue<>(queue);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
-
- @Override
- public Collection<V> merge(Collection<V> aggr1, Collection<V> aggr2) {
- PriorityQueue<V> copy = new PriorityQueue<>(aggr1);
- copy.addAll(aggr2);
-
- Set<V> ret = new HashSet<>();
- for (int i = 1; i <= k; i++)
- ret.add(copy.poll());
-
- return ret;
- }
- }
-
- @Override
- public Aggregator<K, V, Collection<V>> get() {
- return new TopK();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3cac3f1..547bb15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index d1f845c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
- }
-
- private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, V> map;
-
- public MemoryStore(String name) {
- super();
- this.name = name;
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- return this.map.remove(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
-
- public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index a346534..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final int capacity;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.capacity = capacity;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- store.removed(key);
- }
- });
- return store;
- }
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
new file mode 100644
index 0000000..8c3b437
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes serdes;
+ private final Time time;
+
+ public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
new file mode 100644
index 0000000..8a600f9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.CompressionType;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteOptions;
+
+import java.io.File;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
+
+ private static final int TTL_NOT_USED = -1;
+
+ // TODO: these values should be configurable
+ private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+ private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
+ private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
+ private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
+ private static final long BLOCK_SIZE = 4096L;
+ private static final int TTL_SECONDS = TTL_NOT_USED;
+ private static final int MAX_WRITE_BUFFERS = 3;
+ private static final String DB_FILE_DIR = "rocksdb";
+
+ private final String name;
+
+ private final Options options;
+ private final WriteOptions wOptions;
+ private final FlushOptions fOptions;
+
+ private Serdes<K, V> serdes;
+ private ProcessorContext context;
+ protected File dbDir;
+ private RocksDB db;
+
+ public RocksDBStore(String name, Serdes<K, V> serdes) {
+ this.name = name;
+ this.serdes = serdes;
+
+ // initialize the rocksdb options
+ BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+ tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
+ tableConfig.setBlockSize(BLOCK_SIZE);
+
+ options = new Options();
+ options.setTableFormatConfig(tableConfig);
+ options.setWriteBufferSize(WRITE_BUFFER_SIZE);
+ options.setCompressionType(COMPRESSION_TYPE);
+ options.setCompactionStyle(COMPACTION_STYLE);
+ options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
+ options.setCreateIfMissing(true);
+ options.setErrorIfExists(false);
+
+ wOptions = new WriteOptions();
+ wOptions.setDisableWAL(true);
+
+ fOptions = new FlushOptions();
+ fOptions.setWaitForFlush(true);
+ }
+
+ public void init(ProcessorContext context) {
+ serdes.init(context);
+
+ this.context = context;
+ this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
+ this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+ }
+
+ private RocksDB openDB(File dir, Options options, int ttl) {
+ try {
+ if (ttl == TTL_NOT_USED) {
+ dir.getParentFile().mkdirs();
+ return RocksDB.open(options, dir.toString());
+ } else {
+ throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
+ // TODO: support TTL with change log?
+ // return TtlDB.open(options, dir.toString(), ttl, false);
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
+ }
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ try {
+ return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ try {
+ if (value == null) {
+ db.remove(wOptions, serdes.rawKey(key));
+ } else {
+ db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+ }
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = get(key);
+ put(key, null);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ RocksIterator innerIter = db.newIterator();
+ innerIter.seekToFirst();
+ return new RocksDbIterator<K, V>(innerIter, serdes);
+ }
+
+ @Override
+ public void flush() {
+ try {
+ db.flush(fOptions);
+ } catch (RocksDBException e) {
+ // TODO: this needs to be handled more accurately
+ throw new KafkaException("Error while executing flush from store " + this.name, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ flush();
+ db.close();
+ }
+
+ private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
+ private final RocksIterator iter;
+ private final Serdes<K, V> serdes;
+
+ public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
+ this.iter = iter;
+ this.serdes = serdes;
+ }
+
+ protected byte[] peekRawKey() {
+ return iter.key();
+ }
+
+ protected Entry<K, V> getEntry() {
+ return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.isValid();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Entry<K, V> entry = this.getEntry();
+ iter.next();
+ return entry;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("RocksDB iterator does not support remove");
+ }
+
+ @Override
+ public void close() {
+ iter.dispose();
+ }
+
+ }
+
+ private static class LexicographicComparator implements Comparator<byte[]> {
+
+ @Override
+ public int compare(byte[] left, byte[] right) {
+ for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
+ int leftByte = left[i] & 0xff;
+ int rightByte = right[j] & 0xff;
+ if (leftByte != rightByte) {
+ return leftByte - rightByte;
+ }
+ }
+ return left.length - right.length;
+ }
+ }
+
+ private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
+ // RocksDB's JNI interface does not expose getters/setters that allow the
+ // comparator to be pluggable, and the default is lexicographic, so it's
+ // safe to just force lexicographic comparator here for now.
+ private final Comparator<byte[]> comparator = new LexicographicComparator();
+ byte[] rawToKey;
+
+ public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
+ K from, K to) {
+ super(iter, serdes);
+ iter.seek(serdes.rawKey(from));
+ this.rawToKey = serdes.rawKey(to);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
+ }
+ }
+
+}
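
A note on the range iterator above: keys are compared as unsigned bytes, and hasNext() requires the current key to be strictly less than the raw upper bound, so the scan is half-open at the byte level. The following standalone sketch (hypothetical class name, no RocksDB dependency) reproduces that comparison:

    public class UnsignedLexicographicDemo {

        // Same unsigned-byte comparison the store's LexicographicComparator performs.
        static int compare(byte[] left, byte[] right) {
            for (int i = 0; i < left.length && i < right.length; i++) {
                int leftByte = left[i] & 0xff;   // mask so 0x80..0xff sort after 0x7f
                int rightByte = right[i] & 0xff;
                if (leftByte != rightByte)
                    return leftByte - rightByte;
            }
            return left.length - right.length;   // a strict prefix sorts first
        }

        public static void main(String[] args) {
            byte[] a = {(byte) 0x01};
            byte[] b = {(byte) 0xff};              // negative as a signed byte
            System.out.println(compare(a, b) < 0); // true: unsigned ordering
            // RocksDBRangeIterator.hasNext() requires compare(key, rawToKey) < 0,
            // so a key equal to the raw upper bound is excluded from the range.
        }
    }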
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
new file mode 100644
index 0000000..933ed91
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.streams.state.WindowStoreUtil;
+
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.SimpleTimeZone;
+
+public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
+
+ public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
+
+ private static final long USE_CURRENT_TIMESTAMP = -1L;
+
+ private static class Segment extends RocksDBStore<byte[], byte[]> {
+ public final long id;
+
+ Segment(String name, long id) {
+ super(name, WindowStoreUtil.INNER_SERDES);
+ this.id = id;
+ }
+
+ public void destroy() {
+ Utils.delete(dbDir);
+ }
+ }
+
+ private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
+ private final Serdes<?, V> serdes;
+ private final KeyValueIterator<byte[], byte[]>[] iterators;
+ private int index = 0;
+
+ RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
+ this(serdes, WindowStoreUtil.NO_ITERATORS);
+ }
+
+ RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
+ this.serdes = serdes;
+ this.iterators = iterators;
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (index < iterators.length) {
+ if (iterators[index].hasNext())
+ return true;
+
+ index++;
+ }
+ return false;
+ }
+
+ @Override
+ public KeyValue<Long, V> next() {
+ if (index >= iterators.length)
+ throw new NoSuchElementException();
+
+ Entry<byte[], byte[]> entry = iterators[index].next();
+
+ return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
+ serdes.valueFrom(entry.value()));
+ }
+
+ @Override
+ public void remove() {
+ if (index < iterators.length)
+ iterators[index].remove();
+ }
+
+ @Override
+ public void close() {
+ for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
+ iterator.close();
+ }
+ }
+ }
+
+ private final String name;
+ private final long segmentInterval;
+ private final boolean retainDuplicates;
+ private final Segment[] segments;
+ private final Serdes<K, V> serdes;
+ private final SimpleDateFormat formatter;
+
+ private ProcessorContext context;
+ private long currentSegmentId = -1L;
+ private int seqnum = 0;
+
+ public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
+ this.name = name;
+
+ // The segment interval must be at least MIN_SEGMENT_INTERVAL
+ this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
+
+ this.segments = new Segment[numSegments];
+ this.serdes = serdes;
+
+ this.retainDuplicates = retainDuplicates;
+
+ // Create a date formatter. Formatted timestamps are used as segment name suffixes
+ this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
+ this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ this.context = context;
+ }
+
+ @Override
+ public boolean persistent() {
+ return true;
+ }
+
+ @Override
+ public void flush() {
+ for (Segment segment : segments) {
+ if (segment != null)
+ segment.flush();
+ }
+ }
+
+ @Override
+ public void close() {
+ for (Segment segment : segments) {
+ if (segment != null)
+ segment.close();
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
+ }
+
+ @Override
+ public void put(K key, V value, long timestamp) {
+ putAndReturnInternalKey(key, value, timestamp);
+ }
+
+ @Override
+ public byte[] putAndReturnInternalKey(K key, V value, long t) {
+ long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
+
+ long segmentId = segmentId(timestamp);
+
+ if (segmentId > currentSegmentId) {
+ // A new segment will be created. Clean up old segments first.
+ currentSegmentId = segmentId;
+ cleanup();
+ }
+
+ // If the record is within the retention period, put it in the store.
+ if (segmentId > currentSegmentId - segments.length) {
+ if (retainDuplicates)
+ seqnum = (seqnum + 1) & 0x7FFFFFFF;
+ byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
+ getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
+ return binaryKey;
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+ long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+ if (segmentId > currentSegmentId) {
+ // A new segment will be created. Clean up old segments first.
+ currentSegmentId = segmentId;
+ cleanup();
+ }
+
+ // If the record is within the retention period, put it in the store.
+ if (segmentId > currentSegmentId - segments.length)
+ getSegment(segmentId).put(binaryKey, binaryValue);
+ }
+
+ @Override
+ public byte[] getInternal(byte[] binaryKey) {
+ long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
+
+ Segment segment = segments[(int) (segmentId % segments.length)];
+
+ if (segment != null && segment.id == segmentId) {
+ return segment.get(binaryKey);
+ } else {
+ return null;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+ long segFrom = segmentId(timeFrom);
+ long segTo = segmentId(Math.max(0L, timeTo));
+
+ byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
+ byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
+
+ ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
+
+ for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
+ Segment segment = segments[(int) (segmentId % segments.length)];
+
+ if (segment != null && segment.id == segmentId)
+ iterators.add(segment.range(binaryFrom, binaryUntil));
+ }
+
+ if (iterators.size() > 0) {
+ return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
+ } else {
+ return new RocksDBWindowStoreIterator<>(serdes);
+ }
+ }
+
+ private Segment getSegment(long segmentId) {
+ int index = (int) (segmentId % segments.length);
+
+ if (segments[index] == null) {
+ segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
+ segments[index].init(context);
+ }
+
+ return segments[index];
+ }
+
+ private void cleanup() {
+ for (int i = 0; i < segments.length; i++) {
+ if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
+ segments[i].close();
+ segments[i].destroy();
+ segments[i] = null;
+ }
+ }
+ }
+
+ public long segmentId(long timestamp) {
+ return timestamp / segmentInterval;
+ }
+
+ public String directorySuffix(long segmentId) {
+ return formatter.format(new Date(segmentId * segmentInterval));
+ }
+
+ // this method is used by a test
+ public Set<Long> segmentIds() {
+ HashSet<Long> segmentIds = new HashSet<>();
+
+ for (Segment segment : segments) {
+ if (segment != null)
+ segmentIds.add(segment.id);
+ }
+
+ return segmentIds;
+ }
+
+}
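
The segment bookkeeping above is plain integer arithmetic: the interval is retentionPeriod / (numSegments - 1) floored at MIN_SEGMENT_INTERVAL, a timestamp maps to segment timestamp / segmentInterval, and a segment becomes eligible for cleanup once its id trails the current one by numSegments or more. A standalone sketch with illustrative values (the class name and the chosen timestamps are hypothetical):

    public class SegmentMathDemo {

        static final long MIN_SEGMENT_INTERVAL = 60 * 1000L; // one minute, as in the store

        public static void main(String[] args) {
            long retentionPeriod = 3 * 60 * 1000L; // illustrative: three minutes
            int numSegments = 3;

            // Same computation as the RocksDBWindowStore constructor.
            long segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

            long currentSegmentId = 500000L / segmentInterval; // segment of the newest record

            for (long ts = 0L; ts <= 500000L; ts += 100000L) {
                long segmentId = ts / segmentInterval;
                // Same retention check as putAndReturnInternalKey() / putInternal().
                boolean retained = segmentId > currentSegmentId - numSegments;
                System.out.printf("ts=%d -> segment %d, retained=%b%n", ts, segmentId, retained);
            }
        }
    }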
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
new file mode 100644
index 0000000..fa85ce9
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Serdes;
+
+/**
+ * A {@link StateStoreSupplier} that supplies {@link org.apache.kafka.streams.state.WindowStore} instances backed by a local RocksDB database.
+ *
+ * @param <K> the type of keys
+ * @param <V> the type of values
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final long retentionPeriod;
+ private final boolean retainDuplicates;
+ private final int numSegments;
+ private final Serdes<K, V> serdes;
+ private final Time time;
+
+ public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.retentionPeriod = retentionPeriod;
+ this.retainDuplicates = retainDuplicates;
+ this.numSegments = numSegments;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
+ }
+
+}
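
For orientation, this is roughly how the supplier would be wired up; the store name, retention values, and demo class are illustrative, and SystemTime merely stands in for whichever Time implementation the caller provides (the window store tests in this patch pass null):

    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.streams.processor.StateStore;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Serdes;
    import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;

    public class WindowStoreSupplierDemo {
        public static void main(String[] args) {
            Serdes<Integer, String> serdes =
                Serdes.withBuiltinTypes("demo-topic", Integer.class, String.class);

            StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(
                "demo-window",       // store name; also prefixes segment directories
                60 * 60 * 1000L,     // retention period: one hour (illustrative)
                3,                   // segments to spread the retention period over
                true,                // retain duplicate values per key and timestamp
                serdes,
                new SystemTime());

            // get() returns a MeteredWindowStore wrapping a RocksDBWindowStore;
            // init(context) must still be called before the store is usable.
            StateStore store = supplier.get();
            System.out.println(store.name());
        }
    }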
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
new file mode 100644
index 0000000..da5544c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class StoreChangeLogger<K, V> {
+
+ public interface ValueGetter<K, V> {
+ V get(K key);
+ }
+
+ protected final Serdes<K, V> serialization;
+
+ private final Set<K> dirty;
+ private final Set<K> removed;
+ private final int maxDirty;
+ private final int maxRemoved;
+
+ private final String topic;
+ private int partition;
+ private ProcessorContext context;
+
+ // always wrap the logged store with the metered store
+ public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
+ this.topic = topic;
+ this.serialization = serialization;
+ this.context = context;
+ this.partition = context.id().partition;
+
+ this.dirty = new HashSet<>();
+ this.removed = new HashSet<>();
+ this.maxDirty = 100; // TODO: this needs to be configurable
+ this.maxRemoved = 100; // TODO: this needs to be configurable
+ }
+
+ public void add(K key) {
+ this.dirty.add(key);
+ this.removed.remove(key);
+ }
+
+ public void delete(K key) {
+ this.dirty.remove(key);
+ this.removed.add(key);
+ }
+
+ public void maybeLogChange(ValueGetter<K, V> getter) {
+ if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
+ logChange(getter);
+ }
+
+ public void logChange(ValueGetter<K, V> getter) {
+ RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
+ if (collector != null) {
+ Serializer<K> keySerializer = serialization.keySerializer();
+ Serializer<V> valueSerializer = serialization.valueSerializer();
+
+ for (K k : this.removed) {
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
+ }
+ for (K k : this.dirty) {
+ V v = getter.get(k);
+ collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
+ }
+ this.removed.clear();
+ this.dirty.clear();
+ }
+ }
+
+}
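
The invariant here is that a key lives in at most one of the two sets: add() marks it dirty, delete() moves it to removed, and logChange() sends tombstones for removed keys before re-reading and sending current values for dirty ones. A minimal sketch of that bookkeeping, with a HashMap standing in for the store and println for the RecordCollector (both hypothetical stand-ins):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ChangeLoggerDemo {
        public static void main(String[] args) {
            Set<String> dirty = new HashSet<>();
            Set<String> removed = new HashSet<>();
            Map<String, String> store = new HashMap<>();

            // put "a" then delete it: it must end up only in removed
            store.put("a", "1");
            dirty.add("a"); removed.remove("a");     // logger.add("a")
            store.remove("a");
            dirty.remove("a"); removed.add("a");     // logger.delete("a")

            store.put("b", "2");
            dirty.add("b"); removed.remove("b");     // logger.add("b")

            // logChange(getter): tombstones first, then current values
            for (String k : removed)
                System.out.println(k + " -> null (tombstone)");
            for (String k : dirty)
                System.out.println(k + " -> " + store.get(k));
            removed.clear();
            dirty.clear();
        }
    }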
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
index ba596a9..ecc303d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.HoppingWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -43,34 +42,21 @@ public class KStreamAggregateTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();
- private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+ private class StringCanonizer implements Aggregator<String, String, String> {
- private class StringCanonizer implements Aggregator<String, String, String> {
-
- @Override
- public String initialValue() {
- return "0";
- }
-
- @Override
- public String add(String aggKey, String value, String aggregate) {
- return aggregate + "+" + value;
- }
-
- @Override
- public String remove(String aggKey, String value, String aggregate) {
- return aggregate + "-" + value;
- }
+ @Override
+ public String initialValue(String aggKey) {
+ return "0";
+ }
- @Override
- public String merge(String aggr1, String aggr2) {
- return "(" + aggr1 + ") + (" + aggr2 + ")";
- }
+ @Override
+ public String add(String aggKey, String value, String aggregate) {
+ return aggregate + "+" + value;
}
@Override
- public Aggregator<String, String, String> get() {
- return new StringCanonizer();
+ public String remove(String aggKey, String value, String aggregate) {
+ return aggregate + "-" + value;
}
}
@@ -83,7 +69,7 @@ public class KStreamAggregateTest {
String topic1 = "topic1";
KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
- KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizeSupplier(),
+ KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringCanonizer(),
HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
strSerializer,
strSerializer,
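
These test changes track the API change in this patch: aggregateByKey() now takes an Aggregator directly rather than an AggregatorSupplier, and initialValue() receives the aggregation key. A second hypothetical implementation against the same three-method shape (LongSum is illustrative, not part of the patch):

    import org.apache.kafka.streams.kstream.Aggregator;

    public class LongSum implements Aggregator<String, Long, Long> {

        @Override
        public Long initialValue(String aggKey) {
            return 0L;   // the aggregation key is now available to the initializer
        }

        @Override
        public Long add(String aggKey, Long value, Long aggregate) {
            return aggregate + value;
        }

        @Override
        public Long remove(String aggKey, Long value, Long aggregate) {
            return aggregate - value;
        }
    }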
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index b5037ee..439aa09 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.AggregatorSupplier;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
@@ -41,34 +40,21 @@ public class KTableAggregateTest {
private final Serializer<String> strSerializer = new StringSerializer();
private final Deserializer<String> strDeserializer = new StringDeserializer();
- private class StringCanonizeSupplier implements AggregatorSupplier<String, String, String> {
+ private class StringCanonizer implements Aggregator<String, String, String> {
- private class StringCanonizer implements Aggregator<String, String, String> {
-
- @Override
- public String initialValue() {
- return "0";
- }
-
- @Override
- public String add(String aggKey, String value, String aggregate) {
- return aggregate + "+" + value;
- }
-
- @Override
- public String remove(String aggKey, String value, String aggregate) {
- return aggregate + "-" + value;
- }
+ @Override
+ public String initialValue(String aggKey) {
+ return "0";
+ }
- @Override
- public String merge(String aggr1, String aggr2) {
- return "(" + aggr1 + ") + (" + aggr2 + ")";
- }
+ @Override
+ public String add(String aggKey, String value, String aggregate) {
+ return aggregate + "+" + value;
}
@Override
- public Aggregator<String, String, String> get() {
- return new StringCanonizer();
+ public String remove(String aggKey, String value, String aggregate) {
+ return aggregate + "-" + value;
}
}
@@ -81,7 +67,7 @@ public class KTableAggregateTest {
String topic1 = "topic1";
KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
- KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizeSupplier(),
+ KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringCanonizer(),
new NoOpKeyValueMapper<String, String>(),
strSerializer,
strSerializer,
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 5e336cc..bc6f71b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b2f45fd..85a8a15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.state.OffsetCheckpoint;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.junit.Before;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
deleted file mode 100644
index d40f308..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/AbstractKeyValueStoreTest.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.junit.Test;
-
-public abstract class AbstractKeyValueStoreTest {
-
- protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
- Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes);
-
- @Test
- public void testPutGetRange() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
- try {
-
- // Verify that the store reads and writes correctly ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(4, "four");
- store.put(5, "five");
- assertEquals(5, driver.sizeOf(store));
- assertEquals("zero", store.get(0));
- assertEquals("one", store.get(1));
- assertEquals("two", store.get(2));
- assertNull(store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertEquals("zero", driver.flushedEntryStored(0));
- assertEquals("one", driver.flushedEntryStored(1));
- assertEquals("two", driver.flushedEntryStored(2));
- assertEquals("four", driver.flushedEntryStored(4));
- assertEquals(null, driver.flushedEntryStored(5));
-
- assertEquals(false, driver.flushedEntryRemoved(0));
- assertEquals(false, driver.flushedEntryRemoved(1));
- assertEquals(false, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
-
- // Check range iteration ...
- try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
- while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
- else
- fail("Unexpected entry: " + entry);
- }
- }
-
- // Check range iteration ...
- try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
- while (iter.hasNext()) {
- Entry<Integer, String> entry = iter.next();
- if (entry.key().equals(2))
- assertEquals("two", entry.value());
- else if (entry.key().equals(4))
- assertEquals("four", entry.value());
- else
- fail("Unexpected entry: " + entry);
- }
- }
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testPutGetRangeWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
- try {
-
- // Verify that the store reads and writes correctly ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(4, "four");
- store.put(5, "five");
- assertEquals(5, driver.sizeOf(store));
- assertEquals("zero", store.get(0));
- assertEquals("one", store.get(1));
- assertEquals("two", store.get(2));
- assertNull(store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertEquals("zero", driver.flushedEntryStored(0));
- assertEquals("one", driver.flushedEntryStored(1));
- assertEquals("two", driver.flushedEntryStored(2));
- assertEquals("four", driver.flushedEntryStored(4));
- assertEquals(null, driver.flushedEntryStored(5));
-
- assertEquals(false, driver.flushedEntryRemoved(0));
- assertEquals(false, driver.flushedEntryRemoved(1));
- assertEquals(false, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testRestore() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(0, "zero");
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
- try {
- // Verify that the store's contents were properly restored ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries ...
- assertEquals(4, driver.sizeOf(store));
- } finally {
- store.close();
- }
- }
-
- @Test
- public void testRestoreWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(0, "zero");
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
- try {
- // Verify that the store's contents were properly restored ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries ...
- assertEquals(4, driver.sizeOf(store));
- } finally {
- store.close();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
deleted file mode 100644
index 2b90d0a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreTest.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
- @SuppressWarnings("unchecked")
- @Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- ProcessorContext context,
- Class<K> keyClass, Class<V> valueClass,
- boolean useContextSerdes) {
-
- StateStoreSupplier supplier;
- if (useContextSerdes) {
- Serializer<K> keySer = (Serializer<K>) context.keySerializer();
- Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
- Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
- Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
- } else {
- supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
- }
-
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
- store.init(context);
- return store;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
deleted file mode 100644
index 81adfad..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.junit.Test;
-
-public class InMemoryLRUCacheStoreTest {
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPutGetRange() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
- StateStoreSupplier supplier = Stores.create("my-store")
- .withIntegerKeys().withStringValues()
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(3, "three");
- store.put(4, "four");
- store.put(5, "five");
-
- // It should only keep the last 3 added ...
- assertEquals(3, driver.sizeOf(store));
- assertNull(store.get(0));
- assertNull(store.get(1));
- assertNull(store.get(2));
- assertEquals("three", store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertNull(driver.flushedEntryStored(0));
- assertNull(driver.flushedEntryStored(1));
- assertNull(driver.flushedEntryStored(2));
- assertEquals("three", driver.flushedEntryStored(3));
- assertEquals("four", driver.flushedEntryStored(4));
- assertNull(driver.flushedEntryStored(5));
-
- assertEquals(true, driver.flushedEntryRemoved(0));
- assertEquals(true, driver.flushedEntryRemoved(1));
- assertEquals(true, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(3));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testPutGetRangeWithDefaultSerdes() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
-
- Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
- Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
- Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
- Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
- StateStoreSupplier supplier = Stores.create("my-store")
- .withKeys(keySer, keyDeser)
- .withValues(valSer, valDeser)
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
- store.put(0, "zero");
- store.put(1, "one");
- store.put(2, "two");
- store.put(3, "three");
- store.put(4, "four");
- store.put(5, "five");
-
- // It should only keep the last 3 added ...
- assertEquals(3, driver.sizeOf(store));
- assertNull(store.get(0));
- assertNull(store.get(1));
- assertNull(store.get(2));
- assertEquals("three", store.get(3));
- assertEquals("four", store.get(4));
- assertEquals("five", store.get(5));
- store.delete(5);
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
- assertNull(driver.flushedEntryStored(0));
- assertNull(driver.flushedEntryStored(1));
- assertNull(driver.flushedEntryStored(2));
- assertEquals("three", driver.flushedEntryStored(3));
- assertEquals("four", driver.flushedEntryStored(4));
- assertNull(driver.flushedEntryStored(5));
-
- assertEquals(true, driver.flushedEntryRemoved(0));
- assertEquals(true, driver.flushedEntryRemoved(1));
- assertEquals(true, driver.flushedEntryRemoved(2));
- assertEquals(false, driver.flushedEntryRemoved(3));
- assertEquals(false, driver.flushedEntryRemoved(4));
- assertEquals(true, driver.flushedEntryRemoved(5));
- }
-
- @Test
- public void testRestore() {
- // Create the test driver ...
- KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
-
- // Add any entries that will be restored to any store
- // that uses the driver's context ...
- driver.addEntryToRestoreLog(1, "one");
- driver.addEntryToRestoreLog(2, "two");
- driver.addEntryToRestoreLog(4, "four");
-
- // Create the store, which should register with the context and automatically
- // receive the restore entries ...
- StateStoreSupplier supplier = Stores.create("my-store")
- .withIntegerKeys().withStringValues()
- .inMemory().maxEntries(3)
- .build();
- KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
- store.init(driver.context());
-
- // Verify that the store's contents were properly restored ...
- assertEquals(0, driver.checkForRestoredEntries(store));
-
- // and there are no other entries ...
- assertEquals(3, driver.sizeOf(store));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 108797a..b0c9bd7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -48,7 +48,7 @@ import java.util.Set;
* A component that provides a {@link #context() ProcessingContext} that can be supplied to a {@link KeyValueStore} so that
* all entries written to the Kafka topic by the store during {@link KeyValueStore#flush()} are captured for testing purposes.
* This class simplifies testing of various {@link KeyValueStore} instances, especially those that use
- * {@link MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
+ * {@link org.apache.kafka.streams.state.internals.MeteredKeyValueStore} to monitor and write its entries to the Kafka topic.
* <p>
* <h2>Basic usage</h2>
* This component can be used to help test a {@link KeyValueStore}'s ability to read and write entries.
@@ -93,7 +93,7 @@ import java.util.Set;
* <p>
* <h2>Restoring a store</h2>
* This component can be used to test whether a {@link KeyValueStore} implementation properly
- * {@link ProcessorContext#register(StateStore, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
+ * {@link ProcessorContext#register(StateStore, boolean, StateRestoreCallback) registers itself} with the {@link ProcessorContext}, so that
* the persisted contents of a store are properly restored from the flushed entries when the store instance is started.
* <p>
* To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object, Object) add entries} that will be
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
deleted file mode 100644
index 20e92ef..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
-
- @SuppressWarnings("unchecked")
- @Override
- protected <K, V> KeyValueStore<K, V> createKeyValueStore(
- ProcessorContext context,
- Class<K> keyClass,
- Class<V> valueClass,
- boolean useContextSerdes) {
-
- StateStoreSupplier supplier;
- if (useContextSerdes) {
- Serializer<K> keySer = (Serializer<K>) context.keySerializer();
- Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
- Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
- Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
- supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
- } else {
- supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
- }
-
- KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
- store.init(context);
- return store;
-
- }
-}
[2/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add
Reduce functions
Posted by gu...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
deleted file mode 100644
index fc7a4e9..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/state/RocksDBWindowStoreTest.java
+++ /dev/null
@@ -1,671 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.MockProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.test.MockProcessorContext;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class RocksDBWindowStoreTest {
-
- private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
- private final ByteArrayDeserializer byteArrayDeserializer = new ByteArrayDeserializer();
- private final int numSegments = 3;
- private final long segmentSize = RocksDBWindowStore.MIN_SEGMENT_INTERVAL;
- private final long retentionPeriod = segmentSize * (numSegments - 1);
- private final long windowSize = 3;
- private final Serdes<Integer, String> serdes = Serdes.withBuiltinTypes("", Integer.class, String.class);
-
- protected <K, V> WindowStore<K, V> createWindowStore(ProcessorContext context, Serdes<K, V> serdes) {
- StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>("window", retentionPeriod, numSegments, true, serdes, null);
- WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
- store.init(context);
- return store;
- }
-
- @Test
- public void testPutAndFetch() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- try {
- long startTime = segmentSize - 4L;
-
- context.setTime(startTime + 0L);
- store.put(0, "zero");
- context.setTime(startTime + 1L);
- store.put(1, "one");
- context.setTime(startTime + 2L);
- store.put(2, "two");
- context.setTime(startTime + 3L);
- // (3, "three") is not put
- context.setTime(startTime + 4L);
- store.put(4, "four");
- context.setTime(startTime + 5L);
- store.put(5, "five");
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L + windowSize)));
-
- context.setTime(startTime + 3L);
- store.put(2, "two+1");
- context.setTime(startTime + 4L);
- store.put(2, "two+2");
- context.setTime(startTime + 5L);
- store.put(2, "two+3");
- context.setTime(startTime + 6L);
- store.put(2, "two+4");
- context.setTime(startTime + 7L);
- store.put(2, "two+5");
- context.setTime(startTime + 8L);
- store.put(2, "two+6");
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L - windowSize, startTime - 2L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L + windowSize)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L + windowSize)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L + windowSize)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L + windowSize)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L + windowSize)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L + windowSize)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testPutAndFetchBefore() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- try {
- long startTime = segmentSize - 4L;
-
- context.setTime(startTime + 0L);
- store.put(0, "zero");
- context.setTime(startTime + 1L);
- store.put(1, "one");
- context.setTime(startTime + 2L);
- store.put(2, "two");
- context.setTime(startTime + 3L);
- // (3, "three") is not put
- context.setTime(startTime + 4L);
- store.put(4, "four");
- context.setTime(startTime + 5L);
- store.put(5, "five");
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L - windowSize, startTime + 0L)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L - windowSize, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L - windowSize, startTime + 3L)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L - windowSize, startTime + 4L)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L - windowSize, startTime + 5L)));
-
- context.setTime(startTime + 3L);
- store.put(2, "two+1");
- context.setTime(startTime + 4L);
- store.put(2, "two+2");
- context.setTime(startTime + 5L);
- store.put(2, "two+3");
- context.setTime(startTime + 6L);
- store.put(2, "two+4");
- context.setTime(startTime + 7L);
- store.put(2, "two+5");
- context.setTime(startTime + 8L);
- store.put(2, "two+6");
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 1L - windowSize, startTime - 1L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 0L - windowSize, startTime + 0L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 1L - windowSize, startTime + 1L)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L - windowSize, startTime + 2L)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime + 3L - windowSize, startTime + 3L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 4L - windowSize, startTime + 4L)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 5L - windowSize, startTime + 5L)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 6L - windowSize, startTime + 6L)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 7L - windowSize, startTime + 7L)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 8L - windowSize, startTime + 8L)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 9L - windowSize, startTime + 9L)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 10L - windowSize, startTime + 10L)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 11L - windowSize, startTime + 11L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L - windowSize, startTime + 12L)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 13L - windowSize, startTime + 13L)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testPutAndFetchAfter() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- try {
- long startTime = segmentSize - 4L;
-
- context.setTime(startTime + 0L);
- store.put(0, "zero");
- context.setTime(startTime + 1L);
- store.put(1, "one");
- context.setTime(startTime + 2L);
- store.put(2, "two");
- context.setTime(startTime + 3L);
- // (3, "three") is not put
- context.setTime(startTime + 4L);
- store.put(4, "four");
- context.setTime(startTime + 5L);
- store.put(5, "five");
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime + 0L, startTime + 0L + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + 1L, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + 3L, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + 4L, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + 5L, startTime + 5L + windowSize)));
-
- context.setTime(startTime + 3L);
- store.put(2, "two+1");
- context.setTime(startTime + 4L);
- store.put(2, "two+2");
- context.setTime(startTime + 5L);
- store.put(2, "two+3");
- context.setTime(startTime + 6L);
- store.put(2, "two+4");
- context.setTime(startTime + 7L);
- store.put(2, "two+5");
- context.setTime(startTime + 8L);
- store.put(2, "two+6");
-
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime - 2L, startTime - 2L + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime - 1L, startTime - 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1"), toList(store.fetch(2, startTime, startTime + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2"), toList(store.fetch(2, startTime + 1L, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("two", "two+1", "two+2", "two+3"), toList(store.fetch(2, startTime + 2L, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("two+1", "two+2", "two+3", "two+4"), toList(store.fetch(2, startTime + 3L, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList("two+2", "two+3", "two+4", "two+5"), toList(store.fetch(2, startTime + 4L, startTime + 4L + windowSize)));
- assertEquals(Utils.mkList("two+3", "two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 5L, startTime + 5L + windowSize)));
- assertEquals(Utils.mkList("two+4", "two+5", "two+6"), toList(store.fetch(2, startTime + 6L, startTime + 6L + windowSize)));
- assertEquals(Utils.mkList("two+5", "two+6"), toList(store.fetch(2, startTime + 7L, startTime + 7L + windowSize)));
- assertEquals(Utils.mkList("two+6"), toList(store.fetch(2, startTime + 8L, startTime + 8L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 9L, startTime + 9L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 10L, startTime + 10L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 11L, startTime + 11L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + 12L, startTime + 12L + windowSize)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
- assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
- assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
- assertNull(entriesByKey.get(3));
- assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
- assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
- assertNull(entriesByKey.get(6));
-
- } finally {
- store.close();
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testPutSameKeyTimestamp() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- try {
- long startTime = segmentSize - 4L;
-
- context.setTime(startTime);
- store.put(0, "zero");
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
-
- context.setTime(startTime);
- store.put(0, "zero");
- context.setTime(startTime);
- store.put(0, "zero+");
- context.setTime(startTime);
- store.put(0, "zero++");
-
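- // all values put with the same key and timestamp are retained, including the duplicate "zero"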
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 1L - windowSize, startTime + 1L + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 2L - windowSize, startTime + 2L + windowSize)));
- assertEquals(Utils.mkList("zero", "zero", "zero+", "zero++"), toList(store.fetch(0, startTime + 3L - windowSize, startTime + 3L + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime + 4L - windowSize, startTime + 4L + windowSize)));
-
- // Flush the store and verify all current entries were properly flushed ...
- store.flush();
-
- Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
- assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
-
- } finally {
- store.close();
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testRolling() throws IOException {
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- RocksDBWindowStore<Integer, String> inner =
- (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
- try {
- long startTime = segmentSize * 2;
- long incr = segmentSize / 2;
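- // segment id is timestamp / segmentSize, so startTime lands in segment 2; with numSegments = 3 retained,
- // the oldest segment is dropped each time writes advance into a new one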
-
- context.setTime(startTime);
- store.put(0, "zero");
- assertEquals(Utils.mkSet(2L), inner.segmentIds());
-
- context.setTime(startTime + incr);
- store.put(1, "one");
- assertEquals(Utils.mkSet(2L), inner.segmentIds());
-
- context.setTime(startTime + incr * 2);
- store.put(2, "two");
- assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
-
- context.setTime(startTime + incr * 3);
- // (3, "three") is not put
- assertEquals(Utils.mkSet(2L, 3L), inner.segmentIds());
-
- context.setTime(startTime + incr * 4);
- store.put(4, "four");
- assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
-
- context.setTime(startTime + incr * 5);
- store.put(5, "five");
- assertEquals(Utils.mkSet(2L, 3L, 4L), inner.segmentIds());
-
- assertEquals(Utils.mkList("zero"), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList("one"), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
-
- context.setTime(startTime + incr * 6);
- store.put(6, "six");
- assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
-
-
- context.setTime(startTime + incr * 7);
- store.put(7, "seven");
- assertEquals(Utils.mkSet(3L, 4L, 5L), inner.segmentIds());
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList("two"), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
-
- context.setTime(startTime + incr * 8);
- store.put(8, "eight");
- assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
- assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
- // check segment directories
- store.flush();
- assertEquals(
- Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
- segmentDirs(baseDir)
- );
- } finally {
- store.close();
- }
-
- } finally {
- Utils.delete(baseDir);
- }
- }
-
- @Test
- public void testRestore() throws IOException {
- final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>();
- long startTime = segmentSize * 2;
- long incr = segmentSize / 2;
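- // phase 1: populate a store and capture its changelog; phase 2 (below) replays that changelog into a fresh store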
-
- File baseDir = Files.createTempDirectory("test").toFile();
- try {
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- try {
- context.setTime(startTime);
- store.put(0, "zero");
- context.setTime(startTime + incr);
- store.put(1, "one");
- context.setTime(startTime + incr * 2);
- store.put(2, "two");
- context.setTime(startTime + incr * 3);
- store.put(3, "three");
- context.setTime(startTime + incr * 4);
- store.put(4, "four");
- context.setTime(startTime + incr * 5);
- store.put(5, "five");
- context.setTime(startTime + incr * 6);
- store.put(6, "six");
- context.setTime(startTime + incr * 7);
- store.put(7, "seven");
- context.setTime(startTime + incr * 8);
- store.put(8, "eight");
- store.flush();
-
- } finally {
- store.close();
- }
-
-
- } finally {
- Utils.delete(baseDir);
- }
-
- File baseDir2 = Files.createTempDirectory("test").toFile();
- try {
- Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer);
- RecordCollector recordCollector = new RecordCollector(producer) {
- @Override
- public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
- changeLog.add(new Entry<>(
- keySerializer.serialize(record.topic(), record.key()),
- valueSerializer.serialize(record.topic(), record.value()))
- );
- }
- };
-
- MockProcessorContext context = new MockProcessorContext(
- null, baseDir2,
- byteArraySerializer, byteArrayDeserializer, byteArraySerializer, byteArrayDeserializer,
- recordCollector);
-
- WindowStore<Integer, String> store = createWindowStore(context, serdes);
- RocksDBWindowStore<Integer, String> inner =
- (RocksDBWindowStore<Integer, String>) ((MeteredWindowStore<Integer, String>) store).inner();
-
- try {
- context.restore("window", changeLog);
-
- assertEquals(Utils.mkSet(4L, 5L, 6L), inner.segmentIds());
-
- assertEquals(Utils.mkList(), toList(store.fetch(0, startTime - windowSize, startTime + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(1, startTime + incr - windowSize, startTime + incr + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(2, startTime + incr * 2 - windowSize, startTime + incr * 2 + windowSize)));
- assertEquals(Utils.mkList(), toList(store.fetch(3, startTime + incr * 3 - windowSize, startTime + incr * 3 + windowSize)));
- assertEquals(Utils.mkList("four"), toList(store.fetch(4, startTime + incr * 4 - windowSize, startTime + incr * 4 + windowSize)));
- assertEquals(Utils.mkList("five"), toList(store.fetch(5, startTime + incr * 5 - windowSize, startTime + incr * 5 + windowSize)));
- assertEquals(Utils.mkList("six"), toList(store.fetch(6, startTime + incr * 6 - windowSize, startTime + incr * 6 + windowSize)));
- assertEquals(Utils.mkList("seven"), toList(store.fetch(7, startTime + incr * 7 - windowSize, startTime + incr * 7 + windowSize)));
- assertEquals(Utils.mkList("eight"), toList(store.fetch(8, startTime + incr * 8 - windowSize, startTime + incr * 8 + windowSize)));
-
- // check segment directories
- store.flush();
- assertEquals(
- Utils.mkSet(inner.directorySuffix(4L), inner.directorySuffix(5L), inner.directorySuffix(6L)),
- segmentDirs(baseDir2)
- );
- } finally {
- store.close();
- }
-
-
- } finally {
- Utils.delete(baseDir2);
- }
- }
-
- private <E> List<E> toList(WindowStoreIterator<E> iterator) {
- ArrayList<E> list = new ArrayList<>();
- while (iterator.hasNext()) {
- list.add(iterator.next().value);
- }
- return list;
- }
-
- private Set<String> segmentDirs(File baseDir) {
- File rocksDbDir = new File(baseDir, "rocksdb");
- String[] subdirs = rocksDbDir.list();
-
- HashSet<String> set = new HashSet<>();
-
- for (String subdir : subdirs) {
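- // segment directory names have the form "window-<suffix>"; strip the 7-character prefix and keep the suffix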
- if (subdir.startsWith("window-"))
- set.add(subdir.substring(7));
- }
- return set;
- }
-
- private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) {
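- // group the captured changelog entries by key, rendering each as "value@<timestamp relative to startTime>"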
- HashMap<Integer, Set<String>> entriesByKey = new HashMap<>();
-
- for (Entry<byte[], byte[]> entry : changeLog) {
- long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key());
- Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes);
- String value = entry.value() == null ? null : serdes.valueFrom(entry.value());
-
- Set<String> entries = entriesByKey.get(key);
- if (entries == null) {
- entries = new HashSet<>();
- entriesByKey.put(key, entries);
- }
- entries.add(value + "@" + (timestamp - startTime));
- }
-
- return entriesByKey;
- }
-}
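Taken together, the assertions above follow one rule: fetch(key, timeFrom, timeTo) returns every value stored for that key whose timestamp t satisfies timeFrom <= t <= timeTo, in the order the values were put (which, in these tests, is timestamp order). A minimal, self-contained sketch of that rule -- the Put class and fetch helper are hypothetical illustrations, not part of the store:

    import java.util.ArrayList;
    import java.util.List;

    class FetchSemanticsSketch {
        static class Put {
            final long timestamp;
            final String value;
            Put(long timestamp, String value) { this.timestamp = timestamp; this.value = value; }
        }

        // Inclusive range scan: keep values whose timestamp lies in [timeFrom, timeTo].
        static List<String> fetch(List<Put> puts, long timeFrom, long timeTo) {
            List<String> matched = new ArrayList<>();
            for (Put p : puts)
                if (p.timestamp >= timeFrom && p.timestamp <= timeTo)
                    matched.add(p.value);
            return matched;
        }
    }

With puts (2, "two"), (3, "two+1") ... (8, "two+6") relative to startTime, this reproduces, for example, the expected list for fetch(2, startTime + 5L - windowSize, startTime + 5L + windowSize).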
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
new file mode 100644
index 0000000..2ed698c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.junit.Test;
+
+public abstract class AbstractKeyValueStoreTest {
+
+ protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(ProcessorContext context,
+ Class<K> keyClass, Class<V> valueClass,
+ boolean useContextSerdes);
+
+ @Test
+ public void testPutGetRange() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ try {
+
+ // Verify that the store reads and writes correctly ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(4, "four");
+ store.put(5, "five");
+ assertEquals(5, driver.sizeOf(store));
+ assertEquals("zero", store.get(0));
+ assertEquals("one", store.get(1));
+ assertEquals("two", store.get(2));
+ assertNull(store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertEquals("zero", driver.flushedEntryStored(0));
+ assertEquals("one", driver.flushedEntryStored(1));
+ assertEquals("two", driver.flushedEntryStored(2));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(false, driver.flushedEntryRemoved(0));
+ assertEquals(false, driver.flushedEntryRemoved(1));
+ assertEquals(false, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+
+ // Check range iteration ...
+ try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) {
+ while (iter.hasNext()) {
+ Entry<Integer, String> entry = iter.next();
+ if (entry.key().equals(2))
+ assertEquals("two", entry.value());
+ else if (entry.key().equals(4))
+ assertEquals("four", entry.value());
+ else
+ fail("Unexpected entry: " + entry);
+ }
+ }
+
+ // Check range iteration ...
+ try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) {
+ while (iter.hasNext()) {
+ Entry<Integer, String> entry = iter.next();
+ if (entry.key().equals(2))
+ assertEquals("two", entry.value());
+ else if (entry.key().equals(4))
+ assertEquals("four", entry.value());
+ else
+ fail("Unexpected entry: " + entry);
+ }
+ }
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void testPutGetRangeWithDefaultSerdes() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ try {
+
+ // Verify that the store reads and writes correctly ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(4, "four");
+ store.put(5, "five");
+ assertEquals(5, driver.sizeOf(store));
+ assertEquals("zero", store.get(0));
+ assertEquals("one", store.get(1));
+ assertEquals("two", store.get(2));
+ assertNull(store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertEquals("zero", driver.flushedEntryStored(0));
+ assertEquals("one", driver.flushedEntryStored(1));
+ assertEquals("two", driver.flushedEntryStored(2));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(false, driver.flushedEntryRemoved(0));
+ assertEquals(false, driver.flushedEntryRemoved(1));
+ assertEquals(false, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void testRestore() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+ // Add any entries that will be restored to any store
+ // that uses the driver's context ...
+ driver.addEntryToRestoreLog(0, "zero");
+ driver.addEntryToRestoreLog(1, "one");
+ driver.addEntryToRestoreLog(2, "two");
+ driver.addEntryToRestoreLog(4, "four");
+
+ // Create the store, which should register with the context and automatically
+ // receive the restore entries ...
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, false);
+ try {
+ // Verify that the store's contents were properly restored ...
+ assertEquals(0, driver.checkForRestoredEntries(store));
+
+ // and there are no other entries ...
+ assertEquals(4, driver.sizeOf(store));
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void testRestoreWithDefaultSerdes() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+ // Add any entries that will be restored to any store
+ // that uses the driver's context ...
+ driver.addEntryToRestoreLog(0, "zero");
+ driver.addEntryToRestoreLog(1, "one");
+ driver.addEntryToRestoreLog(2, "two");
+ driver.addEntryToRestoreLog(4, "four");
+
+ // Create the store, which should register with the context and automatically
+ // receive the restore entries ...
+ KeyValueStore<Integer, String> store = createKeyValueStore(driver.context(), Integer.class, String.class, true);
+ try {
+ // Verify that the store's contents were properly restored ...
+ assertEquals(0, driver.checkForRestoredEntries(store));
+
+ // and there are no other entries ...
+ assertEquals(4, driver.sizeOf(store));
+ } finally {
+ store.close();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
new file mode 100644
index 0000000..2b0927e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+ ProcessorContext context,
+ Class<K> keyClass, Class<V> valueClass,
+ boolean useContextSerdes) {
+
+ StateStoreSupplier supplier;
+ if (useContextSerdes) {
+ Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+ Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+ Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+ Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+ supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).inMemory().build();
+ } else {
+ supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).inMemory().build();
+ }
+
+ KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ store.init(context);
+ return store;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
new file mode 100644
index 0000000..d7cc5b9
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.KeyValueStoreTestDriver;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.Test;
+
+public class InMemoryLRUCacheStoreTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPutGetRange() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+ StateStoreSupplier supplier = Stores.create("my-store")
+ .withIntegerKeys().withStringValues()
+ .inMemory().maxEntries(3)
+ .build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
+
+ // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(3, "three");
+ store.put(4, "four");
+ store.put(5, "five");
+
+ // It should keep only the last 3 added ...
+ assertEquals(3, driver.sizeOf(store));
+ assertNull(store.get(0));
+ assertNull(store.get(1));
+ assertNull(store.get(2));
+ assertEquals("three", store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertNull(driver.flushedEntryStored(0));
+ assertNull(driver.flushedEntryStored(1));
+ assertNull(driver.flushedEntryStored(2));
+ assertEquals("three", driver.flushedEntryStored(3));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(true, driver.flushedEntryRemoved(0));
+ assertEquals(true, driver.flushedEntryRemoved(1));
+ assertEquals(true, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(3));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testPutGetRangeWithDefaultSerdes() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create();
+
+ Serializer<Integer> keySer = (Serializer<Integer>) driver.context().keySerializer();
+ Deserializer<Integer> keyDeser = (Deserializer<Integer>) driver.context().keyDeserializer();
+ Serializer<String> valSer = (Serializer<String>) driver.context().valueSerializer();
+ Deserializer<String> valDeser = (Deserializer<String>) driver.context().valueDeserializer();
+ StateStoreSupplier supplier = Stores.create("my-store")
+ .withKeys(keySer, keyDeser)
+ .withValues(valSer, valDeser)
+ .inMemory().maxEntries(3)
+ .build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
+
+ // Verify that the store reads and writes correctly, keeping only the last 3 entries ...
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(2, "two");
+ store.put(3, "three");
+ store.put(4, "four");
+ store.put(5, "five");
+
+ // It should keep only the last 3 added ...
+ assertEquals(3, driver.sizeOf(store));
+ assertNull(store.get(0));
+ assertNull(store.get(1));
+ assertNull(store.get(2));
+ assertEquals("three", store.get(3));
+ assertEquals("four", store.get(4));
+ assertEquals("five", store.get(5));
+ store.delete(5);
+
+ // Flush the store and verify all current entries were properly flushed ...
+ store.flush();
+ assertNull(driver.flushedEntryStored(0));
+ assertNull(driver.flushedEntryStored(1));
+ assertNull(driver.flushedEntryStored(2));
+ assertEquals("three", driver.flushedEntryStored(3));
+ assertEquals("four", driver.flushedEntryStored(4));
+ assertNull(driver.flushedEntryStored(5));
+
+ assertEquals(true, driver.flushedEntryRemoved(0));
+ assertEquals(true, driver.flushedEntryRemoved(1));
+ assertEquals(true, driver.flushedEntryRemoved(2));
+ assertEquals(false, driver.flushedEntryRemoved(3));
+ assertEquals(false, driver.flushedEntryRemoved(4));
+ assertEquals(true, driver.flushedEntryRemoved(5));
+ }
+
+ @Test
+ public void testRestore() {
+ // Create the test driver ...
+ KeyValueStoreTestDriver<Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class, String.class);
+
+ // Add any entries that will be restored to any store
+ // that uses the driver's context ...
+ driver.addEntryToRestoreLog(1, "one");
+ driver.addEntryToRestoreLog(2, "two");
+ driver.addEntryToRestoreLog(4, "four");
+
+ // Create the store, which should register with the context and automatically
+ // receive the restore entries ...
+ StateStoreSupplier supplier = Stores.create("my-store")
+ .withIntegerKeys().withStringValues()
+ .inMemory().maxEntries(3)
+ .build();
+ KeyValueStore<Integer, String> store = (KeyValueStore<Integer, String>) supplier.get();
+ store.init(driver.context());
+
+ // Verify that the store's contents were properly restored ...
+ assertEquals(0, driver.checkForRestoredEntries(store));
+
+ // and there are no other entries ...
+ assertEquals(3, driver.sizeOf(store));
+ }
+
+}
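The eviction behavior both tests above rely on (with maxEntries(3), only the three most recently used keys survive) is a classic LRU policy. A minimal sketch of that policy using java.util.LinkedHashMap -- an illustration of the behavior under test, not the store's actual implementation:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class LruSketch<K, V> extends LinkedHashMap<K, V> {
        private final int maxEntries;

        LruSketch(int maxEntries) {
            super(16, 0.75f, true);  // accessOrder = true: iteration runs least- to most-recently used
            this.maxEntries = maxEntries;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxEntries;  // drop the least-recently-used entry once capacity is exceeded
        }
    }

Putting keys 0 through 5 into an LruSketch<Integer, String>(3) leaves exactly keys 3, 4 and 5, matching the assertions above.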
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
new file mode 100644
index 0000000..29a3c0a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+
+public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <K, V> KeyValueStore<K, V> createKeyValueStore(
+ ProcessorContext context,
+ Class<K> keyClass,
+ Class<V> valueClass,
+ boolean useContextSerdes) {
+
+ StateStoreSupplier supplier;
+ if (useContextSerdes) {
+ Serializer<K> keySer = (Serializer<K>) context.keySerializer();
+ Deserializer<K> keyDeser = (Deserializer<K>) context.keyDeserializer();
+ Serializer<V> valSer = (Serializer<V>) context.valueSerializer();
+ Deserializer<V> valDeser = (Deserializer<V>) context.valueDeserializer();
+ supplier = Stores.create("my-store").withKeys(keySer, keyDeser).withValues(valSer, valDeser).localDatabase().build();
+ } else {
+ supplier = Stores.create("my-store").withKeys(keyClass).withValues(valueClass).localDatabase().build();
+ }
+
+ KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
+ store.init(context);
+ return store;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
deleted file mode 100644
index 743a110..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.List;
-
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
-
- protected final KeyValueStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter getter;
- protected final Serdes<K, V> serialization;
- protected final String metricScope;
- protected final Time time;
-
- private Sensor putTime;
- private Sensor getTime;
- private Sensor deleteTime;
- private Sensor putAllTime;
- private Sensor allTime;
- private Sensor rangeTime;
- private Sensor flushTime;
- private Sensor restoreTime;
- private StreamingMetrics metrics;
-
- private boolean loggingEnabled = true;
- private StoreChangeLogger<K, V> changeLogger = null;
-
- // always wrap the store with the metered store
- public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
- this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
- public V get(K key) {
- return inner.get(key);
- }
- };
- this.serialization = serialization;
- this.metricScope = metricScope;
- this.time = time != null ? time : new SystemTime();
- }
-
- public MeteredKeyValueStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public void init(ProcessorContext context) {
- final String name = name();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
- this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
- this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
- this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
- this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
- this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
- this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
- this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
- serialization.init(context);
- this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
-
- // register and possibly restore the state from the logs
- long startNs = time.nanoseconds();
- inner.init(context);
- try {
- final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
- final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.put(keyDeserializer.deserialize(name, key),
- valDeserializer.deserialize(name, value));
- }
- });
- } finally {
- this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public V get(K key) {
- long startNs = time.nanoseconds();
- try {
- return this.inner.get(key);
- } finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void put(K key, V value) {
- long startNs = time.nanoseconds();
- try {
- this.inner.put(key, value);
-
- if (loggingEnabled) {
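- // mark the key dirty; maybeLogChange leaves it to the logger to decide when to emit buffered changes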
- changeLogger.add(key);
- changeLogger.maybeLogChange(this.getter);
- }
- } finally {
- this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- long startNs = time.nanoseconds();
- try {
- this.inner.putAll(entries);
-
- if (loggingEnabled) {
- for (Entry<K, V> entry : entries) {
- K key = entry.key();
- changeLogger.add(key);
- }
- changeLogger.maybeLogChange(this.getter);
- }
- } finally {
- this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public V delete(K key) {
- long startNs = time.nanoseconds();
- try {
- V value = this.inner.delete(key);
-
- removed(key);
-
- return value;
- } finally {
- this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
- }
- }
-
- /**
- * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
- * store.
- *
- * @param key the key for the entry that the inner store removed
- */
- protected void removed(K key) {
- if (loggingEnabled) {
- changeLogger.delete(key);
- changeLogger.maybeLogChange(this.getter);
- }
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public void flush() {
- long startNs = time.nanoseconds();
- try {
- this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
- } finally {
- this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
- }
- }
-
- private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
- private final KeyValueIterator<K1, V1> iter;
- private final Sensor sensor;
- private final long startNs;
-
- public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
- this.iter = iter;
- this.sensor = sensor;
- this.startNs = time.nanoseconds();
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K1, V1> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
deleted file mode 100644
index cfcfb00..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-
-public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
-
- protected final WindowStore<K, V> inner;
- protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
- protected final String metricScope;
- protected final Time time;
-
- private Sensor putTime;
- private Sensor getTime;
- private Sensor rangeTime;
- private Sensor flushTime;
- private Sensor restoreTime;
- private StreamingMetrics metrics;
-
- private boolean loggingEnabled = true;
- private StoreChangeLogger<byte[], byte[]> changeLogger = null;
-
- // always wrap the store with the metered store
- public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
- this.inner = inner;
- this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
- public byte[] get(byte[] key) {
- return inner.getInternal(key);
- }
- };
- this.metricScope = metricScope;
- this.time = time != null ? time : new SystemTime();
- }
-
- public MeteredWindowStore<K, V> disableLogging() {
- loggingEnabled = false;
- return this;
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public void init(ProcessorContext context) {
- final String name = name();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
- this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
- this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
- this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
- this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
- this.changeLogger = this.loggingEnabled ?
- new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
-
- // register and possibly restore the state from the logs
- long startNs = time.nanoseconds();
- inner.init(context);
- try {
- context.register(this, loggingEnabled, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.putInternal(key, value);
- }
- });
- } finally {
- this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
- return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
- }
-
- @Override
- public void put(K key, V value) {
- putAndReturnInternalKey(key, value, -1L);
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- putAndReturnInternalKey(key, value, timestamp);
- }
-
- @Override
- public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
- long startNs = time.nanoseconds();
- try {
- byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
-
- if (loggingEnabled) {
- changeLogger.add(binKey);
- changeLogger.maybeLogChange(this.getter);
- }
-
- return binKey;
- } finally {
- this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void putInternal(byte[] binaryKey, byte[] binaryValue) {
- inner.putInternal(binaryKey, binaryValue);
- }
-
- @Override
- public byte[] getInternal(byte[] binaryKey) {
- long startNs = time.nanoseconds();
- try {
- return this.inner.getInternal(binaryKey);
- } finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public void flush() {
- long startNs = time.nanoseconds();
- try {
- this.inner.flush();
-
- if (loggingEnabled)
- changeLogger.logChange(this.getter);
- } finally {
- this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
- }
- }
-
- private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
-
- private final WindowStoreIterator<E> iter;
- private final Sensor sensor;
- private final long startNs;
-
- public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
- this.iter = iter;
- this.sensor = sensor;
- this.startNs = time.nanoseconds();
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public KeyValue<Long, E> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
- }
- }
-
- }
-
- WindowStore<K, V> inner() {
- return inner;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
deleted file mode 100644
index d748aac..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following:
- * <pre>
- * <version>
- * <n>
- * <topic_name_1> <partition_1> <offset_1>
- * .
- * .
- * .
- * <topic_name_n> <partition_n> <offset_n>
- * </pre>
- * The first line contains a number designating the format version (currently 0), the second line contains
- * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
- * separated by spaces.
- */
-public class OffsetCheckpoint {
-
- private static final int VERSION = 0;
-
- private final File file;
- private final Object lock;
-
- public OffsetCheckpoint(File file) throws IOException {
- this.file = file;
- this.lock = new Object();
- }
-
- public void write(Map<TopicPartition, Long> offsets) throws IOException {
- synchronized (lock) {
- // write to temp file and then swap with the existing file
- File temp = new File(file.getAbsolutePath() + ".tmp");
-
- FileOutputStream fileOutputStream = new FileOutputStream(temp);
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
- try {
- writeIntLine(writer, VERSION);
- writeIntLine(writer, offsets.size());
-
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
- writeEntry(writer, entry.getKey(), entry.getValue());
-
- writer.flush();
- fileOutputStream.getFD().sync();
- } finally {
- writer.close();
- }
-
- Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
- }
- }
-
- private void writeIntLine(BufferedWriter writer, int number) throws IOException {
- writer.write(Integer.toString(number));
- writer.newLine();
- }
-
- private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
- writer.write(part.topic());
- writer.write(' ');
- writer.write(Integer.toString(part.partition()));
- writer.write(' ');
- writer.write(Long.toString(offset));
- writer.newLine();
- }
-
- public Map<TopicPartition, Long> read() throws IOException {
- synchronized (lock) {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- } catch (FileNotFoundException e) {
- return Collections.emptyMap();
- }
-
- try {
- int version = readInt(reader);
- switch (version) {
- case 0:
- int expectedSize = readInt(reader);
- Map<TopicPartition, Long> offsets = new HashMap<>();
- String line = reader.readLine();
- while (line != null) {
- String[] pieces = line.split("\\s+");
- if (pieces.length != 3)
- throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
- line));
-
- String topic = pieces[0];
- int partition = Integer.parseInt(pieces[1]);
- long offset = Long.parseLong(pieces[2]);
- offsets.put(new TopicPartition(topic, partition), offset);
- line = reader.readLine();
- }
- if (offsets.size() != expectedSize)
- throw new IOException(String.format("Expected %d entries but found only %d",
- expectedSize,
- offsets.size()));
- return offsets;
-
- default:
- throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
- }
- } finally {
- if (reader != null)
- reader.close();
- }
- }
- }
-
- private int readInt(BufferedReader reader) throws IOException {
- String line = reader.readLine();
- if (line == null)
- throw new EOFException("File ended prematurely.");
- int val = Integer.parseInt(line);
- return val;
- }
-
- public void delete() throws IOException {
- file.delete();
- }
-
- @Override
- public String toString() {
- return this.file.getAbsolutePath();
- }
-
-}
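
As an aside for readers tracing the checkpoint logic above, a minimal round-trip looks like the following sketch. The temp-file path and the single offset entry are illustrative, and it assumes the OffsetCheckpoint class is visible from the caller's package; the on-disk result is the versioned text format described in the javadoc (version line, count line, then one "topic partition offset" triple per line):

    import org.apache.kafka.common.TopicPartition;
    import java.io.File;
    import java.util.Collections;
    import java.util.Map;

    public class OffsetCheckpointExample {
        public static void main(String[] args) throws Exception {
            File file = File.createTempFile("offset", ".checkpoint"); // illustrative scratch location
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);

            // write a single topic/partition=>offset entry, then read it back
            checkpoint.write(Collections.singletonMap(new TopicPartition("topic1", 0), 100L));
            Map<TopicPartition, Long> offsets = checkpoint.read();
            System.out.println(offsets); // {topic1-0=100}
        }
    }
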
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index 41314b9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link StateStoreSupplier} that provides a {@link KeyValueStore} backed by a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final Serdes serdes;
- private final Time time;
-
- protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
deleted file mode 100644
index 62b9f2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
- private static final int TTL_NOT_USED = -1;
-
- // TODO: these values should be configurable
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
- private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
- private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
- private static final long BLOCK_SIZE = 4096L;
- private static final int TTL_SECONDS = TTL_NOT_USED;
- private static final int MAX_WRITE_BUFFERS = 3;
- private static final String DB_FILE_DIR = "rocksdb";
-
- private final String name;
-
- private final Options options;
- private final WriteOptions wOptions;
- private final FlushOptions fOptions;
-
- private Serdes<K, V> serdes;
- private ProcessorContext context;
- protected File dbDir;
- private RocksDB db;
-
- public RocksDBStore(String name, Serdes<K, V> serdes) {
- this.name = name;
- this.serdes = serdes;
-
- // initialize the rocksdb options
- BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
- tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
- tableConfig.setBlockSize(BLOCK_SIZE);
-
- options = new Options();
- options.setTableFormatConfig(tableConfig);
- options.setWriteBufferSize(WRITE_BUFFER_SIZE);
- options.setCompressionType(COMPRESSION_TYPE);
- options.setCompactionStyle(COMPACTION_STYLE);
- options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
- options.setCreateIfMissing(true);
- options.setErrorIfExists(false);
-
- wOptions = new WriteOptions();
- wOptions.setDisableWAL(true);
-
- fOptions = new FlushOptions();
- fOptions.setWaitForFlush(true);
- }
-
- public void init(ProcessorContext context) {
- serdes.init(context);
-
- this.context = context;
- this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
- this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
- }
-
- private RocksDB openDB(File dir, Options options, int ttl) {
- try {
- if (ttl == TTL_NOT_USED) {
- dir.getParentFile().mkdirs();
- return RocksDB.open(options, dir.toString());
- } else {
- throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
- // TODO: support TTL with change log?
- // return TtlDB.open(options, dir.toString(), ttl, false);
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
- }
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- try {
- return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
- }
- }
-
- @Override
- public void put(K key, V value) {
- try {
- if (value == null) {
- db.remove(wOptions, serdes.rawKey(key));
- } else {
- db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
- }
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = get(key);
- put(key, null);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- RocksIterator innerIter = db.newIterator();
- innerIter.seekToFirst();
- return new RocksDbIterator<K, V>(innerIter, serdes);
- }
-
- @Override
- public void flush() {
- try {
- db.flush(fOptions);
- } catch (RocksDBException e) {
- // TODO: this needs to be handled more accurately
- throw new KafkaException("Error while executing flush from store " + this.name, e);
- }
- }
-
- @Override
- public void close() {
- flush();
- db.close();
- }
-
- private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
- private final RocksIterator iter;
- private final Serdes<K, V> serdes;
-
- public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
- this.iter = iter;
- this.serdes = serdes;
- }
-
- protected byte[] peekRawKey() {
- return iter.key();
- }
-
- protected Entry<K, V> getEntry() {
- return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
- }
-
- @Override
- public boolean hasNext() {
- return iter.isValid();
- }
-
- @Override
- public Entry<K, V> next() {
- if (!hasNext())
- throw new NoSuchElementException();
-
- Entry<K, V> entry = this.getEntry();
- iter.next();
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("RocksDB iterator does not support remove");
- }
-
- @Override
- public void close() {
- iter.dispose();
- }
-
- }
-
- private static class LexicographicComparator implements Comparator<byte[]> {
-
- @Override
- public int compare(byte[] left, byte[] right) {
- for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
- int leftByte = left[i] & 0xff;
- int rightByte = right[j] & 0xff;
- if (leftByte != rightByte) {
- return leftByte - rightByte;
- }
- }
- return left.length - right.length;
- }
- }
-
- private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
- // RocksDB's JNI interface does not expose getters/setters that allow the
- // comparator to be pluggable, and the default is lexicographic, so it's
- // safe to just force lexicographic comparator here for now.
- private final Comparator<byte[]> comparator = new LexicographicComparator();
- byte[] rawToKey;
-
- public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
- K from, K to) {
- super(iter, serdes);
- iter.seek(serdes.rawKey(from));
- this.rawToKey = serdes.rawKey(to);
- }
-
- @Override
- public boolean hasNext() {
- return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
- }
- }
-
-}
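
The store above drives RocksDB through its JNI API with the WAL disabled and explicit, waiting flushes. A minimal standalone sketch of that same API surface, detached from the store wrapper (it assumes the rocksdbjni dependency is on the classpath; the path and values are illustrative):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.WriteOptions;

    public class RocksDBExample {
        public static void main(String[] args) throws Exception {
            RocksDB.loadLibrary();
            Options options = new Options().setCreateIfMissing(true);
            WriteOptions wOptions = new WriteOptions().setDisableWAL(true); // as in RocksDBStore

            RocksDB db = RocksDB.open(options, "/tmp/rocksdb-example"); // illustrative path
            db.put(wOptions, "key".getBytes(), "value".getBytes());
            byte[] value = db.get("key".getBytes());
            System.out.println(new String(value)); // value
            db.close();
        }
    }
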
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
deleted file mode 100644
index 2f30712..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SimpleTimeZone;
-
-public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
-
- public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
-
- private static final long USE_CURRENT_TIMESTAMP = -1L;
-
- private static class Segment extends RocksDBStore<byte[], byte[]> {
- public final long id;
-
- Segment(String name, long id) {
- super(name, WindowStoreUtil.INNER_SERDES);
- this.id = id;
- }
-
- public void destroy() {
- Utils.delete(dbDir);
- }
- }
-
- private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
- private final Serdes<?, V> serdes;
- private final KeyValueIterator<byte[], byte[]>[] iterators;
- private int index = 0;
-
- RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
- this(serdes, WindowStoreUtil.NO_ITERATORS);
- }
-
- RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
- this.serdes = serdes;
- this.iterators = iterators;
- }
-
- @Override
- public boolean hasNext() {
- while (index < iterators.length) {
- if (iterators[index].hasNext())
- return true;
-
- index++;
- }
- return false;
- }
-
- @Override
- public KeyValue<Long, V> next() {
- if (index >= iterators.length)
- throw new NoSuchElementException();
-
- Entry<byte[], byte[]> entry = iterators[index].next();
-
- return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
- serdes.valueFrom(entry.value()));
- }
-
- @Override
- public void remove() {
- if (index < iterators.length)
- iterators[index].remove();
- }
-
- @Override
- public void close() {
- for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
- iterator.close();
- }
- }
- }
-
- private final String name;
- private final long segmentInterval;
- private final boolean retainDuplicates;
- private final Segment[] segments;
- private final Serdes<K, V> serdes;
- private final SimpleDateFormat formatter;
-
- private ProcessorContext context;
- private long currentSegmentId = -1L;
- private int seqnum = 0;
-
- public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
- this.name = name;
-
- // The segment interval must be at least MIN_SEGMENT_INTERVAL
- this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
-
- this.segments = new Segment[numSegments];
- this.serdes = serdes;
-
- this.retainDuplicates = retainDuplicates;
-
- // Create a date formatter. Formatted timestamps are used as segment name suffixes
- this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
- this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- }
-
- @Override
- public boolean persistent() {
- return true;
- }
-
- @Override
- public void flush() {
- for (Segment segment : segments) {
- if (segment != null)
- segment.flush();
- }
- }
-
- @Override
- public void close() {
- for (Segment segment : segments) {
- if (segment != null)
- segment.close();
- }
- }
-
- @Override
- public void put(K key, V value) {
- putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- putAndReturnInternalKey(key, value, timestamp);
- }
-
- @Override
- public byte[] putAndReturnInternalKey(K key, V value, long t) {
- long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
-
- long segmentId = segmentId(timestamp);
-
- if (segmentId > currentSegmentId) {
- // A new segment will be created. Clean up old segments first.
- currentSegmentId = segmentId;
- cleanup();
- }
-
- // If the record is within the retention period, put it in the store.
- if (segmentId > currentSegmentId - segments.length) {
- if (retainDuplicates)
- seqnum = (seqnum + 1) & 0x7FFFFFFF;
- byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
- getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
- return binaryKey;
- } else {
- return null;
- }
- }
-
- @Override
- public void putInternal(byte[] binaryKey, byte[] binaryValue) {
- long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
- if (segmentId > currentSegmentId) {
- // A new segment will be created. Clean up old segments first.
- currentSegmentId = segmentId;
- cleanup();
- }
-
- // If the record is within the retention period, put it in the store.
- if (segmentId > currentSegmentId - segments.length)
- getSegment(segmentId).put(binaryKey, binaryValue);
- }
-
- @Override
- public byte[] getInternal(byte[] binaryKey) {
- long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
- Segment segment = segments[(int) (segmentId % segments.length)];
-
- if (segment != null && segment.id == segmentId) {
- return segment.get(binaryKey);
- } else {
- return null;
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
- long segFrom = segmentId(timeFrom);
- long segTo = segmentId(Math.max(0L, timeTo));
-
- byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
- byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
-
- ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
-
- for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
- Segment segment = segments[(int) (segmentId % segments.length)];
-
- if (segment != null && segment.id == segmentId)
- iterators.add(segment.range(binaryFrom, binaryUntil));
- }
-
- if (iterators.size() > 0) {
- return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
- } else {
- return new RocksDBWindowStoreIterator<>(serdes);
- }
- }
-
- private Segment getSegment(long segmentId) {
- int index = (int) (segmentId % segments.length);
-
- if (segments[index] == null) {
- segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
- segments[index].init(context);
- }
-
- return segments[index];
- }
-
- private void cleanup() {
- for (int i = 0; i < segments.length; i++) {
- if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
- segments[i].close();
- segments[i].destroy();
- segments[i] = null;
- }
- }
- }
-
- public long segmentId(long timestamp) {
- return timestamp / segmentInterval;
- }
-
- public String directorySuffix(long segmentId) {
- return formatter.format(new Date(segmentId * segmentInterval));
- }
-
- // this method is used by a test
- public Set<Long> segmentIds() {
- HashSet<Long> segmentIds = new HashSet<>();
-
- for (Segment segment : segments) {
- if (segment != null)
- segmentIds.add(segment.id);
- }
-
- return segmentIds;
- }
-
-}
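
The segment arithmetic above is compact enough to trace by hand: the constructor floors the interval at MIN_SEGMENT_INTERVAL, segmentId() is plain integer division, and a record is kept only while its segment id falls within the last numSegments ids. A minimal sketch with illustrative numbers:

    public class SegmentMathExample {
        static final long MIN_SEGMENT_INTERVAL = 60 * 1000L; // one minute, as above

        public static void main(String[] args) {
            long retentionPeriod = 3 * 60 * 1000L; // illustrative: three minutes
            int numSegments = 3;

            // segment interval, floored at MIN_SEGMENT_INTERVAL as in the constructor
            long segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);

            long timestamp = 250_000L;
            long segmentId = timestamp / segmentInterval; // which segment holds this record

            // a record is retained iff its segment id is one of the last numSegments ids
            long currentSegmentId = 500_000L / segmentInterval;
            boolean retained = segmentId > currentSegmentId - numSegments;
            System.out.println(segmentInterval + " " + segmentId + " " + retained); // 90000 2 false
        }
    }
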
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index fcdcb9b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link StateStoreSupplier} that provides a {@link WindowStore} backed by local RocksDB databases, one per time segment.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final long retentionPeriod;
- private final boolean retainDuplicates;
- private final int numSegments;
- private final Serdes serdes;
- private final Time time;
-
- public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.retentionPeriod = retentionPeriod;
- this.retainDuplicates = retainDuplicates;
- this.numSegments = numSegments;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
deleted file mode 100644
index ee6624e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class StoreChangeLogger<K, V> {
-
- public interface ValueGetter<K, V> {
- V get(K key);
- }
-
- protected final Serdes<K, V> serialization;
-
- private final Set<K> dirty;
- private final Set<K> removed;
- private final int maxDirty;
- private final int maxRemoved;
-
- private final String topic;
- private int partition;
- private ProcessorContext context;
-
- // always wrap the logged store with the metered store
- public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
- this.topic = topic;
- this.serialization = serialization;
- this.context = context;
- this.partition = context.id().partition;
-
- this.dirty = new HashSet<>();
- this.removed = new HashSet<>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
- }
-
- public void add(K key) {
- this.dirty.add(key);
- this.removed.remove(key);
- }
-
- public void delete(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- }
-
- public void maybeLogChange(ValueGetter<K, V> getter) {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange(getter);
- }
-
- public void logChange(ValueGetter<K, V> getter) {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = getter.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
- }
- }
-
-}
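
The dirty/removed bookkeeping above is worth isolating: add() marks a key dirty and cancels any pending tombstone, delete() does the reverse, and logChange() drains the removed set as null tombstones and the dirty set with current values. A minimal sketch of just that set discipline, detached from the RecordCollector plumbing:

    import java.util.HashSet;
    import java.util.Set;

    public class ChangeLogBookkeepingExample {
        public static void main(String[] args) {
            Set<String> dirty = new HashSet<>();
            Set<String> removed = new HashSet<>();

            // put "a", delete "b": mirrors StoreChangeLogger.add() and delete()
            dirty.add("a"); removed.remove("a");
            dirty.remove("b"); removed.add("b");

            // re-adding a removed key cancels its pending tombstone
            dirty.add("b"); removed.remove("b");

            System.out.println(dirty);   // [a, b] -> would be sent with their current values
            System.out.println(removed); // []     -> would be sent as null tombstones
        }
    }
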
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 5452040..46b2592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -27,6 +27,9 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
/**
* Factory for creating key-value stores.
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..286db1b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * A {@link StateStoreSupplier} that provides an in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final Serdes serdes;
+ private final Time time;
+
+ public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+ }
+
+ private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final NavigableMap<K, V> map;
+
+ public MemoryStore(String name) {
+ super();
+ this.name = name;
+ this.map = new TreeMap<>();
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ return this.map.remove(key);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<Map.Entry<K, V>> iter;
+
+ public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+ this.iter = iter;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ Map.Entry<K, V> entry = iter.next();
+ return new Entry<>(entry.getKey(), entry.getValue());
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ }
+
+ }
+ }
+}
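
Note that MemoryStore.range() above delegates to NavigableMap.subMap(from, true, to, false), so the lower bound is inclusive and the upper bound exclusive. A minimal sketch of that bound semantics on a plain TreeMap:

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class RangeSemanticsExample {
        public static void main(String[] args) {
            NavigableMap<String, Integer> map = new TreeMap<>();
            map.put("a", 1);
            map.put("b", 2);
            map.put("c", 3);

            // same bounds as MemoryStore.range(): from inclusive, to exclusive
            System.out.println(map.subMap("a", true, "c", false)); // {a=1, b=2}
        }
    }
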
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..6a38423
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * A {@link StateStoreSupplier} that provides an in-memory key-value store limited in size, retaining a maximum number of the most recently used entries.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+ private final String name;
+ private final int capacity;
+ private final Serdes serdes;
+ private final Time time;
+
+ public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+ this.name = name;
+ this.capacity = capacity;
+ this.serdes = serdes;
+ this.time = time;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public StateStore get() {
+ MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+ final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+ cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+ @Override
+ public void apply(K key, V value) {
+ store.removed(key);
+ }
+ });
+ return store;
+ }
+
+ private interface EldestEntryRemovalListener<K, V> {
+ void apply(K key, V value);
+ }
+
+ protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+ private final String name;
+ private final Map<K, V> map;
+ private final NavigableSet<K> keys;
+ private EldestEntryRemovalListener<K, V> listener;
+
+ public MemoryLRUCache(String name, final int maxCacheSize) {
+ this.name = name;
+ this.keys = new TreeSet<>();
+ // leave room for one extra entry to handle adding an entry before the oldest can be removed
+ this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ if (size() > maxCacheSize) {
+ K key = eldest.getKey();
+ keys.remove(key);
+ if (listener != null) listener.apply(key, eldest.getValue());
+ return true;
+ }
+ return false;
+ }
+ };
+ }
+
+ protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public boolean persistent() {
+ return false;
+ }
+
+ @Override
+ public V get(K key) {
+ return this.map.get(key);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ this.map.put(key, value);
+ this.keys.add(key);
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ for (Entry<K, V> entry : entries)
+ put(entry.key(), entry.value());
+ }
+
+ @Override
+ public V delete(K key) {
+ V value = this.map.remove(key);
+ this.keys.remove(key);
+ return value;
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+ }
+
+ @Override
+ public void flush() {
+ // do-nothing since it is in-memory
+ }
+
+ @Override
+ public void close() {
+ // do-nothing
+ }
+
+ private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+ private final Iterator<K> keys;
+ private final Map<K, V> entries;
+ private K lastKey;
+
+ public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+ this.keys = keys;
+ this.entries = entries;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return keys.hasNext();
+ }
+
+ @Override
+ public Entry<K, V> next() {
+ lastKey = keys.next();
+ return new Entry<>(lastKey, entries.get(lastKey));
+ }
+
+ @Override
+ public void remove() {
+ keys.remove();
+ entries.remove(lastKey);
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+ }
+ }
+}
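
The LRU behavior above comes entirely from LinkedHashMap: accessOrder=true in the constructor makes iteration order run from least- to most-recently accessed, and the removeEldestEntry override evicts once the size exceeds capacity. A minimal sketch of that mechanism with an illustrative capacity of two:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch {
        public static void main(String[] args) {
            final int maxCacheSize = 2;
            // accessOrder=true: iteration order is least- to most-recently accessed
            Map<String, Integer> lru = new LinkedHashMap<String, Integer>(maxCacheSize + 1, 1.01f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                    return size() > maxCacheSize; // evict once over capacity
                }
            };
            lru.put("a", 1);
            lru.put("b", 2);
            lru.get("a");    // touch "a" so "b" becomes the eldest entry
            lru.put("c", 3); // evicts "b"
            System.out.println(lru.keySet()); // [a, c]
        }
    }
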
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
new file mode 100644
index 0000000..21f73b0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.List;
+
+public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
+
+ protected final KeyValueStore<K, V> inner;
+ protected final StoreChangeLogger.ValueGetter getter;
+ protected final Serdes<K, V> serialization;
+ protected final String metricScope;
+ protected final Time time;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor deleteTime;
+ private Sensor putAllTime;
+ private Sensor allTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
+ private boolean loggingEnabled = true;
+ private StoreChangeLogger<K, V> changeLogger = null;
+
+ // always wrap the store with the metered store
+ public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
+ this.inner = inner;
+ this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+ public V get(K key) {
+ return inner.get(key);
+ }
+ };
+ this.serialization = serialization;
+ this.metricScope = metricScope;
+ this.time = time != null ? time : new SystemTime();
+ }
+
+ public MeteredKeyValueStore<K, V> disableLogging() {
+ loggingEnabled = false;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ final String name = name();
+ this.metrics = context.metrics();
+ this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+ this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+ this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
+ this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
+ this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
+ this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+ this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+ this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+ serialization.init(context);
+ this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
+
+ // register and possibly restore the state from the logs
+ long startNs = time.nanoseconds();
+ inner.init(context);
+ try {
+ final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
+ final Deserializer<V> valDeserializer = serialization.valueDeserializer();
+
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ inner.put(keyDeserializer.deserialize(name, key),
+ valDeserializer.deserialize(name, value));
+ }
+ });
+ } finally {
+ this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public V get(K key) {
+ long startNs = time.nanoseconds();
+ try {
+ return this.inner.get(key);
+ } finally {
+ this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void put(K key, V value) {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.put(key, value);
+
+ if (loggingEnabled) {
+ changeLogger.add(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+ } finally {
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void putAll(List<Entry<K, V>> entries) {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.putAll(entries);
+
+ if (loggingEnabled) {
+ for (Entry<K, V> entry : entries) {
+ K key = entry.key();
+ changeLogger.add(key);
+ }
+ changeLogger.maybeLogChange(this.getter);
+ }
+ } finally {
+ this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public V delete(K key) {
+ long startNs = time.nanoseconds();
+ try {
+ V value = this.inner.delete(key);
+
+ removed(key);
+
+ return value;
+ } finally {
+ this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
+ }
+ }
+
+ /**
+ * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+ * store.
+ *
+ * @param key the key for the entry that the inner store removed
+ */
+ protected void removed(K key) {
+ if (loggingEnabled) {
+ changeLogger.delete(key);
+ changeLogger.maybeLogChange(this.getter);
+ }
+ }
+
+ @Override
+ public KeyValueIterator<K, V> range(K from, K to) {
+ return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+ }
+
+ @Override
+ public KeyValueIterator<K, V> all() {
+ return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public void flush() {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.flush();
+
+ if (loggingEnabled)
+ changeLogger.logChange(this.getter);
+ } finally {
+ this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+ }
+ }
+
+ private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
+
+ private final KeyValueIterator<K1, V1> iter;
+ private final Sensor sensor;
+ private final long startNs;
+
+ public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<K1, V1> next() {
+ return iter.next();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+ }
+ }
+
+ }
+
+}
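
Every operation in the metered wrapper above follows one pattern: capture a nanosecond start time, delegate to the inner store, and record the latency in a finally block so that failing operations are measured as well. A minimal sketch of that pattern with the Sensor stubbed out by a print:

    public class LatencyPatternExample {
        public static void main(String[] args) {
            long startNs = System.nanoTime();
            try {
                doWork(); // stands in for this.inner.put(...), get(...), etc.
            } finally {
                // recorded even if doWork() throws, as in MeteredKeyValueStore
                long elapsedNs = System.nanoTime() - startNs;
                System.out.println("put latency: " + elapsedNs + " ns");
            }
        }

        static void doWork() {
            // illustrative inner-store operation
        }
    }
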
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
new file mode 100644
index 0000000..821927d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
+
+ protected final WindowStore<K, V> inner;
+ protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+ protected final String metricScope;
+ protected final Time time;
+
+ private Sensor putTime;
+ private Sensor getTime;
+ private Sensor rangeTime;
+ private Sensor flushTime;
+ private Sensor restoreTime;
+ private StreamingMetrics metrics;
+
+ private boolean loggingEnabled = true;
+ private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+
+ // always wrap the store with the metered store
+ public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
+ this.inner = inner;
+ this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+ public byte[] get(byte[] key) {
+ return inner.getInternal(key);
+ }
+ };
+ this.metricScope = metricScope;
+ this.time = time != null ? time : new SystemTime();
+ }
+
+ public MeteredWindowStore<K, V> disableLogging() {
+ loggingEnabled = false;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return inner.name();
+ }
+
+ @Override
+ public void init(ProcessorContext context) {
+ final String name = name();
+ this.metrics = context.metrics();
+ this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+ this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+ this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+ this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+ this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+ this.changeLogger = this.loggingEnabled ?
+ new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
+
+ // register and possibly restore the state from the logs
+ long startNs = time.nanoseconds();
+ inner.init(context);
+ try {
+ context.register(this, loggingEnabled, new StateRestoreCallback() {
+ @Override
+ public void restore(byte[] key, byte[] value) {
+ inner.putInternal(key, value);
+ }
+ });
+ } finally {
+ this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public boolean persistent() {
+ return inner.persistent();
+ }
+
+ @Override
+ public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+ return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
+ }
+
+ @Override
+ public void put(K key, V value) {
+ putAndReturnInternalKey(key, value, -1L);
+ }
+
+ @Override
+ public void put(K key, V value, long timestamp) {
+ putAndReturnInternalKey(key, value, timestamp);
+ }
+
+ @Override
+ public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+ long startNs = time.nanoseconds();
+ try {
+ byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
+
+ if (loggingEnabled) {
+ changeLogger.add(binKey);
+ changeLogger.maybeLogChange(this.getter);
+ }
+
+ return binKey;
+ } finally {
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+ inner.putInternal(binaryKey, binaryValue);
+ }
+
+ @Override
+ public byte[] getInternal(byte[] binaryKey) {
+ long startNs = time.nanoseconds();
+ try {
+ return this.inner.getInternal(binaryKey);
+ } finally {
+ this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+ }
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public void flush() {
+ long startNs = time.nanoseconds();
+ try {
+ this.inner.flush();
+
+ if (loggingEnabled)
+ changeLogger.logChange(this.getter);
+ } finally {
+ this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+ }
+ }
+
+ private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+ private final WindowStoreIterator<E> iter;
+ private final Sensor sensor;
+ private final long startNs;
+
+ public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<Long, E> next() {
+ return iter.next();
+ }
+
+ @Override
+ public void remove() {
+ iter.remove();
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+ }
+ }
+
+ }
+
+ WindowStore<K, V> inner() {
+ return inner;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
new file mode 100644
index 0000000..e276f83
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following:
+ * <pre>
+ * <version>
+ * <n>
+ * <topic_name_1> <partition_1> <offset_1>
+ * .
+ * .
+ * .
+ * <topic_name_n> <partition_n> <offset_n>
+ * </pre>
+ * The first line contains a number designating the format version (currently 0), the second line contains
+ * a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
+ * separated by spaces.
+ */
+public class OffsetCheckpoint {
+
+ private static final int VERSION = 0;
+
+ private final File file;
+ private final Object lock;
+
+ public OffsetCheckpoint(File file) throws IOException {
+ this.file = file;
+ this.lock = new Object();
+ }
+
+ public void write(Map<TopicPartition, Long> offsets) throws IOException {
+ synchronized (lock) {
+ // write to temp file and then swap with the existing file
+ File temp = new File(file.getAbsolutePath() + ".tmp");
+
+ FileOutputStream fileOutputStream = new FileOutputStream(temp);
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8));
+ try {
+ writeIntLine(writer, VERSION);
+ writeIntLine(writer, offsets.size());
+
+ for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+ writeEntry(writer, entry.getKey(), entry.getValue());
+
+ writer.flush();
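+ // fsync so the checkpoint contents reach disk before the rename below publishes them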
+ fileOutputStream.getFD().sync();
+ } finally {
+ writer.close();
+ }
+
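+ // atomically replace any existing checkpoint file, falling back to a non-atomic move where unsupported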
+ Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
+ }
+ }
+
+ private void writeIntLine(BufferedWriter writer, int number) throws IOException {
+ writer.write(Integer.toString(number));
+ writer.newLine();
+ }
+
+ private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
+ writer.write(part.topic());
+ writer.write(' ');
+ writer.write(Integer.toString(part.partition()));
+ writer.write(' ');
+ writer.write(Long.toString(offset));
+ writer.newLine();
+ }
+
+ public Map<TopicPartition, Long> read() throws IOException {
+ synchronized (lock) {
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
+ } catch (FileNotFoundException e) {
+ return Collections.emptyMap();
+ }
+
+ try {
+ int version = readInt(reader);
+ switch (version) {
+ case 0:
+ int expectedSize = readInt(reader);
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ String line = reader.readLine();
+ while (line != null) {
+ String[] pieces = line.split("\\s+");
+ if (pieces.length != 3)
+ throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
+ line));
+
+ String topic = pieces[0];
+ int partition = Integer.parseInt(pieces[1]);
+ long offset = Long.parseLong(pieces[2]);
+ offsets.put(new TopicPartition(topic, partition), offset);
+ line = reader.readLine();
+ }
+ if (offsets.size() != expectedSize)
+ throw new IOException(String.format("Expected %d entries but found only %d",
+ expectedSize,
+ offsets.size()));
+ return offsets;
+
+ default:
+ throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
+ }
+ } finally {
+ if (reader != null)
+ reader.close();
+ }
+ }
+ }
+
+ private int readInt(BufferedReader reader) throws IOException {
+ String line = reader.readLine();
+ if (line == null)
+ throw new EOFException("File ended prematurely.");
+ return Integer.parseInt(line);
+ }
+
+ public void delete() throws IOException {
+ Files.deleteIfExists(file.toPath());
+ }
+
+ @Override
+ public String toString() {
+ return this.file.getAbsolutePath();
+ }
+
+}
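For reference, a short usage sketch of the class above. The path is illustrative, and the example assumes it lives in the same org.apache.kafka.streams.state.internals package so the class is visible:

import org.apache.kafka.common.TopicPartition;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OffsetCheckpointExample {
    public static void main(String[] args) throws IOException {
        OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File("/tmp/state/.checkpoint"));

        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("topicA", 0), 100L);
        offsets.put(new TopicPartition("topicB", 1), 200L);

        // Writes to a ".tmp" sibling, fsyncs it, then atomically swaps it into place.
        checkpoint.write(offsets);

        // The file now holds, one item per line:
        //   0               (format version)
        //   2               (number of entries)
        //   topicA 0 100
        //   topicB 1 200
        // (map iteration order determines the order of the entry lines)
        Map<TopicPartition, Long> restored = checkpoint.read();
        System.out.println(restored.equals(offsets)); // true
    }
}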