You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/06 20:09:55 UTC
[2/2] kafka git commit: KAFKA-4488: UnsupportedOperationException
during initialization of StandbyTask
KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask
Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()` return a No-op implementation of `RecordCollector`.
Refactored `RecordCollector` to have an interface and impl.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2212 from dguy/standby-task
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abfee854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abfee854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abfee854
Branch: refs/heads/0.10.1
Commit: abfee8549b920ca88b008b59f651324ce0b0c05e
Parents: 02e75a2
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 6 11:49:54 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 12:09:34 2016 -0800
----------------------------------------------------------------------
.../processor/internals/RecordCollector.java | 113 ++---------------
.../internals/RecordCollectorImpl.java | 123 +++++++++++++++++++
.../processor/internals/StandbyContextImpl.java | 27 +++-
.../streams/processor/internals/StreamTask.java | 4 +-
.../QueryableStateIntegrationTest.java | 1 +
.../internals/RecordCollectorTest.java | 8 +-
.../processor/internals/SinkNodeTest.java | 117 ++----------------
.../processor/internals/StandbyTaskTest.java | 31 +++++
.../streams/state/KeyValueStoreTestDriver.java | 3 +-
.../state/internals/RocksDBWindowStoreTest.java | 25 ++--
.../state/internals/StoreChangeLoggerTest.java | 4 +-
.../apache/kafka/test/KStreamTestDriver.java | 4 +-
.../apache/kafka/test/MockProcessorContext.java | 6 +-
.../apache/kafka/test/NoOpRecordCollector.java | 4 +-
14 files changed, 232 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 63d6a3b..6d7d561 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -5,126 +5,39 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public interface RecordCollector {
+ <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
+
+ <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ StreamPartitioner<K, V> partitioner);
-public class RecordCollector {
- private static final int MAX_SEND_ATTEMPTS = 3;
- private static final long SEND_RETRY_BACKOFF = 100L;
+ void flush();
+
+ void close();
/**
- * A supplier of a {@link RecordCollector} instance.
+ * A supplier of a {@link RecordCollectorImpl} instance.
*/
- public interface Supplier {
+ interface Supplier {
/**
* Get the record collector.
* @return the record collector
*/
RecordCollector recordCollector();
}
-
- private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
- private final Producer<byte[], byte[]> producer;
- private final Map<TopicPartition, Long> offsets;
- private final String logPrefix;
-
-
- public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
- this.producer = producer;
- this.offsets = new HashMap<>();
- this.logPrefix = String.format("task [%s]", streamTaskId);
- }
-
- public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- send(record, keySerializer, valueSerializer, null);
- }
-
- public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamPartitioner<K, V> partitioner) {
- byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
- byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
- Integer partition = record.partition();
- if (partition == null && partitioner != null) {
- List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
- if (partitions != null && partitions.size() > 0)
- partition = partitioner.partition(record.key(), record.value(), partitions.size());
- }
-
- ProducerRecord<byte[], byte[]> serializedRecord =
- new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
- final String topic = serializedRecord.topic();
-
- for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
- try {
- this.producer.send(serializedRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
- offsets.put(tp, metadata.offset());
- } else {
- log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
- }
- }
- });
- return;
- } catch (TimeoutException e) {
- if (attempt == MAX_SEND_ATTEMPTS) {
- throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
- }
- log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
- Utils.sleep(SEND_RETRY_BACKOFF);
- }
-
- }
- }
-
- public void flush() {
- log.debug("{} Flushing producer", logPrefix);
- this.producer.flush();
- }
-
- /**
- * Closes this RecordCollector
- */
- public void close() {
- producer.close();
- }
-
- /**
- * The last ack'd offset from the producer
- *
- * @return the map from TopicPartition to offset
- */
- Map<TopicPartition, Long> offsets() {
- return this.offsets;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
new file mode 100644
index 0000000..0dbad5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordCollectorImpl implements RecordCollector {
+ private static final int MAX_SEND_ATTEMPTS = 3;
+ private static final long SEND_RETRY_BACKOFF = 100L;
+
+ private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
+
+ private final Producer<byte[], byte[]> producer;
+ private final Map<TopicPartition, Long> offsets;
+ private final String logPrefix;
+
+
+ public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+ this.producer = producer;
+ this.offsets = new HashMap<>();
+ this.logPrefix = String.format("task [%s]", streamTaskId);
+ }
+
+ @Override
+ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ send(record, keySerializer, valueSerializer, null);
+ }
+
+ @Override
+ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ StreamPartitioner<K, V> partitioner) {
+ byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
+ byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
+ Integer partition = record.partition();
+ if (partition == null && partitioner != null) {
+ List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
+ if (partitions != null && partitions.size() > 0)
+ partition = partitioner.partition(record.key(), record.value(), partitions.size());
+ }
+
+ ProducerRecord<byte[], byte[]> serializedRecord =
+ new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+ final String topic = serializedRecord.topic();
+
+ for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+ try {
+ this.producer.send(serializedRecord, new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception == null) {
+ TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+ }
+ }
+ });
+ return;
+ } catch (TimeoutException e) {
+ if (attempt == MAX_SEND_ATTEMPTS) {
+ throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
+ }
+ log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
+ Utils.sleep(SEND_RETRY_BACKOFF);
+ }
+
+ }
+ }
+
+ @Override
+ public void flush() {
+ log.debug("{} Flushing producer", logPrefix);
+ this.producer.flush();
+ }
+
+ /**
+ * Closes this RecordCollector
+ */
+ @Override
+ public void close() {
+ producer.close();
+ }
+
+ /**
+ * The last ack'd offset from the producer
+ *
+ * @return the map from TopicPartition to offset
+ */
+ Map<TopicPartition, Long> offsets() {
+ return this.offsets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 80c0026..9ce6595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
@@ -29,6 +32,28 @@ import java.util.Map;
public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
+ private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
+ @Override
+ public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+
+ }
+
+ @Override
+ public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+
private final TaskId id;
private final String applicationId;
private final StreamsMetrics metrics;
@@ -78,7 +103,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
@Override
public RecordCollector recordCollector() {
- throw new UnsupportedOperationException();
+ return NO_OP_COLLECTOR;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 9a2f03e..6651705 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -54,7 +54,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
private final PunctuationQueue punctuationQueue;
private final Map<TopicPartition, Long> consumedOffsets;
- private final RecordCollector recordCollector;
+ private final RecordCollectorImpl recordCollector;
private final int maxBufferedSize;
private boolean commitRequested = false;
@@ -109,7 +109,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
this.consumedOffsets = new HashMap<>();
// create the record recordCollector that maintains the produced offsets
- this.recordCollector = new RecordCollector(producer, id().toString());
+ this.recordCollector = new RecordCollectorImpl(producer, id().toString());
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 66b6d2e..7df2798 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -144,6 +144,7 @@ public class QueryableStateIntegrationTest {
.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
stringComparator = new Comparator<KeyValue<String, String>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 23abf8a..66397fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -70,7 +70,7 @@ public class RecordCollectorTest {
@Test
public void testSpecificPartition() {
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestSpecificPartition");
@@ -102,7 +102,7 @@ public class RecordCollectorTest {
@Test
public void testStreamPartitioner() {
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestStreamPartitioner");
@@ -129,7 +129,7 @@ public class RecordCollectorTest {
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
final AtomicInteger attempt = new AtomicInteger(0);
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -149,7 +149,7 @@ public class RecordCollectorTest {
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
- RecordCollector collector = new RecordCollector(
+ RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 3b41517..8ae250c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,129 +17,28 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
-import java.io.File;
-import java.util.Map;
-
public class SinkNodeTest {
@Test(expected = StreamsException.class)
@SuppressWarnings("unchecked")
public void invalidInputRecordTimestampTest() {
final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null));
+ context.setTime(-1);
final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
- sink.init(new MockProcessorContext());
+ sink.init(context);
sink.process(null, null);
}
-
- private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
- private final long invalidTimestamp = -1;
-
- @Override
- public String applicationId() {
- return null;
- }
-
- @Override
- public TaskId taskId() {
- return null;
- }
-
- @Override
- public Serde<?> keySerde() {
- return null;
- }
-
- @Override
- public Serde<?> valueSerde() {
- return null;
- }
-
- @Override
- public File stateDir() {
- return null;
- }
-
- @Override
- public StreamsMetrics metrics() {
- return null;
- }
-
- @Override
- public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
- }
-
- @Override
- public StateStore getStateStore(String name) {
- return null;
- }
-
- @Override
- public void schedule(long interval) {
- }
-
- @Override
- public <K, V> void forward(K key, V value) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, int childIndex) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, String childName) {
- }
-
- @Override
- public void commit() {
- }
-
- @Override
- public String topic() {
- return null;
- }
-
- @Override
- public int partition() {
- return 0;
- }
-
- @Override
- public long offset() {
- return 0;
- }
-
- @Override
- public long timestamp() {
- return invalidTimestamp;
- }
-
- @Override
- public Map<String, Object> appConfigs() {
- return null;
- }
-
- @Override
- public Map<String, Object> appConfigsWithPrefix(String prefix) {
- return null;
- }
-
- @Override
- public RecordCollector recordCollector() {
- return null;
- }
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index afd9bb6..b28b8d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -303,6 +306,34 @@ public class StandbyTaskTest {
}
+ @Test
+ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+ final String changelogName = "test-application-my-store-changelog";
+ final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
+ consumer.assign(partitions);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
+ consumer.commitSync(committedOffsets);
+
+ restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
+ new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.stream("topic").groupByKey().count("my-store");
+ final ProcessorTopology topology = builder.build(0);
+ StreamsConfig config = createConfig(baseDir);
+ new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() {
+ @Override
+ public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+ return null;
+ }
+
+ @Override
+ public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+
+ }
+ }, stateDirectory);
+
+ }
private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index aca974b..68a80d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -199,7 +200,7 @@ public class KeyValueStoreTestDriver<K, V> {
ByteArraySerializer rawSerializer = new ByteArraySerializer();
Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
- this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") {
+ this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index f47bc24..b15ebab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -79,7 +80,7 @@ public class RocksDBWindowStoreTest {
public void shouldOnlyIterateOpenSegments() throws Exception {
final File baseDir = TestUtils.tempDirectory();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
}
@@ -126,7 +127,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -200,7 +201,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -289,7 +290,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -376,7 +377,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -432,7 +433,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "anyTaskID") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -460,7 +461,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRolling") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -575,7 +576,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestore") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -624,7 +625,7 @@ public class RocksDBWindowStoreTest {
File baseDir2 = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestoreII") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -679,7 +680,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
@@ -782,7 +783,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
@@ -845,7 +846,7 @@ public class RocksDBWindowStoreTest {
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
final File baseDir = TestUtils.tempDirectory();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index fbfffb9..9189a14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class StoreChangeLoggerTest {
private final Map<Integer, String> logged = new HashMap<>();
private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
- new RecordCollector(null, "StoreChangeLoggerTest") {
+ new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 14e15a2..f51cc0e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
@@ -246,7 +246,7 @@ public class KStreamTestDriver {
}
- private class MockRecordCollector extends RecordCollector {
+ private class MockRecordCollector extends RecordCollectorImpl {
public MockRecordCollector() {
super(null, "KStreamTestDriver");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cafdd9e..f4ab642 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 880a93b..d4368d3 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -19,9 +19,9 @@ package org.apache.kafka.test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
-public class NoOpRecordCollector extends RecordCollector {
+public class NoOpRecordCollector extends RecordCollectorImpl {
public NoOpRecordCollector() {
super(null, "NoOpRecordCollector");
}