Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/06 20:09:54 UTC
[1/2] kafka git commit: KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks
Repository: kafka
Updated Branches:
refs/heads/0.10.1 f46032f64 -> abfee8549
KAFKA-3994: Fix deadlock in Watchers by calling tryComplete without any locks
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma, Jun Rao, Jiangjie Qin, Guozhang Wang
Closes #2195 from hachikuji/KAFKA-3994-linked-queue
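The deadlock fixed here is a lock-order inversion: the purgatory previously invoked tryComplete() while holding the operation's own monitor, while the coordinator's tryComplete() implementations synchronize on the group, and other coordinator paths call into the purgatory while already holding the group lock. The fix introduces an overridable safeTryComplete(): by default it keeps the operation-level lock, but operations that already provide their own synchronization can skip it. A minimal Java sketch of the idea, with illustrative names (the actual change is the Scala diff below):

abstract class DelayedOperation {
    // Default: guard tryComplete() with the operation's own monitor.
    boolean safeTryComplete() {
        synchronized (this) {
            return tryComplete();
        }
    }

    abstract boolean tryComplete();
}

class DelayedHeartbeat extends DelayedOperation {
    private final Object group; // stands in for the coordinator's group metadata lock

    DelayedHeartbeat(Object group) {
        this.group = group;
    }

    // tryComplete() below already synchronizes on the group, so skipping the
    // operation monitor here makes it safe for coordinator code to call
    // purgatory operations while holding the group lock.
    @Override
    boolean safeTryComplete() {
        return tryComplete();
    }

    @Override
    boolean tryComplete() {
        synchronized (group) {
            return true; // the real code checks heartbeat state here
        }
    }
}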
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/02e75a27
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/02e75a27
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/02e75a27
Branch: refs/heads/0.10.1
Commit: 02e75a27ce1b2a34a54142cfd3c6bf0931c94c10
Parents: f46032f
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Dec 5 11:12:03 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 11:56:19 2016 -0800
----------------------------------------------------------------------
.../kafka/coordinator/DelayedHeartbeat.scala | 5 ++
.../scala/kafka/coordinator/DelayedJoin.scala | 8 +-
.../scala/kafka/server/DelayedOperation.scala | 93 +++++++++++---------
.../other/kafka/TestPurgatoryPerformance.scala | 1 -
.../kafka/server/DelayedOperationTest.scala | 4 +-
5 files changed, 62 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 8e250c3..b05186c 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -29,6 +29,11 @@ private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
heartbeatDeadline: Long,
sessionTimeout: Long)
extends DelayedOperation(sessionTimeout) {
+
+ // overridden since tryComplete already synchronizes on the group. This makes it safe to
+ // call purgatory operations while holding the group lock.
+ override def safeTryComplete(): Boolean = tryComplete()
+
override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
override def onComplete() = coordinator.onCompleteHeartbeat()
http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
index a62884a..8744f16 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
@@ -31,8 +31,12 @@ import kafka.server.DelayedOperation
*/
private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
group: GroupMetadata,
- sessionTimeout: Long)
- extends DelayedOperation(sessionTimeout) {
+ rebalanceTimeout: Long)
+ extends DelayedOperation(rebalanceTimeout) {
+
+ // overridden since tryComplete already synchronizes on the group. This makes it safe to
+ // call purgatory operations while holding the group lock.
+ override def safeTryComplete(): Boolean = tryComplete()
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
override def onExpiration() = coordinator.onExpireJoin()
http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 5248edf..553c27a 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -17,22 +17,18 @@
package kafka.server
-import kafka.utils._
-import kafka.utils.timer._
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.metrics.KafkaMetricsGroup
-
-import java.util.LinkedList
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection._
-
import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
+import kafka.utils._
+import kafka.utils.timer._
+
+import scala.collection._
/**
* An operation whose processing needs to be delayed for at most the given delayMs. For example
@@ -77,7 +73,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
/**
* Check if the delayed operation is already completed
*/
- def isCompleted(): Boolean = completed.get()
+ def isCompleted: Boolean = completed.get()
/**
* Call-back to execute when a delayed operation gets expired and hence forced to complete.
@@ -90,7 +86,7 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
*/
def onComplete(): Unit
- /*
+ /**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
* forceComplete() and return true iff forceComplete returns true; otherwise return false
@@ -99,6 +95,16 @@ abstract class DelayedOperation(override val delayMs: Long) extends TimerTask wi
*/
def tryComplete(): Boolean
+ /**
+ * Thread-safe variant of tryComplete(). This can be overridden if the operation provides its
+ * own synchronization.
+ */
+ def safeTryComplete(): Boolean = {
+ synchronized {
+ tryComplete()
+ }
+ }
+
/*
* run() method defines a task that is executed on timeout
*/
@@ -187,14 +193,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
// operation is unnecessarily added for watch. However, this is a less severe issue since the
// expire reaper will clean it up periodically.
- var isCompletedByMe = operation synchronized operation.tryComplete()
+ var isCompletedByMe = operation.safeTryComplete()
if (isCompletedByMe)
return true
var watchCreated = false
for(key <- watchKeys) {
// If the operation is already completed, stop adding it to the rest of the watcher list.
- if (operation.isCompleted())
+ if (operation.isCompleted)
return false
watchForOperation(key, operation)
@@ -204,14 +210,14 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
}
}
- isCompletedByMe = operation synchronized operation.tryComplete()
+ isCompletedByMe = operation.safeTryComplete()
if (isCompletedByMe)
return true
// if it cannot be completed by now and hence is watched, add to the expire queue also
- if (! operation.isCompleted()) {
+ if (!operation.isCompleted) {
timeoutTimer.add(operation)
- if (operation.isCompleted()) {
+ if (operation.isCompleted) {
// cancel the timer task
operation.cancel()
}
@@ -239,7 +245,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
* on multiple lists, and some of its watched entries may still be in the watch lists
* even when it has been completed, this number may be larger than the number of real operations watched
*/
- def watched() = allWatchers.map(_.watched).sum
+ def watched() = allWatchers.map(_.countWatched).sum
/**
* Return the number of delayed operations in the expiry queue
@@ -272,7 +278,7 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
if (watchersForKey.get(key) != watchers)
return
- if (watchers != null && watchers.watched == 0) {
+ if (watchers != null && watchers.isEmpty) {
watchersForKey.remove(key)
}
}
@@ -291,35 +297,35 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
* A linked list of watched delayed operations based on some key
*/
private class Watchers(val key: Any) {
+ private[this] val operations = new ConcurrentLinkedQueue[T]()
- private[this] val operations = new LinkedList[T]()
+ // count the current number of watched operations. This is O(n), so use isEmpty() if possible
+ def countWatched: Int = operations.size
- def watched: Int = operations synchronized operations.size
+ def isEmpty: Boolean = operations.isEmpty
// add the element to watch
def watch(t: T) {
- operations synchronized operations.add(t)
+ operations.add(t)
}
// traverse the list and try to complete some watched elements
def tryCompleteWatched(): Int = {
-
var completed = 0
- operations synchronized {
- val iter = operations.iterator()
- while (iter.hasNext) {
- val curr = iter.next()
- if (curr.isCompleted) {
- // another thread has completed this operation, just remove it
- iter.remove()
- } else if (curr synchronized curr.tryComplete()) {
- completed += 1
- iter.remove()
- }
+
+ val iter = operations.iterator()
+ while (iter.hasNext) {
+ val curr = iter.next()
+ if (curr.isCompleted) {
+ // another thread has completed this operation, just remove it
+ iter.remove()
+ } else if (curr.safeTryComplete()) {
+ iter.remove()
+ completed += 1
}
}
- if (operations.size == 0)
+ if (operations.isEmpty)
removeKeyIfEmpty(key, this)
completed
@@ -328,18 +334,17 @@ class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
// traverse the list and purge elements that are already completed by others
def purgeCompleted(): Int = {
var purged = 0
- operations synchronized {
- val iter = operations.iterator()
- while (iter.hasNext) {
- val curr = iter.next()
- if (curr.isCompleted) {
- iter.remove()
- purged += 1
- }
+
+ val iter = operations.iterator()
+ while (iter.hasNext) {
+ val curr = iter.next()
+ if (curr.isCompleted) {
+ iter.remove()
+ purged += 1
}
}
- if (operations.size == 0)
+ if (operations.isEmpty)
removeKeyIfEmpty(key, this)
purged
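Worth noting in the Watchers change above: the externally synchronized java.util.LinkedList becomes a ConcurrentLinkedQueue, so traversal no longer holds a list-wide lock while calling into safeTryComplete(); the trade-off is that size() becomes O(n), which is why the diff splits watched into countWatched and isEmpty. A minimal Java sketch of the resulting traversal pattern, with a hypothetical Completable interface standing in for DelayedOperation:

import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

interface Completable {
    boolean isCompleted();
    boolean safeTryComplete();
}

class WatchersSketch {
    private final ConcurrentLinkedQueue<Completable> operations = new ConcurrentLinkedQueue<>();

    void watch(Completable op) {
        operations.add(op); // lock-free enqueue; no `operations synchronized` block
    }

    int tryCompleteWatched() {
        int completed = 0;
        // the iterator is weakly consistent: it tolerates concurrent adds/removes
        Iterator<Completable> iter = operations.iterator();
        while (iter.hasNext()) {
            Completable curr = iter.next();
            if (curr.isCompleted()) {
                iter.remove(); // completed by another thread; just drop it
            } else if (curr.safeTryComplete()) {
                iter.remove();
                completed++;
            }
            // no queue-wide lock is held while safeTryComplete() runs, so it can
            // take the group lock without risking the lock-order inversion above
        }
        return completed;
    }
}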
http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
index ba89fc8..addd232 100644
--- a/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
+++ b/core/src/test/scala/other/kafka/TestPurgatoryPerformance.scala
@@ -237,7 +237,6 @@ object TestPurgatoryPerformance {
}
private class FakeOperation(delayMs: Long, size: Int, val latencyMs: Long, latch: CountDownLatch) extends DelayedOperation(delayMs) {
- private[this] val data = new Array[Byte](size)
val completesAt = System.currentTimeMillis + latencyMs
def onExpiration(): Unit = {}
http://git-wip-us.apache.org/repos/asf/kafka/blob/02e75a27/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index ae0d12f..dccde2f 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -62,8 +62,8 @@ class DelayedOperationTest {
assertFalse("r2 not satisfied and hence watched", purgatory.tryCompleteElseWatch(r2, Array("test2")))
r1.awaitExpiration()
val elapsed = SystemTime.hiResClockMs - start
- assertTrue("r1 completed due to expiration", r1.isCompleted())
- assertFalse("r2 hasn't completed", r2.isCompleted())
+ assertTrue("r1 completed due to expiration", r1.isCompleted)
+ assertFalse("r2 hasn't completed", r2.isCompleted)
assertTrue(s"Time for expiration $elapsed should at least $expiration", elapsed >= expiration)
}
[2/2] kafka git commit: KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask
Posted by gu...@apache.org.
KAFKA-4488: UnsupportedOperationException during initialization of StandbyTask
Instead of throwing `UnsupportedOperationException` from `StandbyTask.recordCollector()`, return a no-op implementation of `RecordCollector`.
Refactored `RecordCollector` into a `RecordCollector` interface and a `RecordCollectorImpl` implementation.
Author: Damian Guy <da...@gmail.com>
Reviewers: Eno Thereska, Guozhang Wang
Closes #2212 from dguy/standby-task
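The fix is essentially the null-object pattern: instead of every caller of recordCollector() having to guard against an exception during standby-task initialization, the standby context hands back a collector whose methods do nothing. A minimal Java sketch of the shape of the change, using an illustrative, trimmed-down interface (the full interface and the no-op instance appear in the diff below):

interface Collector {
    void send(String topic, byte[] key, byte[] value);
    void flush();
}

final class NoOpCollector implements Collector {
    @Override public void send(String topic, byte[] key, byte[] value) { /* intentionally empty */ }
    @Override public void flush() { /* intentionally empty */ }
}

class StandbyContextSketch {
    private static final Collector NO_OP = new NoOpCollector();

    // Previously this threw UnsupportedOperationException, which surfaced while
    // a StandbyTask initialized its state stores; now callers always get a safe object.
    Collector recordCollector() {
        return NO_OP;
    }
}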
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abfee854
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abfee854
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abfee854
Branch: refs/heads/0.10.1
Commit: abfee8549b920ca88b008b59f651324ce0b0c05e
Parents: 02e75a2
Author: Damian Guy <da...@gmail.com>
Authored: Tue Dec 6 11:49:54 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Dec 6 12:09:34 2016 -0800
----------------------------------------------------------------------
.../processor/internals/RecordCollector.java | 113 ++---------------
.../internals/RecordCollectorImpl.java | 123 +++++++++++++++++++
.../processor/internals/StandbyContextImpl.java | 27 +++-
.../streams/processor/internals/StreamTask.java | 4 +-
.../QueryableStateIntegrationTest.java | 1 +
.../internals/RecordCollectorTest.java | 8 +-
.../processor/internals/SinkNodeTest.java | 117 ++----------------
.../processor/internals/StandbyTaskTest.java | 31 +++++
.../streams/state/KeyValueStoreTestDriver.java | 3 +-
.../state/internals/RocksDBWindowStoreTest.java | 25 ++--
.../state/internals/StoreChangeLoggerTest.java | 4 +-
.../apache/kafka/test/KStreamTestDriver.java | 4 +-
.../apache/kafka/test/MockProcessorContext.java | 6 +-
.../apache/kafka/test/NoOpRecordCollector.java | 4 +-
14 files changed, 232 insertions(+), 238 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index 63d6a3b..6d7d561 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -5,126 +5,39 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+public interface RecordCollector {
+ <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer);
+
+ <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ StreamPartitioner<K, V> partitioner);
-public class RecordCollector {
- private static final int MAX_SEND_ATTEMPTS = 3;
- private static final long SEND_RETRY_BACKOFF = 100L;
+ void flush();
+
+ void close();
/**
- * A supplier of a {@link RecordCollector} instance.
+ * A supplier of a {@link RecordCollectorImpl} instance.
*/
- public interface Supplier {
+ interface Supplier {
/**
* Get the record collector.
* @return the record collector
*/
RecordCollector recordCollector();
}
-
- private static final Logger log = LoggerFactory.getLogger(RecordCollector.class);
-
- private final Producer<byte[], byte[]> producer;
- private final Map<TopicPartition, Long> offsets;
- private final String logPrefix;
-
-
- public RecordCollector(Producer<byte[], byte[]> producer, String streamTaskId) {
- this.producer = producer;
- this.offsets = new HashMap<>();
- this.logPrefix = String.format("task [%s]", streamTaskId);
- }
-
- public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
- send(record, keySerializer, valueSerializer, null);
- }
-
- public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
- StreamPartitioner<K, V> partitioner) {
- byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
- byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
- Integer partition = record.partition();
- if (partition == null && partitioner != null) {
- List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
- if (partitions != null && partitions.size() > 0)
- partition = partitioner.partition(record.key(), record.value(), partitions.size());
- }
-
- ProducerRecord<byte[], byte[]> serializedRecord =
- new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
- final String topic = serializedRecord.topic();
-
- for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
- try {
- this.producer.send(serializedRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (exception == null) {
- TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
- offsets.put(tp, metadata.offset());
- } else {
- log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
- }
- }
- });
- return;
- } catch (TimeoutException e) {
- if (attempt == MAX_SEND_ATTEMPTS) {
- throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
- }
- log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
- Utils.sleep(SEND_RETRY_BACKOFF);
- }
-
- }
- }
-
- public void flush() {
- log.debug("{} Flushing producer", logPrefix);
- this.producer.flush();
- }
-
- /**
- * Closes this RecordCollector
- */
- public void close() {
- producer.close();
- }
-
- /**
- * The last ack'd offset from the producer
- *
- * @return the map from TopicPartition to offset
- */
- Map<TopicPartition, Long> offsets() {
- return this.offsets;
- }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
new file mode 100644
index 0000000..0dbad5b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordCollectorImpl implements RecordCollector {
+ private static final int MAX_SEND_ATTEMPTS = 3;
+ private static final long SEND_RETRY_BACKOFF = 100L;
+
+ private static final Logger log = LoggerFactory.getLogger(RecordCollectorImpl.class);
+
+ private final Producer<byte[], byte[]> producer;
+ private final Map<TopicPartition, Long> offsets;
+ private final String logPrefix;
+
+
+ public RecordCollectorImpl(Producer<byte[], byte[]> producer, String streamTaskId) {
+ this.producer = producer;
+ this.offsets = new HashMap<>();
+ this.logPrefix = String.format("task [%s]", streamTaskId);
+ }
+
+ @Override
+ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ send(record, keySerializer, valueSerializer, null);
+ }
+
+ @Override
+ public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
+ StreamPartitioner<K, V> partitioner) {
+ byte[] keyBytes = keySerializer.serialize(record.topic(), record.key());
+ byte[] valBytes = valueSerializer.serialize(record.topic(), record.value());
+ Integer partition = record.partition();
+ if (partition == null && partitioner != null) {
+ List<PartitionInfo> partitions = this.producer.partitionsFor(record.topic());
+ if (partitions != null && partitions.size() > 0)
+ partition = partitioner.partition(record.key(), record.value(), partitions.size());
+ }
+
+ ProducerRecord<byte[], byte[]> serializedRecord =
+ new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);
+ final String topic = serializedRecord.topic();
+
+ for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
+ try {
+ this.producer.send(serializedRecord, new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception == null) {
+ TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.error("{} Error sending record to topic {}", logPrefix, topic, exception);
+ }
+ }
+ });
+ return;
+ } catch (TimeoutException e) {
+ if (attempt == MAX_SEND_ATTEMPTS) {
+ throw new StreamsException(String.format("%s Failed to send record to topic %s after %d attempts", logPrefix, topic, attempt));
+ }
+ log.warn("{} Timeout exception caught when sending record to topic {} attempt {}", logPrefix, topic, attempt);
+ Utils.sleep(SEND_RETRY_BACKOFF);
+ }
+
+ }
+ }
+
+ @Override
+ public void flush() {
+ log.debug("{} Flushing producer", logPrefix);
+ this.producer.flush();
+ }
+
+ /**
+ * Closes this RecordCollector
+ */
+ @Override
+ public void close() {
+ producer.close();
+ }
+
+ /**
+ * The last ack'd offset from the producer
+ *
+ * @return the map from TopicPartition to offset
+ */
+ Map<TopicPartition, Long> offsets() {
+ return this.offsets;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 80c0026..9ce6595 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -17,11 +17,14 @@
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
@@ -29,6 +32,28 @@ import java.util.Map;
public class StandbyContextImpl implements InternalProcessorContext, RecordCollector.Supplier {
+ private static final RecordCollector NO_OP_COLLECTOR = new RecordCollector() {
+ @Override
+ public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer) {
+
+ }
+
+ @Override
+ public <K, V> void send(final ProducerRecord<K, V> record, final Serializer<K> keySerializer, final Serializer<V> valueSerializer, final StreamPartitioner<K, V> partitioner) {
+
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+ @Override
+ public void close() {
+
+ }
+ };
+
private final TaskId id;
private final String applicationId;
private final StreamsMetrics metrics;
@@ -78,7 +103,7 @@ public class StandbyContextImpl implements InternalProcessorContext, RecordColle
@Override
public RecordCollector recordCollector() {
- throw new UnsupportedOperationException();
+ return NO_OP_COLLECTOR;
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 9a2f03e..6651705 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -54,7 +54,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
private final PunctuationQueue punctuationQueue;
private final Map<TopicPartition, Long> consumedOffsets;
- private final RecordCollector recordCollector;
+ private final RecordCollectorImpl recordCollector;
private final int maxBufferedSize;
private boolean commitRequested = false;
@@ -109,7 +109,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
this.consumedOffsets = new HashMap<>();
// create the record recordCollector that maintains the produced offsets
- this.recordCollector = new RecordCollector(producer, id().toString());
+ this.recordCollector = new RecordCollectorImpl(producer, id().toString());
// initialize the topology with its own context
this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 66b6d2e..7df2798 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -144,6 +144,7 @@ public class QueryableStateIntegrationTest {
.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
stringComparator = new Comparator<KeyValue<String, String>>() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 23abf8a..66397fb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -70,7 +70,7 @@ public class RecordCollectorTest {
@Test
public void testSpecificPartition() {
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestSpecificPartition");
@@ -102,7 +102,7 @@ public class RecordCollectorTest {
@Test
public void testStreamPartitioner() {
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
"RecordCollectorTest-TestStreamPartitioner");
@@ -129,7 +129,7 @@ public class RecordCollectorTest {
@Test
public void shouldRetryWhenTimeoutExceptionOccursOnSend() throws Exception {
final AtomicInteger attempt = new AtomicInteger(0);
- RecordCollector collector = new RecordCollector(
+ RecordCollectorImpl collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
@@ -149,7 +149,7 @@ public class RecordCollectorTest {
@SuppressWarnings("unchecked")
@Test(expected = StreamsException.class)
public void shouldThrowStreamsExceptionAfterMaxAttempts() throws Exception {
- RecordCollector collector = new RecordCollector(
+ RecordCollector collector = new RecordCollectorImpl(
new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 3b41517..8ae250c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,129 +17,28 @@
package org.apache.kafka.streams.processor.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
-import java.io.File;
-import java.util.Map;
-
public class SinkNodeTest {
@Test(expected = StreamsException.class)
@SuppressWarnings("unchecked")
public void invalidInputRecordTimestampTest() {
final Serializer anySerializer = Serdes.Bytes().serializer();
+ final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
+
+ final MockProcessorContext context = new MockProcessorContext(anyStateSerde, new RecordCollectorImpl(null, null));
+ context.setTime(-1);
final SinkNode sink = new SinkNode<>("name", "output-topic", anySerializer, anySerializer, null);
- sink.init(new MockProcessorContext());
+ sink.init(context);
sink.process(null, null);
}
-
- private final class MockProcessorContext implements ProcessorContext, RecordCollector.Supplier {
- private final long invalidTimestamp = -1;
-
- @Override
- public String applicationId() {
- return null;
- }
-
- @Override
- public TaskId taskId() {
- return null;
- }
-
- @Override
- public Serde<?> keySerde() {
- return null;
- }
-
- @Override
- public Serde<?> valueSerde() {
- return null;
- }
-
- @Override
- public File stateDir() {
- return null;
- }
-
- @Override
- public StreamsMetrics metrics() {
- return null;
- }
-
- @Override
- public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
- }
-
- @Override
- public StateStore getStateStore(String name) {
- return null;
- }
-
- @Override
- public void schedule(long interval) {
- }
-
- @Override
- public <K, V> void forward(K key, V value) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, int childIndex) {
- }
-
- @Override
- public <K, V> void forward(K key, V value, String childName) {
- }
-
- @Override
- public void commit() {
- }
-
- @Override
- public String topic() {
- return null;
- }
-
- @Override
- public int partition() {
- return 0;
- }
-
- @Override
- public long offset() {
- return 0;
- }
-
- @Override
- public long timestamp() {
- return invalidTimestamp;
- }
-
- @Override
- public Map<String, Object> appConfigs() {
- return null;
- }
-
- @Override
- public Map<String, Object> appConfigsWithPrefix(String prefix) {
- return null;
- }
-
- @Override
- public RecordCollector recordCollector() {
- return null;
- }
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index afd9bb6..b28b8d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
@@ -303,6 +306,34 @@ public class StandbyTaskTest {
}
+ @Test
+ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStores() throws Exception {
+ final String changelogName = "test-application-my-store-changelog";
+ final List<TopicPartition> partitions = Utils.mkList(new TopicPartition(changelogName, 0));
+ consumer.assign(partitions);
+ Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
+ committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L));
+ consumer.commitSync(committedOffsets);
+
+ restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
+ new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
+ final KStreamBuilder builder = new KStreamBuilder();
+ builder.stream("topic").groupByKey().count("my-store");
+ final ProcessorTopology topology = builder.build(0);
+ StreamsConfig config = createConfig(baseDir);
+ new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() {
+ @Override
+ public Sensor addLatencySensor(final String scopeName, final String entityName, final String operationName, final String... tags) {
+ return null;
+ }
+
+ @Override
+ public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
+
+ }
+ }, stateDirectory);
+
+ }
private List<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]>... recs) {
return Arrays.asList(recs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index aca974b..68a80d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -34,6 +34,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.MockTimestampExtractor;
@@ -199,7 +200,7 @@ public class KeyValueStoreTestDriver<K, V> {
ByteArraySerializer rawSerializer = new ByteArraySerializer();
Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
- this.recordCollector = new RecordCollector(producer, "KeyValueStoreTestDriver") {
+ this.recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index f47bc24..b15ebab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -79,7 +80,7 @@ public class RocksDBWindowStoreTest {
public void shouldOnlyIterateOpenSegments() throws Exception {
final File baseDir = TestUtils.tempDirectory();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
}
@@ -126,7 +127,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetch") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -200,7 +201,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchBefore") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -289,7 +290,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutAndFetchAfter") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -376,7 +377,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestPutSameKeyTimestamp") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -432,7 +433,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "anyTaskID") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "anyTaskID") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -460,7 +461,7 @@ public class RocksDBWindowStoreTest {
try {
final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRolling") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRolling") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -575,7 +576,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestore") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestore") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -624,7 +625,7 @@ public class RocksDBWindowStoreTest {
File baseDir2 = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestRestoreII") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestRestoreII") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
changeLog.add(new KeyValue<>(
@@ -679,7 +680,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestSegmentMaintenance") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
@@ -782,7 +783,7 @@ public class RocksDBWindowStoreTest {
File baseDir = Files.createTempDirectory("test").toFile();
try {
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-TestInitialLoading") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
// do nothing
@@ -845,7 +846,7 @@ public class RocksDBWindowStoreTest {
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() throws Exception {
final File baseDir = TestUtils.tempDirectory();
Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerde.serializer(), byteArraySerde.serializer());
- RecordCollector recordCollector = new RecordCollector(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
+ RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTest-ShouldOnlyIterateOpenSegments") {
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index fbfffb9..9189a14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.MockProcessorContext;
import org.junit.Test;
@@ -41,7 +41,7 @@ public class StoreChangeLoggerTest {
private final Map<Integer, String> logged = new HashMap<>();
private final MockProcessorContext context = new MockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
- new RecordCollector(null, "StoreChangeLoggerTest") {
+ new RecordCollectorImpl(null, "StoreChangeLoggerTest") {
@SuppressWarnings("unchecked")
@Override
public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 14e15a2..f51cc0e 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -29,7 +29,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import java.io.File;
@@ -246,7 +246,7 @@ public class KStreamTestDriver {
}
- private class MockRecordCollector extends RecordCollector {
+ private class MockRecordCollector extends RecordCollectorImpl {
public MockRecordCollector() {
super(null, "KStreamTestDriver");
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cafdd9e..f4ab642 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordContext;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.ThreadCache;
http://git-wip-us.apache.org/repos/asf/kafka/blob/abfee854/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
index 880a93b..d4368d3 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java
@@ -19,9 +19,9 @@ package org.apache.kafka.test;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
-public class NoOpRecordCollector extends RecordCollector {
+public class NoOpRecordCollector extends RecordCollectorImpl {
public NoOpRecordCollector() {
super(null, "NoOpRecordCollector");
}