You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2017/09/28 10:00:35 UTC
[2/2] kafka git commit: KAFKA-5949;
User Callback Exceptions need to be handled properly
KAFKA-5949; User Callback Exceptions need to be handled properly
- catch user exception in user callback (TimestampExtractor, DeserializationHandler, StateRestoreListener) and wrap with StreamsException
Additional cleanup:
- rename globalRestoreListener to userRestoreListener
- remove unnecessary interface -> collapse SourceNodeRecordDeserializer and RecordDeserializer
- removed unused parameter loggingEnabled from ProcessorContext#register
Author: Matthias J. Sax <ma...@confluent.io>
Reviewers: Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Damian Guy <da...@gmail.com>
Closes #3939 from mjsax/kafka-5949-exceptions-user-callbacks
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e5f2471c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e5f2471c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e5f2471c
Branch: refs/heads/trunk
Commit: e5f2471c548fc490a42dd0321bcf7fcdd4ddc52d
Parents: 2703fda
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Sep 28 11:00:31 2017 +0100
Committer: Damian Guy <da...@gmail.com>
Committed: Thu Sep 28 11:00:31 2017 +0100
----------------------------------------------------------------------
.../errors/LogAndContinueExceptionHandler.java | 6 +-
.../errors/LogAndFailExceptionHandler.java | 8 +-
.../kafka/streams/kstream/ValueTransformer.java | 2 +
.../internals/KStreamTransformValues.java | 6 +-
.../streams/processor/ProcessorContext.java | 11 +-
.../kafka/streams/processor/StateStore.java | 4 +
.../internals/AbstractProcessorContext.java | 4 +-
.../processor/internals/AbstractTask.java | 6 +-
.../processor/internals/AssignedTasks.java | 5 +
.../internals/CompositeRestoreListener.java | 52 +++++++--
.../processor/internals/GlobalStateManager.java | 6 +
.../internals/GlobalStateManagerImpl.java | 1 -
.../internals/GlobalStateUpdateTask.java | 21 ++--
.../processor/internals/GlobalStreamThread.java | 7 +-
.../internals/InternalTopologyBuilder.java | 8 +-
.../internals/ProcessorStateManager.java | 13 +--
.../processor/internals/RecordDeserializer.java | 70 +++++++++++-
.../processor/internals/RecordQueue.java | 28 +++--
.../internals/SourceNodeRecordDeserializer.java | 90 ---------------
.../processor/internals/StateManager.java | 8 +-
.../processor/internals/StateRestorer.java | 4 +-
.../internals/StoreChangelogReader.java | 8 +-
.../streams/processor/internals/StreamTask.java | 3 +-
.../processor/internals/StreamThread.java | 12 +-
.../kafka/streams/processor/internals/Task.java | 3 +
.../processor/internals/TaskManager.java | 5 +-
.../state/internals/InMemoryKeyValueStore.java | 2 +-
.../streams/state/internals/MemoryLRUCache.java | 2 +-
.../streams/processor/TopologyBuilderTest.java | 2 +-
.../internals/CompositeRestoreListenerTest.java | 8 +-
.../internals/GlobalStateManagerImplTest.java | 42 +++----
.../internals/GlobalStateTaskTest.java | 20 +++-
.../internals/InternalTopologyBuilderTest.java | 3 +
.../processor/internals/PartitionGroupTest.java | 18 ++-
.../internals/ProcessorStateManagerTest.java | 50 ++++-----
.../internals/RecordDeserializerTest.java | 98 ++++++++++++++++
.../processor/internals/RecordQueueTest.java | 32 ++++--
.../SourceNodeRecordDeserializerTest.java | 111 -------------------
.../processor/internals/StandbyTaskTest.java | 2 +-
.../processor/internals/StateManagerStub.java | 2 +-
.../processor/internals/StateRestorerTest.java | 2 +-
.../internals/StoreChangelogReaderTest.java | 2 +-
.../internals/StreamPartitionAssignorTest.java | 15 ++-
.../processor/internals/StreamTaskTest.java | 4 +-
.../kafka/test/GlobalStateManagerStub.java | 2 +-
.../apache/kafka/test/MockProcessorContext.java | 4 +-
.../kafka/test/MockStateStoreSupplier.java | 39 +++----
.../apache/kafka/test/NoOpProcessorContext.java | 4 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 5 +-
49 files changed, 478 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index dde4b52..b2ef45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -38,9 +38,9 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
final Exception exception) {
log.warn("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(), record.offset(),
- exception);
+ "taskId: {}, topic: {}, partition: {}, offset: {}",
+ context.taskId(), record.topic(), record.partition(), record.offset(),
+ exception);
return DeserializationHandlerResponse.CONTINUE;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 23557a3..60af32f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -37,10 +37,10 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {
- log.warn("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(), record.offset(),
- exception);
+ log.error("Exception caught during Deserialization, " +
+ "taskId: {}, topic: {}, partition: {}, offset: {}",
+ context.taskId(), record.topic(), record.partition(), record.offset(),
+ exception);
return DeserializationHandlerResponse.FAIL;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 5463a76..0a8e890 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -62,6 +62,8 @@ public interface ValueTransformer<V, VR> {
* {@code ValueTransformer} and will result in an {@link StreamsException exception}.
*
* @param context the context
+ * @throws IllegalStateException If store gets registered after initialization is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
*/
void init(final ProcessorContext context);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index ab1c302..55c16cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -91,8 +91,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
}
@Override
- public void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback) {
- context.register(store, loggingEnabled, stateRestoreCallback);
+ public void register(final StateStore store,
+ final boolean deprecatedAndIgnoredLoggingEnabled,
+ final StateRestoreCallback stateRestoreCallback) {
+ context.register(store, deprecatedAndIgnoredLoggingEnabled, stateRestoreCallback);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index cdf1612..385d641 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
import java.io.File;
import java.util.Map;
@@ -75,8 +76,14 @@ public interface ProcessorContext {
* Registers and possibly restores the specified storage engine.
*
* @param store the storage engine
- */
- void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback);
+ * @param loggingEnabledIsDeprecatedAndIgnored deprecated parameter {@code loggingEnabled} is ignored:
+ * if you want to enable logging on a state stores call
+ * {@link org.apache.kafka.streams.state.StoreBuilder#withLoggingEnabled(Map)}
+ * when creating the store
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void register(StateStore store, boolean loggingEnabledIsDeprecatedAndIgnored, StateRestoreCallback stateRestoreCallback);
/**
* Get the state store given the store name.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 3925951..cb8139c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.processor;
+import org.apache.kafka.streams.errors.StreamsException;
+
/**
* A storage engine for managing state maintained by a stream processor.
*
@@ -36,6 +38,8 @@ public interface StateStore {
/**
* Initializes this state store
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
*/
void init(ProcessorContext context, StateStore root);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 9e853fd..410212e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -93,13 +93,13 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
@Override
public void register(final StateStore store,
- final boolean loggingEnabled,
+ final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
if (initialized) {
throw new IllegalStateException("Can only create state stores during initialization.");
}
Objects.requireNonNull(store, "store must not be null");
- stateManager.register(store, loggingEnabled, stateRestoreCallback);
+ stateManager.register(store, stateRestoreCallback);
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 6734da6..c24686e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -90,7 +90,7 @@ public abstract class AbstractTask implements Task {
topology.storeToChangelogTopic(),
changelogReader,
eosEnabled,
- logContext);
+ logContext);
} catch (final IOException e) {
throw new ProcessorStateException(String.format("%sError while creating the state manager", logPrefix), e);
}
@@ -196,6 +196,10 @@ public abstract class AbstractTask implements Task {
stateMgr.flush();
}
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
void initializeStateStores() {
if (topology.stateStores().isEmpty()) {
return;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index e51ebd7..fcb717d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;
@@ -107,6 +108,10 @@ class AssignedTasks {
return partitions;
}
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
void initializeNewTasks() {
if (!created.isEmpty()) {
log.debug("Initializing {}s {}", taskTypeName, created.keySet());
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
index 138be77..a1c2f7f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListener.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.AbstractNotifyingBatchingRestoreCallback;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
@@ -32,7 +33,7 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
public static final NoOpStateRestoreListener NO_OP_STATE_RESTORE_LISTENER = new NoOpStateRestoreListener();
private final BatchingStateRestoreCallback internalBatchingRestoreCallback;
private final StateRestoreListener storeRestoreListener;
- private StateRestoreListener globalRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
+ private StateRestoreListener userRestoreListener = NO_OP_STATE_RESTORE_LISTENER;
CompositeRestoreListener(final StateRestoreCallback stateRestoreCallback) {
@@ -45,31 +46,66 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
internalBatchingRestoreCallback = getBatchingRestoreCallback(stateRestoreCallback);
}
+ /**
+ * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+ * {@link StateRestoreListener#onRestoreStart(TopicPartition, String, long, long)}
+ */
@Override
public void onRestoreStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset,
final long endingOffset) {
- globalRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+ try {
+ userRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
storeRestoreListener.onRestoreStart(topicPartition, storeName, startingOffset, endingOffset);
}
+ /**
+ * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+ * {@link StateRestoreListener#onBatchRestored(TopicPartition, String, long, long)}
+ */
@Override
public void onBatchRestored(final TopicPartition topicPartition,
final String storeName,
final long batchEndOffset,
final long numRestored) {
- globalRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+ try {
+ userRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
storeRestoreListener.onBatchRestored(topicPartition, storeName, batchEndOffset, numRestored);
}
+ /**
+ * @throws StreamsException if user provided {@link StateRestoreListener} raises an exception in
+ * {@link StateRestoreListener#onRestoreEnd(TopicPartition, String, long)}
+ */
@Override
public void onRestoreEnd(final TopicPartition topicPartition,
final String storeName,
final long totalRestored) {
- globalRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+ try {
+ userRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in store restore listener for store %s, partition %s.",
+ storeName,
+ topicPartition),
+ fatalUserException);
+ }
storeRestoreListener.onRestoreEnd(topicPartition, storeName, totalRestored);
-
}
@Override
@@ -77,9 +113,9 @@ public class CompositeRestoreListener implements BatchingStateRestoreCallback, S
internalBatchingRestoreCallback.restoreAll(records);
}
- void setGlobalRestoreListener(final StateRestoreListener globalRestoreListener) {
- if (globalRestoreListener != null) {
- this.globalRestoreListener = globalRestoreListener;
+ void setUserRestoreListener(final StateRestoreListener userRestoreListener) {
+ if (userRestoreListener != null) {
+ this.userRestoreListener = userRestoreListener;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
index b058844..c9b8ca8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManager.java
@@ -16,8 +16,14 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.streams.errors.StreamsException;
+
import java.util.Set;
public interface GlobalStateManager extends StateManager {
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
Set<String> initialize(InternalProcessorContext processorContext);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index d03425b..10a0775 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -120,7 +120,6 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
}
public void register(final StateStore store,
- final boolean ignored,
final StateRestoreCallback stateRestoreCallback) {
if (stores.containsKey(store.name())) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index 4c2b40f..849af57 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -18,7 +18,9 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
import java.io.IOException;
import java.util.HashMap;
@@ -33,29 +35,34 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
private final ProcessorTopology topology;
private final InternalProcessorContext processorContext;
private final Map<TopicPartition, Long> offsets = new HashMap<>();
- private final Map<String, SourceNodeRecordDeserializer> deserializers = new HashMap<>();
+ private final Map<String, RecordDeserializer> deserializers = new HashMap<>();
private final GlobalStateManager stateMgr;
private final DeserializationExceptionHandler deserializationExceptionHandler;
-
+ private final LogContext logContext;
public GlobalStateUpdateTask(final ProcessorTopology topology,
final InternalProcessorContext processorContext,
final GlobalStateManager stateMgr,
- final DeserializationExceptionHandler deserializationExceptionHandler) {
-
+ final DeserializationExceptionHandler deserializationExceptionHandler,
+ final LogContext logContext) {
this.topology = topology;
this.stateMgr = stateMgr;
this.processorContext = processorContext;
this.deserializationExceptionHandler = deserializationExceptionHandler;
+ this.logContext = logContext;
}
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
public Map<TopicPartition, Long> initialize() {
final Set<String> storeNames = stateMgr.initialize(processorContext);
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode source = topology.source(sourceTopic);
- deserializers.put(sourceTopic, new SourceNodeRecordDeserializer(source, deserializationExceptionHandler));
+ deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext));
}
initTopology();
processorContext.initialized();
@@ -66,8 +73,8 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
@SuppressWarnings("unchecked")
@Override
public void update(final ConsumerRecord<byte[], byte[]> record) {
- final SourceNodeRecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
- final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.tryDeserialize(processorContext, record);
+ final RecordDeserializer sourceNodeAndDeserializer = deserializers.get(record.topic());
+ final ConsumerRecord<Object, Object> deserialized = sourceNodeAndDeserializer.deserialize(processorContext, record);
if (deserialized != null) {
final ProcessorRecordContext recordContext =
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index a365add..1ee49e1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -220,6 +220,10 @@ public class GlobalStreamThread extends Thread {
this.flushInterval = flushInterval;
}
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
void initialize() {
final Map<TopicPartition, Long> partitionOffsets = stateMaintainer.initialize();
consumer.assign(partitionOffsets.keySet());
@@ -312,7 +316,8 @@ public class GlobalStreamThread extends Thread {
streamsMetrics,
cache),
stateMgr,
- config.defaultDeserializationExceptionHandler()),
+ config.defaultDeserializationExceptionHandler(),
+ logContext),
time,
config.getLong(StreamsConfig.POLL_MS_CONFIG),
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 81d2f6c..06405ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -178,8 +178,10 @@ public class InternalTopologyBuilder {
}
private static class StateStoreSupplierFactory extends AbstractStateStoreFactory {
+ @SuppressWarnings("deprecation")
private final StateStoreSupplier supplier;
+ @SuppressWarnings("deprecation")
StateStoreSupplierFactory(final StateStoreSupplier<?> supplier) {
super(supplier.name(),
supplier.loggingEnabled(),
@@ -495,6 +497,7 @@ public class InternalTopologyBuilder {
nodeGrouper.unite(name, predecessorNames);
}
+ @SuppressWarnings("deprecation")
public final void addStateStore(final StateStoreSupplier supplier,
final String... processorNames) {
Objects.requireNonNull(supplier, "supplier can't be null");
@@ -527,6 +530,7 @@ public class InternalTopologyBuilder {
}
}
+ @SuppressWarnings("deprecation")
public final void addGlobalStore(final StateStoreSupplier<KeyValueStore> storeSupplier,
final String sourceName,
final TimestampExtractor timestampExtractor,
@@ -1612,8 +1616,8 @@ public class InternalTopologyBuilder {
return Collections.unmodifiableSet(nodes);
}
- // only for testing
- public Iterator<TopologyDescription.Node> nodesInOrder() {
+ // visible for testing
+ Iterator<TopologyDescription.Node> nodesInOrder() {
return nodes.iterator();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2f16547..cc14c67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -41,8 +40,7 @@ import java.util.Map;
public class ProcessorStateManager implements StateManager {
-
- public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
+ private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
static final String CHECKPOINT_FILE_NAME = ".checkpoint";
private final Logger log;
@@ -119,17 +117,8 @@ public class ProcessorStateManager implements StateManager {
return baseDir;
}
- /**
- * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
- * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
- *
- * // TODO: parameter loggingEnabled can be removed now
- *
- * @throws StreamsException if the store's change log does not contain the partition
- */
@Override
public void register(final StateStore store,
- final boolean loggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
log.debug("Registering state store {} to its state manager", store.name());
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 4e04108..1b5f764 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,7 +17,73 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.slf4j.Logger;
-interface RecordDeserializer {
- ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord);
+import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
+
+class RecordDeserializer {
+ private final SourceNode sourceNode;
+ private final DeserializationExceptionHandler deserializationExceptionHandler;
+ private final Logger log;
+
+ RecordDeserializer(final SourceNode sourceNode,
+ final DeserializationExceptionHandler deserializationExceptionHandler,
+ final LogContext logContext) {
+ this.sourceNode = sourceNode;
+ this.deserializationExceptionHandler = deserializationExceptionHandler;
+ this.log = logContext.logger(RecordDeserializer.class);
+ }
+
+ /**
+ * @throws StreamsException if a deserialization error occurs and the deserialization callback returns
+ * {@link DeserializationExceptionHandler.DeserializationHandlerResponse#FAIL FAIL} or throws an exception itself
+ */
+ @SuppressWarnings("deprecation")
+ ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
+ final ConsumerRecord<byte[], byte[]> rawRecord) {
+
+ try {
+ return new ConsumerRecord<>(
+ rawRecord.topic(),
+ rawRecord.partition(),
+ rawRecord.offset(),
+ rawRecord.timestamp(),
+ TimestampType.CREATE_TIME,
+ rawRecord.checksum(),
+ rawRecord.serializedKeySize(),
+ rawRecord.serializedValueSize(),
+ sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key()),
+ sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()));
+ } catch (final Exception deserializationException) {
+ final DeserializationExceptionHandler.DeserializationHandlerResponse response;
+ try {
+ response = deserializationExceptionHandler.handle(processorContext, rawRecord, deserializationException);
+ } catch (final Exception fatalUserException) {
+ log.error("Deserialization error callback failed after deserialization error for record {}",
+ rawRecord,
+ deserializationException);
+ throw new StreamsException("Fatal user code error in deserialization error callback", fatalUserException);
+ }
+
+ if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
+ throw new StreamsException("Deserialization exception handler is set to fail upon" +
+ " a deserialization error. If you would rather have the streaming pipeline" +
+ " continue after a deserialization error, please set the " +
+ DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
+ deserializationException);
+ } else {
+ sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
+ }
+ }
+ return null;
+ }
+
+ SourceNode sourceNode() {
+ return sourceNode;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 889b6d8..e6facaf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -18,11 +18,12 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
@@ -33,16 +34,14 @@ import java.util.ArrayDeque;
* timestamp is monotonically increasing such that once it is advanced, it will not be decremented.
*/
public class RecordQueue {
-
- private static final Logger log = LoggerFactory.getLogger(RecordQueue.class);
-
private final SourceNode source;
private final TimestampExtractor timestampExtractor;
private final TopicPartition partition;
private final ArrayDeque<StampedRecord> fifoQueue;
private final TimestampTracker<ConsumerRecord<Object, Object>> timeTracker;
- private final SourceNodeRecordDeserializer recordDeserializer;
+ private final RecordDeserializer recordDeserializer;
private final ProcessorContext processorContext;
+ private final Logger log;
private long partitionTime = TimestampTracker.NOT_KNOWN;
@@ -50,14 +49,16 @@ public class RecordQueue {
final SourceNode source,
final TimestampExtractor timestampExtractor,
final DeserializationExceptionHandler deserializationExceptionHandler,
- final ProcessorContext processorContext) {
+ final ProcessorContext processorContext,
+ final LogContext logContext) {
this.partition = partition;
this.source = source;
this.timestampExtractor = timestampExtractor;
this.fifoQueue = new ArrayDeque<>();
this.timeTracker = new MinTimestampTracker<>();
- this.recordDeserializer = new SourceNodeRecordDeserializer(source, deserializationExceptionHandler);
+ this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
this.processorContext = processorContext;
+ this.log = logContext.logger(RecordQueue.class);
}
/**
@@ -87,12 +88,21 @@ public class RecordQueue {
int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
for (final ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
- final ConsumerRecord<Object, Object> record = recordDeserializer.tryDeserialize(processorContext, rawRecord);
+ final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
continue;
}
- final long timestamp = timestampExtractor.extract(record, timeTracker.get());
+ final long timestamp;
+ try {
+ timestamp = timestampExtractor.extract(record, timeTracker.get());
+ } catch (final StreamsException internalFatalExtractorException) {
+ throw internalFatalExtractorException;
+ } catch (final Exception fatalUserException) {
+ throw new StreamsException(
+ String.format("Fatal user code error in TimestampExtractor callback for record %s.", record),
+ fatalUserException);
+ }
log.trace("Source node {} extracted timestamp {} for record {}", source.name(), timestamp, record);
// drop message if TS is invalid, i.e., negative
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
deleted file mode 100644
index 7fde881..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import static java.lang.String.format;
-import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
-
-class SourceNodeRecordDeserializer implements RecordDeserializer {
- private final SourceNode sourceNode;
- private final DeserializationExceptionHandler deserializationExceptionHandler;
-
- SourceNodeRecordDeserializer(final SourceNode sourceNode,
- final DeserializationExceptionHandler deserializationExceptionHandler) {
- this.sourceNode = sourceNode;
- this.deserializationExceptionHandler = deserializationExceptionHandler;
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
- final Object key;
- try {
- key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
- } catch (Exception e) {
- throw new StreamsException(format("Failed to deserialize key for record. topic=%s, partition=%d, offset=%d",
- rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
- }
-
- final Object value;
- try {
- value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
- } catch (Exception e) {
- throw new StreamsException(format("Failed to deserialize value for record. topic=%s, partition=%d, offset=%d",
- rawRecord.topic(), rawRecord.partition(), rawRecord.offset()), e);
- }
-
- return new ConsumerRecord<>(rawRecord.topic(), rawRecord.partition(), rawRecord.offset(),
- rawRecord.timestamp(), TimestampType.CREATE_TIME,
- rawRecord.checksum(),
- rawRecord.serializedKeySize(),
- rawRecord.serializedValueSize(), key, value);
-
- }
-
- public ConsumerRecord<Object, Object> tryDeserialize(final ProcessorContext processorContext,
- final ConsumerRecord<byte[], byte[]> rawRecord) {
-
- // catch and process if we have a deserialization handler
- try {
- return deserialize(rawRecord);
- } catch (final Exception e) {
- final DeserializationExceptionHandler.DeserializationHandlerResponse response =
- deserializationExceptionHandler.handle(processorContext, rawRecord, e);
- if (response == DeserializationExceptionHandler.DeserializationHandlerResponse.FAIL) {
- throw new StreamsException("Deserialization exception handler is set to fail upon" +
- " a deserialization error. If you would rather have the streaming pipeline" +
- " continue after a deserialization error, please set the " +
- DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
- e);
- } else {
- sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
- }
- }
- return null;
- }
-
- public SourceNode sourceNode() {
- return sourceNode;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
index 511280d..2a8d9a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -27,7 +28,12 @@ import java.util.Map;
interface StateManager extends Checkpointable {
File baseDir();
- void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
+ /**
+ * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
+ * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
+ void register(final StateStore store, final StateRestoreCallback stateRestoreCallback);
void flush();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 579561f..33dce9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -79,8 +79,8 @@ public class StateRestorer {
return persistent;
}
- void setGlobalRestoreListener(StateRestoreListener globalStateRestoreListener) {
- this.compositeRestoreListener.setGlobalRestoreListener(globalStateRestoreListener);
+ void setUserRestoreListener(StateRestoreListener userRestoreListener) {
+ this.compositeRestoreListener.setUserRestoreListener(userRestoreListener);
}
void setRestoredOffset(final long restoredOffset) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index caa0100..4ba860d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -41,7 +41,7 @@ public class StoreChangelogReader implements ChangelogReader {
private final Logger log;
private final Consumer<byte[], byte[]> consumer;
- private final StateRestoreListener stateRestoreListener;
+ private final StateRestoreListener userStateRestoreListener;
private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
@@ -50,11 +50,11 @@ public class StoreChangelogReader implements ChangelogReader {
public StoreChangelogReader(final String threadId,
final Consumer<byte[], byte[]> consumer,
- final StateRestoreListener stateRestoreListener,
+ final StateRestoreListener userStateRestoreListener,
final LogContext logContext) {
this.consumer = consumer;
this.log = logContext.logger(getClass());
- this.stateRestoreListener = stateRestoreListener;
+ this.userStateRestoreListener = userStateRestoreListener;
}
public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
@@ -65,7 +65,7 @@ public class StoreChangelogReader implements ChangelogReader {
@Override
public void register(final StateRestorer restorer) {
- restorer.setGlobalRestoreListener(stateRestoreListener);
+ restorer.setUserRestoreListener(userStateRestoreListener);
stateRestorers.put(restorer.partition(), restorer);
needsInitializing.put(restorer.partition(), restorer);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6775edb..8c26fa9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -136,7 +136,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
for (final TopicPartition partition : partitions) {
final SourceNode source = topology.source(partition.topic());
final TimestampExtractor sourceTimestampExtractor = source.getTimestampExtractor() != null ? source.getTimestampExtractor() : defaultTimestampExtractor;
- final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext);
+ final RecordQueue queue = new RecordQueue(partition, source, sourceTimestampExtractor, defaultDeserializationExceptionHandler, processorContext, logContext);
partitionQueues.put(partition, queue);
}
@@ -151,6 +151,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
}
+ @Override
public boolean initialize() {
log.trace("Initializing");
initializeStateStores();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e141c46..8d13558 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -641,7 +641,7 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final StreamsMetadataState streamsMetadataState,
final long cacheSizeBytes,
final StateDirectory stateDirectory,
- final StateRestoreListener stateRestoreListener) {
+ final StateRestoreListener userStateRestoreListener) {
final String threadClientId = clientId + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement();
final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(metrics,
@@ -666,8 +666,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
restoreConsumer,
- stateRestoreListener,
- logContext);
+ userStateRestoreListener,
+ logContext);
Producer<byte[], byte[]> threadProducer = null;
if (!eosEnabled) {
@@ -757,6 +757,8 @@ public class StreamThread extends Thread implements ThreadDataProvider {
/**
* Main event loop for polling, and processing records through topologies.
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
*/
private void runLoop() {
long recordsProcessedBeforeCommit = UNLIMITED_RECORDS;
@@ -767,6 +769,10 @@ public class StreamThread extends Thread implements ThreadDataProvider {
}
}
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
// Visible for testing
long runOnce(final long recordsProcessedBeforeCommit) {
long processedBeforeCommit = recordsProcessedBeforeCommit;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index a481c31..80e5423 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
@@ -71,6 +72,8 @@ public interface Task {
/**
* initialize the task and return true if the task is ready to run, i.e, it has not state stores
* @return true if this task has no state stores that may need restoring.
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
*/
boolean initialize();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index f12ed91..278957e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -239,7 +239,10 @@ class TaskManager {
this.consumer = consumer;
}
-
+ /**
+ * @throws IllegalStateException If store gets registered after initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the partition
+ */
boolean updateNewAndRestoringTasks() {
active.initializeNewTasks();
standby.initializeNewTasks();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 7e24969..929d584 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -72,7 +72,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {
if (root != null) {
// register the store
- context.register(root, true, new StateRestoreCallback() {
+ context.register(root, false, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
// this is a delete
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index beb9ce1..f1aa96f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -108,7 +108,7 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
// register the store
- context.register(root, true, new StateRestoreCallback() {
+ context.register(root, false, new StateRestoreCallback() {
@Override
public void restore(byte[] key, byte[] value) {
restoring = true;
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index ded2732..7786348 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -121,7 +121,7 @@ public class TopologyBuilderTest {
final Serde<String> stringSerde = Serdes.String();
try {
- builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), new String[]{});
+ builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer());
fail("Should throw TopologyBuilderException with no topics");
} catch (TopologyBuilderException tpe) {
//no-op
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
index 88aba94..e2ea668 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CompositeRestoreListenerTest.java
@@ -124,7 +124,7 @@ public class CompositeRestoreListenerTest {
@Test
public void shouldHandleNullReportStoreListener() {
compositeRestoreListener = new CompositeRestoreListener(batchingStateRestoreCallback);
- compositeRestoreListener.setGlobalRestoreListener(null);
+ compositeRestoreListener.setUserRestoreListener(null);
compositeRestoreListener.restoreAll(records);
compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -138,7 +138,7 @@ public class CompositeRestoreListenerTest {
@Test
public void shouldHandleNoRestoreListener() {
compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
- compositeRestoreListener.setGlobalRestoreListener(null);
+ compositeRestoreListener.setUserRestoreListener(null);
compositeRestoreListener.restoreAll(records);
compositeRestoreListener.onRestoreStart(topicPartition, storeName, startOffset, endOffset);
@@ -151,7 +151,7 @@ public class CompositeRestoreListenerTest {
@Test(expected = UnsupportedOperationException.class)
public void shouldThrowExceptionWhenSinglePutDirectlyCalled() {
compositeRestoreListener = new CompositeRestoreListener(noListenBatchingStateRestoreCallback);
- compositeRestoreListener.setGlobalRestoreListener(null);
+ compositeRestoreListener.setUserRestoreListener(null);
compositeRestoreListener.restore(key, value);
}
@@ -179,7 +179,7 @@ public class CompositeRestoreListenerTest {
private void setUpCompositeRestoreListener(StateRestoreCallback stateRestoreCallback) {
compositeRestoreListener = new CompositeRestoreListener(stateRestoreCallback);
- compositeRestoreListener.setGlobalRestoreListener(reportingStoreListener);
+ compositeRestoreListener.setUserRestoreListener(reportingStoreListener);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index b438347..0519fb0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -166,7 +166,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
try {
- stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), false, new TheStateRestoreCallback());
+ stateManager.register(new NoOpReadOnlyStore<>("not-in-topology"), new TheStateRestoreCallback());
fail("should have raised an illegal argument exception as store is not in the topology");
} catch (final IllegalArgumentException e) {
// pass
@@ -177,9 +177,9 @@ public class GlobalStateManagerImplTest {
public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() {
stateManager.initialize(context);
initializeConsumer(2, 1, t1);
- stateManager.register(store1, false, new TheStateRestoreCallback());
+ stateManager.register(store1, new TheStateRestoreCallback());
try {
- stateManager.register(store1, false, new TheStateRestoreCallback());
+ stateManager.register(store1, new TheStateRestoreCallback());
fail("should have raised an illegal argument exception as store has already been registered");
} catch (final IllegalArgumentException e) {
// pass
@@ -190,7 +190,7 @@ public class GlobalStateManagerImplTest {
public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() {
stateManager.initialize(context);
try {
- stateManager.register(store1, false, new TheStateRestoreCallback());
+ stateManager.register(store1, new TheStateRestoreCallback());
fail("Should have raised a StreamsException as there are no partition for the store");
} catch (final StreamsException e) {
// pass
@@ -204,7 +204,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
assertEquals(2, stateRestoreCallback.restored.size());
}
@@ -236,7 +236,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
assertEquals(5, stateRestoreCallback.restored.size());
}
@@ -247,9 +247,9 @@ public class GlobalStateManagerImplTest {
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// register the stores
initializeConsumer(1, 1, t1);
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
initializeConsumer(1, 1, t2);
- stateManager.register(store2, false, stateRestoreCallback);
+ stateManager.register(store2, stateRestoreCallback);
stateManager.flush();
assertTrue(store1.flushed);
@@ -267,7 +267,7 @@ public class GlobalStateManagerImplTest {
public void flush() {
throw new RuntimeException("KABOOM!");
}
- }, false, stateRestoreCallback);
+ }, stateRestoreCallback);
stateManager.flush();
}
@@ -278,9 +278,9 @@ public class GlobalStateManagerImplTest {
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
// register the stores
initializeConsumer(1, 1, t1);
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
initializeConsumer(1, 1, t2);
- stateManager.register(store2, false, stateRestoreCallback);
+ stateManager.register(store2, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
assertFalse(store1.isOpen());
@@ -292,7 +292,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
initializeConsumer(1, 1, t1);
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
final Map<TopicPartition, Long> expected = Collections.singletonMap(t1, 25L);
stateManager.close(expected);
final Map<TopicPartition, Long> result = readOffsetsCheckpoint();
@@ -308,7 +308,7 @@ public class GlobalStateManagerImplTest {
public void close() {
throw new RuntimeException("KABOOM!");
}
- }, false, stateRestoreCallback);
+ }, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
}
@@ -317,7 +317,7 @@ public class GlobalStateManagerImplTest {
public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() {
stateManager.initialize(context);
try {
- stateManager.register(store1, false, null);
+ stateManager.register(store1, null);
fail("should have thrown due to null callback");
} catch (IllegalArgumentException e) {
//pass
@@ -349,7 +349,7 @@ public class GlobalStateManagerImplTest {
}
super.close();
}
- }, false, stateRestoreCallback);
+ }, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -368,9 +368,9 @@ public class GlobalStateManagerImplTest {
throw new RuntimeException("KABOOM!");
}
};
- stateManager.register(store, false, stateRestoreCallback);
+ stateManager.register(store, stateRestoreCallback);
- stateManager.register(store2, false, stateRestoreCallback);
+ stateManager.register(store2, stateRestoreCallback);
try {
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
@@ -415,9 +415,9 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
initializeConsumer(10, 1, t1);
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
initializeConsumer(20, 1, t2);
- stateManager.register(store2, false, stateRestoreCallback);
+ stateManager.register(store2, stateRestoreCallback);
final Map<TopicPartition, Long> initialCheckpoint = stateManager.checkpointed();
stateManager.checkpoint(Collections.singletonMap(t1, 101L));
@@ -444,7 +444,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
final KeyValue<byte[], byte[]> restoredKv = stateRestoreCallback.restored.get(0);
assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
}
@@ -454,7 +454,7 @@ public class GlobalStateManagerImplTest {
stateManager.initialize(context);
final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
initializeConsumer(10, 1, t1);
- stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.register(store1, stateRestoreCallback);
stateManager.close(Collections.<TopicPartition, Long>emptyMap());
final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 4ece443..63783a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
@@ -51,6 +52,7 @@ import static org.junit.Assert.fail;
public class GlobalStateTaskTest {
+ private final LogContext logContext = new LogContext();
private Map<TopicPartition, Long> offsets;
private GlobalStateUpdateTask globalStateTask;
private GlobalStateManagerStub stateMgr;
@@ -92,7 +94,7 @@ public class GlobalStateTaskTest {
offsets.put(t1, 50L);
offsets.put(t2, 100L);
stateMgr = new GlobalStateManagerStub(storeNames, offsets);
- globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler());
+ globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, new LogAndFailExceptionHandler(), logContext);
}
@Test
@@ -175,8 +177,12 @@ public class GlobalStateTaskTest {
@Test
public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() throws Exception {
- final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
- new LogAndContinueExceptionHandler());
+ final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+ topology,
+ context,
+ stateMgr,
+ new LogAndContinueExceptionHandler(),
+ logContext);
final byte[] key = new LongSerializer().serialize("t2", 1L);
final byte[] recordValue = new IntegerSerializer().serialize("t2", 10);
@@ -185,8 +191,12 @@ public class GlobalStateTaskTest {
@Test
public void shouldNotThrowStreamsExceptionWhenValueDeserializationFails() throws Exception {
- final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(topology, context, stateMgr,
- new LogAndContinueExceptionHandler());
+ final GlobalStateUpdateTask globalStateTask2 = new GlobalStateUpdateTask(
+ topology,
+ context,
+ stateMgr,
+ new LogAndContinueExceptionHandler(),
+ logContext);
final byte[] key = new IntegerSerializer().serialize("t2", 1);
final byte[] recordValue = new LongSerializer().serialize("t2", 10L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 95636ec..e223699 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -295,6 +295,7 @@ public class InternalTopologyBuilderTest {
} catch (final TopologyException expected) { /* ok */ }
}
+ @SuppressWarnings("deprecation")
@Test
public void testAddStateStore() {
final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
@@ -344,6 +345,7 @@ public class InternalTopologyBuilderTest {
assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
}
+ @SuppressWarnings("deprecation")
@Test
public void testTopicGroupsByStateStore() {
builder.setApplicationId("X");
@@ -470,6 +472,7 @@ public class InternalTopologyBuilderTest {
builder.setApplicationId(null);
}
+ @SuppressWarnings("deprecation")
@Test(expected = NullPointerException.class)
public void shouldNotAddNullStateStoreSupplier() throws Exception {
builder.addStateStore((StateStoreSupplier) null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e5f2471c/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index d9f38eb..00a2c35 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.MockSourceNode;
@@ -35,14 +36,27 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
public class PartitionGroupTest {
+ private final LogContext logContext = new LogContext();
private final Serializer<Integer> intSerializer = new IntegerSerializer();
private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
private final String[] topics = {"topic"};
private final TopicPartition partition1 = new TopicPartition(topics[0], 1);
private final TopicPartition partition2 = new TopicPartition(topics[0], 2);
- private final RecordQueue queue1 = new RecordQueue(partition1, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
- private final RecordQueue queue2 = new RecordQueue(partition2, new MockSourceNode<>(topics, intDeserializer, intDeserializer), timestampExtractor, new LogAndContinueExceptionHandler(), null);
+ private final RecordQueue queue1 = new RecordQueue(
+ partition1,
+ new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+ timestampExtractor,
+ new LogAndContinueExceptionHandler(),
+ null,
+ logContext);
+ private final RecordQueue queue2 = new RecordQueue(
+ partition2,
+ new MockSourceNode<>(topics, intDeserializer, intDeserializer),
+ timestampExtractor,
+ new LogAndContinueExceptionHandler(),
+ null,
+ logContext);
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);