You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/03 05:08:40 UTC
kafka git commit: KAFKA-2591: Fix StreamingMetrics
Repository: kafka
Updated Branches:
refs/heads/trunk b56e02b65 -> 37f7d75e3
KAFKA-2591: Fix StreamingMetrics
Remove state storage upon unclean shutdown and fix streaming metrics used for local state.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Edward Ribeiro, Yasuhiro Matsuda, Jun Rao
Closes #265 from guozhangwang/K2591
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37f7d75e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37f7d75e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37f7d75e
Branch: refs/heads/trunk
Commit: 37f7d75e3d55c600902cd15c3cb219ddd221d23c
Parents: b56e02b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Oct 2 20:12:34 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 2 20:12:34 2015 -0700
----------------------------------------------------------------------
.../org/apache/kafka/common/MetricName.java | 5 +-
.../kafka/common/metrics/MetricsTest.java | 3 +-
.../apache/kafka/streams/KafkaStreaming.java | 35 +++++++-
.../apache/kafka/streams/StreamingConfig.java | 46 ++++++++--
.../apache/kafka/streams/StreamingMetrics.java | 27 ++++++
.../kafka/streams/kstream/SlidingWindowDef.java | 10 +--
.../streams/processor/ProcessorContext.java | 8 +-
.../kafka/streams/processor/RestoreFunc.java | 27 ------
.../streams/processor/StateRestoreCallback.java | 27 ++++++
.../internals/ProcessorContextImpl.java | 14 +--
.../internals/ProcessorStateManager.java | 13 ++-
.../streams/processor/internals/StreamTask.java | 8 +-
.../processor/internals/StreamThread.java | 94 +++++++++++++++-----
.../streams/state/InMemoryKeyValueStore.java | 7 +-
.../streams/state/MeteredKeyValueStore.java | 73 +++++----------
.../streams/state/RocksDBKeyValueStore.java | 6 +-
.../internals/ProcessorStateManagerTest.java | 18 ++--
.../processor/internals/StreamTaskTest.java | 4 +-
.../processor/internals/StreamThreadTest.java | 9 +-
.../apache/kafka/test/MockProcessorContext.java | 10 +--
.../kafka/test/ProcessorTopologyTestDriver.java | 21 ++++-
21 files changed, 295 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 04b4a09..ee50f33 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -87,11 +87,12 @@ public final class MetricName {
private static Map<String, String> getTags(String... keyValue) {
if ((keyValue.length % 2) != 0)
- throw new IllegalArgumentException("keyValue needs to be specified in paris");
+ throw new IllegalArgumentException("keyValue needs to be specified in pairs");
Map<String, String> tags = new HashMap<String, String>();
- for (int i = 0; i < keyValue.length / 2; i++)
+ for (int i = 0; i < keyValue.length; i += 2)
tags.put(keyValue[i], keyValue[i + 1]);
+
return tags;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5b7736e..175a036 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -47,9 +47,10 @@ public class MetricsTest {
@Test
public void testMetricName() {
- MetricName n1 = new MetricName("name", "group", "description", "key1", "value1");
+ MetricName n1 = new MetricName("name", "group", "description", "key1", "value1", "key2", "value2");
Map<String, String> tags = new HashMap<String, String>();
tags.put("key1", "value1");
+ tags.put("key2", "value2");
MetricName n2 = new MetricName("name", "group", "description", tags);
assertEquals("metric names created in two different ways should be equal", n1, n2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
index f3a99e0..d274fb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -17,11 +17,21 @@
package org.apache.kafka.streams;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
* sends output to zero or more output topics.
@@ -61,8 +71,12 @@ import org.slf4j.LoggerFactory;
public class KafkaStreaming {
private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
+ private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+ private static final String JMX_PREFIX = "kafka.streaming";
+
+ private final Time time;
- // Container States
+ // container states
private static final int CREATED = 0;
private static final int RUNNING = 1;
private static final int STOPPED = 2;
@@ -70,10 +84,27 @@ public class KafkaStreaming {
private final StreamThread[] threads;
+ private String clientId;
+ private final Metrics metrics;
+
public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
+ // create the metrics
+ this.time = new SystemTime();
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+ clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
+ if (clientId.length() <= 0)
+ clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+ List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(JMX_PREFIX));
+ this.metrics = new Metrics(metricConfig, reporters, time);
+
this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config);
+ this.threads[i] = new StreamThread(builder, config, this.clientId, this.metrics, this.time);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index dce69b6..93df4c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
public class StreamingConfig extends AbstractConfig {
private static final ConfigDef CONFIG;
@@ -83,6 +85,15 @@ public class StreamingConfig extends AbstractConfig {
/** <code>value.deserializer</code> */
public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+ /** <code>metrics.sample.window.ms</code> */
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+ /** <code>metrics.num.samples</code> */
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+ /** <code>metric.reporters</code> */
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
/**
* <code>bootstrap.servers</code>
*/
@@ -97,15 +108,15 @@ public class StreamingConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.CLIENT_ID_DOC)
.define(STATE_DIR_CONFIG,
- Type.STRING,
- SYSTEM_TEMP_DIRECTORY,
- Importance.MEDIUM,
- STATE_DIR_DOC)
+ Type.STRING,
+ SYSTEM_TEMP_DIRECTORY,
+ Importance.MEDIUM,
+ STATE_DIR_DOC)
.define(COMMIT_INTERVAL_MS_CONFIG,
- Type.LONG,
- 30000,
- Importance.HIGH,
- COMMIT_INTERVAL_MS_DOC)
+ Type.LONG,
+ 30000,
+ Importance.HIGH,
+ COMMIT_INTERVAL_MS_DOC)
.define(POLL_MS_CONFIG,
Type.LONG,
100,
@@ -159,7 +170,24 @@ public class StreamingConfig extends AbstractConfig {
.define(BOOTSTRAP_SERVERS_CONFIG,
Type.STRING,
Importance.HIGH,
- CommonClientConfigs.BOOSTRAP_SERVERS_DOC);
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG,
+ Type.INT,
+ 2,
+ atLeast(1),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
}
public StreamingConfig(Map<?, ?> props) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
new file mode 100644
index 0000000..ebf80b3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamingMetrics {
+
+ Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+ void recordLatency(Sensor sensor, long startNs, long endNs);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
index cc03541..5927db6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
@@ -25,10 +25,10 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.internals.FilteredIterator;
import org.apache.kafka.streams.kstream.internals.WindowSupport;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
import org.apache.kafka.streams.processor.internals.Stamped;
import java.util.Collections;
@@ -83,7 +83,7 @@ public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
@Override
public void init(ProcessorContext context) {
this.context = context;
- RestoreFuncImpl restoreFunc = new RestoreFuncImpl();
+ SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
context.register(this, restoreFunc);
for (ValueList<V> valueList : map.values()) {
@@ -229,17 +229,17 @@ public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
return false;
}
- private class RestoreFuncImpl implements RestoreFunc {
+ private class SlidingWindowRegistryCallback implements StateRestoreCallback {
final IntegerDeserializer intDeserializer;
int slotNum = 0;
- RestoreFuncImpl() {
+ SlidingWindowRegistryCallback() {
intDeserializer = new IntegerDeserializer();
}
@Override
- public void apply(byte[] slot, byte[] bytes) {
+ public void restore(byte[] slot, byte[] bytes) {
slotNum = intDeserializer.deserialize("", slot);
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 6b32b83..adffe0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,9 +17,9 @@
package org.apache.kafka.streams.processor;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingMetrics;
import java.io.File;
@@ -70,9 +70,9 @@ public interface ProcessorContext {
/**
* Returns Metrics instance
*
- * @return Metrics
+ * @return StreamingMetrics
*/
- Metrics metrics();
+ StreamingMetrics metrics();
/**
* Check if this process's incoming streams are joinable
@@ -84,7 +84,7 @@ public interface ProcessorContext {
*
* @param store the storage engine
*/
- void register(StateStore store, RestoreFunc restoreFunc);
+ void register(StateStore store, StateRestoreCallback stateRestoreCallback);
StateStore getStateStore(String name);
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java b/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
deleted file mode 100644
index 883147e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * Restoration logic for log-backed state stores upon restart,
- * it takes one record at a time from the logs to apply to the restoring state.
- */
-public interface RestoreFunc {
-
- void apply(byte[] key, byte[] value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
new file mode 100644
index 0000000..39decec
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * Restoration logic for log-backed state stores upon restart,
+ * it takes one record at a time from the logs to apply to the restoring state.
+ */
+public interface StateRestoreCallback {
+
+ void restore(byte[] key, byte[] value);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index b350222..60ac1df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@ public class ProcessorContextImpl implements ProcessorContext {
private final int id;
private final StreamTask task;
- private final Metrics metrics;
+ private final StreamingMetrics metrics;
private final RecordCollector collector;
private final ProcessorStateManager stateMgr;
@@ -60,7 +60,7 @@ public class ProcessorContextImpl implements ProcessorContext {
StreamingConfig config,
RecordCollector collector,
ProcessorStateManager stateMgr,
- Metrics metrics) {
+ StreamingMetrics metrics) {
this.id = id;
this.task = task;
this.metrics = metrics;
@@ -143,16 +143,16 @@ public class ProcessorContextImpl implements ProcessorContext {
}
@Override
- public Metrics metrics() {
+ public StreamingMetrics metrics() {
return metrics;
}
@Override
- public void register(StateStore store, RestoreFunc restoreFunc) {
+ public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
if (initialized)
throw new KafkaException("Can only create state stores during initialization.");
- stateMgr.register(store, restoreFunc);
+ stateMgr.register(store, stateRestoreCallback);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2f1fb35..59a6394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -96,7 +96,7 @@ public class ProcessorStateManager {
return this.baseDir;
}
- public void register(StateStore store, RestoreFunc restoreFunc) {
+ public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
if (store.name().equals(CHECKPOINT_FILE_NAME))
throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
@@ -138,12 +138,11 @@ public class ProcessorStateManager {
restoreConsumer.seekToEnd(storePartition);
long endOffset = restoreConsumer.position(storePartition);
- // load the previously flushed state and restore from the checkpointed offset of the change log
- // if it exists in the offset file; restore the state from the beginning of the change log otherwise
- if (checkpointedOffsets.containsKey(storePartition)) {
+ // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
+ // restore the state from the beginning of the change log otherwise
+ if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
} else {
- // TODO: in this case, we need to ignore the preciously flushed state
restoreConsumer.seekToBeginning(storePartition);
}
@@ -151,7 +150,7 @@ public class ProcessorStateManager {
// should not change since it is only written by this thread.
while (true) {
for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
- restoreFunc.apply(record.key(), record.value());
+ stateRestoreCallback.restore(record.key(), record.value());
}
if (restoreConsumer.position(storePartition) == endOffset) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 40fb723..6afa427 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -23,8 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
@@ -74,6 +74,7 @@ public class StreamTask implements Punctuator {
* @param partitions the collection of assigned {@link TopicPartition}
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamingConfig} specified by the user
+ * @param metrics the {@link StreamingMetrics} created by the thread
*/
public StreamTask(int id,
Consumer<byte[], byte[]> consumer,
@@ -81,7 +82,8 @@ public class StreamTask implements Punctuator {
Consumer<byte[], byte[]> restoreConsumer,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
- StreamingConfig config) {
+ StreamingConfig config,
+ StreamingMetrics metrics) {
this.id = id;
this.consumer = consumer;
@@ -119,7 +121,7 @@ public class StreamTask implements Punctuator {
}
// initialize the topology with its own context
- this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, new Metrics());
+ this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
// initialize the task by initializing all its processor nodes in the topology
for (ProcessorNode node : this.topology.processors()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f37903f..95a923d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
@@ -34,10 +35,10 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class StreamThread extends Thread {
private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
- private static AtomicInteger nextThreadNumber = new AtomicInteger(1);
+ private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
private final AtomicBoolean running;
@@ -69,13 +70,14 @@ public class StreamThread extends Thread {
protected final Consumer<byte[], byte[]> restoreConsumer;
private final Map<Integer, StreamTask> tasks;
+ private final String clientId;
private final Time time;
private final File stateDir;
private final long pollTimeMs;
private final long cleanTimeMs;
private final long commitTimeMs;
private final long totalRecordsToProcess;
- private final StreamingMetrics metrics;
+ private final StreamingMetricsImpl sensors;
private long lastClean;
private long lastCommit;
@@ -96,19 +98,27 @@ public class StreamThread extends Thread {
}
};
- public StreamThread(TopologyBuilder builder, StreamingConfig config) throws Exception {
- this(builder, config, null , null, null, new SystemTime());
+ public StreamThread(TopologyBuilder builder,
+ StreamingConfig config,
+ String clientId,
+ Metrics metrics,
+ Time time) throws Exception {
+ this(builder, config, null , null, null, clientId, metrics, time);
}
- StreamThread(TopologyBuilder builder, StreamingConfig config,
+ StreamThread(TopologyBuilder builder,
+ StreamingConfig config,
Producer<byte[], byte[]> producer,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
+ String clientId,
+ Metrics metrics,
Time time) throws Exception {
- super("StreamThread-" + nextThreadNumber.getAndIncrement());
+ super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
this.config = config;
this.builder = builder;
+ this.clientId = clientId;
// set the producer and consumer clients
this.producer = (producer != null) ? producer : createProducer();
@@ -131,7 +141,7 @@ public class StreamThread extends Thread {
this.recordsProcessed = 0;
this.time = time;
- this.metrics = new StreamingMetrics();
+ this.sensors = new StreamingMetricsImpl(metrics);
this.running = new AtomicBoolean(true);
}
@@ -237,7 +247,7 @@ public class StreamThread extends Thread {
}
long endPoll = time.milliseconds();
- metrics.pollTimeSensor.record(endPoll - startPoll);
+ sensors.pollTimeSensor.record(endPoll - startPoll);
// try to process one record from each task
totalNumBuffered = 0;
@@ -247,7 +257,7 @@ public class StreamThread extends Thread {
totalNumBuffered += task.process();
- metrics.processTimeSensor.record(time.milliseconds() - startProcess);
+ sensors.processTimeSensor.record(time.milliseconds() - startProcess);
}
maybePunctuate();
@@ -279,7 +289,7 @@ public class StreamThread extends Thread {
long now = time.milliseconds();
if (task.maybePunctuate(now))
- metrics.punctuateTimeSensor.record(time.milliseconds() - now);
+ sensors.punctuateTimeSensor.record(time.milliseconds() - now);
} catch (Exception e) {
log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
@@ -334,7 +344,7 @@ public class StreamThread extends Thread {
throw e;
}
- metrics.commitTimeSensor.record(time.milliseconds() - now);
+ sensors.commitTimeSensor.record(time.milliseconds() - now);
}
/**
@@ -381,9 +391,9 @@ public class StreamThread extends Thread {
}
protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
- metrics.taskCreationSensor.record();
+ sensors.taskCreationSensor.record();
- return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config);
+ return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
}
private void addPartitions(Collection<TopicPartition> assignment) {
@@ -425,13 +435,15 @@ public class StreamThread extends Thread {
log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
throw e;
}
- metrics.taskDestructionSensor.record();
+ sensors.taskDestructionSensor.record();
}
tasks.clear();
}
- private class StreamingMetrics {
+ private class StreamingMetricsImpl implements StreamingMetrics {
final Metrics metrics;
+ final String metricGrpName;
+ final Map<String, String> metricTags;
final Sensor commitTimeSensor;
final Sensor pollTimeSensor;
@@ -440,12 +452,12 @@ public class StreamThread extends Thread {
final Sensor taskCreationSensor;
final Sensor taskDestructionSensor;
- public StreamingMetrics() {
- String metricGrpName = "streaming-metrics";
+ public StreamingMetricsImpl(Metrics metrics) {
- this.metrics = new Metrics();
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
- metricTags.put("client-id", config.getString(StreamingConfig.CLIENT_ID_CONFIG) + "-" + getName());
+ this.metrics = metrics;
+ this.metricGrpName = "streaming-metrics";
+ this.metricTags = new LinkedHashMap<>();
+ this.metricTags.put("client-id", clientId + "-" + getName());
this.commitTimeSensor = metrics.sensor("commit-time");
this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
@@ -473,5 +485,45 @@ public class StreamThread extends Thread {
this.taskDestructionSensor = metrics.sensor("task-destruction");
this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
}
+
+ @Override
+ public void recordLatency(Sensor sensor, long startNs, long endNs) {
+ sensor.record((endNs - startNs) / 1000000, endNs);
+ }
+
+ @Override
+ public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+ // extract the additional tags if there are any
+ Map<String, String> tagMap = new HashMap<>(this.metricTags);
+ if ((tags.length % 2) != 0)
+ throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
+
+ for (int i = 0; i < tags.length; i += 2)
+ tagMap.put(tags[i], tags[i + 1]);
+
+ // first add the global operation metrics if not yet, with the global tags only
+ Sensor parent = metrics.sensor(operationName);
+ addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
+
+ // add the store operation metrics with additional tags
+ Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
+ addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
+
+ return sensor;
+ }
+
+ private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+ maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
+ "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
+ maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
+ "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
+ maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
+ "The average number of occurrences of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
+ }
+
+ private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+ if (!metrics.metrics().containsKey(name))
+ sensor.add(name, stat);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
index e9aaa20..59a8496 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
@@ -40,7 +40,7 @@ public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
}
public InMemoryKeyValueStore(String name, ProcessorContext context, Time time) {
- super(name, new MemoryStore<K, V>(name, context), context, "kafka-streams", time);
+ super(name, new MemoryStore<K, V>(name, context), context, "in-memory-state", time);
}
private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -103,11 +103,6 @@ public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
// do-nothing since it is in-memory
}
- public void restore() {
- // this should not happen since it is in-memory, hence no state to load from disk
- throw new IllegalStateException("This should not happen");
- }
-
@Override
public void close() {
// do-nothing
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 018f1c6..68333d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,17 +17,11 @@
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Time;
@@ -43,7 +37,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
protected final KeyValueStore<K, V> inner;
private final Time time;
- private final String group;
private final Sensor putTime;
private final Sensor getTime;
private final Sensor deleteTime;
@@ -52,7 +45,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private final Sensor rangeTime;
private final Sensor flushTime;
private final Sensor restoreTime;
- private final Metrics metrics;
+ private final StreamingMetrics metrics;
private final String topic;
private final int partition;
@@ -61,20 +54,19 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
private final ProcessorContext context;
// always wrap the logged store with the metered store
- public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String group, Time time) {
+ public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String metricGrp, Time time) {
this.inner = inner;
this.time = time;
- this.group = group;
this.metrics = context.metrics();
- this.putTime = createSensor(name, "put");
- this.getTime = createSensor(name, "get");
- this.deleteTime = createSensor(name, "delete");
- this.putAllTime = createSensor(name, "put-all");
- this.allTime = createSensor(name, "all");
- this.rangeTime = createSensor(name, "range");
- this.flushTime = createSensor(name, "flush");
- this.restoreTime = createSensor(name, "restore");
+ this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
+ this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
+ this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
+ this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
+ this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
+ this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
+ this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
+ this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
this.topic = name;
this.partition = context.id();
@@ -90,37 +82,18 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
final Deserializer<K> keyDeserializer = (Deserializer<K>) context.keyDeserializer();
final Deserializer<V> valDeserializer = (Deserializer<V>) context.valueDeserializer();
- context.register(this, new RestoreFunc() {
+ context.register(this, new StateRestoreCallback() {
@Override
- public void apply(byte[] key, byte[] value) {
+ public void restore(byte[] key, byte[] value) {
inner.put(keyDeserializer.deserialize(topic, key),
valDeserializer.deserialize(topic, value));
}
});
} finally {
- recordLatency(this.restoreTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
}
}
- private Sensor createSensor(String storeName, String operation) {
- Sensor parent = metrics.sensor(operation);
- addLatencyMetrics(parent, operation);
- Sensor sensor = metrics.sensor(storeName + "- " + operation, parent);
- addLatencyMetrics(sensor, operation, "store-name", storeName);
- return sensor;
- }
-
- private void addLatencyMetrics(Sensor sensor, String opName, String... kvs) {
- maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", group, "The average latency in milliseconds of the key-value store operation.", kvs), new Avg());
- maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", group, "The max latency in milliseconds of the key-value store operation.", kvs), new Max());
- maybeAddMetric(sensor, new MetricName(opName + "-qps", group, "The average number of occurance of the given key-value store operation per second.", kvs), new Rate(new Count()));
- }
-
- private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
- if (!metrics.metrics().containsKey(name))
- sensor.add(name, stat);
- }
-
@Override
public String name() {
return inner.name();
@@ -137,7 +110,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
return this.inner.get(key);
} finally {
- recordLatency(this.getTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
}
}
@@ -151,7 +124,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
if (this.dirty.size() > this.maxDirty)
logChange();
} finally {
- recordLatency(this.putTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
}
}
@@ -168,7 +141,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
if (this.dirty.size() > this.maxDirty)
logChange();
} finally {
- recordLatency(this.putAllTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
}
}
@@ -184,7 +157,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
return value;
} finally {
- recordLatency(this.deleteTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
}
}
@@ -210,7 +183,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
this.inner.flush();
logChange();
} finally {
- recordLatency(this.flushTime, startNs, time.nanoseconds());
+ this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
}
}
@@ -228,10 +201,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
}
}
- private void recordLatency(Sensor sensor, long startNs, long endNs) {
- sensor.record((endNs - startNs) / 1000000, endNs);
- }
-
private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
private final KeyValueIterator<K1, V1> iter;
@@ -264,7 +233,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
try {
iter.close();
} finally {
- recordLatency(this.sensor, this.startNs, time.nanoseconds());
+ metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index e0962a2..373bba0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -44,7 +44,7 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
}
public RocksDBKeyValueStore(String name, ProcessorContext context, Time time) {
- super(name, new RocksDBStore(name, context), context, "kafka-streams", time);
+ super(name, new RocksDBStore(name, context), context, "rocksdb-state", time);
}
private static class RocksDBStore implements KeyValueStore<byte[], byte[]> {
@@ -52,13 +52,13 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
private static final int TTL_NOT_USED = -1;
// TODO: these values should be configurable
+ private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+ private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
private static final long BLOCK_SIZE = 4096L;
private static final int TTL_SECONDS = TTL_NOT_USED;
private static final int MAX_WRITE_BUFFERS = 3;
- private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
- private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
private static final String DB_FILE_DIR = "rocksdb";
private final String topic;
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 343ed52..3049d51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.OffsetCheckpoint;
import org.junit.Test;
@@ -82,11 +82,11 @@ public class ProcessorStateManagerTest {
return persistent;
}
- public final RestoreFunc restoreFunc = new RestoreFunc() {
+ public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
private final Deserializer<Integer> deserializer = new IntegerDeserializer();
@Override
- public void apply(byte[] key, byte[] value) {
+ public void restore(byte[] key, byte[] value) {
keys.add(deserializer.deserialize("", key));
}
};
@@ -259,7 +259,7 @@ public class ProcessorStateManagerTest {
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
try {
- stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+ stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
} finally {
stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
}
@@ -299,7 +299,7 @@ public class ProcessorStateManagerTest {
);
}
- stateMgr.register(persistentStore, persistentStore.restoreFunc);
+ stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
@@ -347,7 +347,7 @@ public class ProcessorStateManagerTest {
);
}
- stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+ stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
assertEquals(0L, restoreConsumer.seekOffset);
@@ -375,7 +375,7 @@ public class ProcessorStateManagerTest {
ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
try {
- stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+ stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
assertNull(stateMgr.getStore("noSuchStore"));
assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
@@ -420,10 +420,10 @@ public class ProcessorStateManagerTest {
assertFalse(checkpointFile.exists());
restoreConsumer.reset();
- stateMgr.register(persistentStore, persistentStore.restoreFunc);
+ stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
restoreConsumer.reset();
- stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+ stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
} finally {
// close the state manager with the ack'ed offsets
stateMgr.close(ackedOffsets);
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8dcfc40..f93093c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -89,7 +89,7 @@ public class StreamTaskTest {
@SuppressWarnings("unchecked")
@Test
public void testProcessOrder() {
- StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config);
+ StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -133,7 +133,7 @@ public class StreamTaskTest {
@SuppressWarnings("unchecked")
@Test
public void testPauseResume() {
- StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config);
+ StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1f3e541..a7e707e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
@@ -79,7 +80,7 @@ public class StreamThreadTest {
Collection<TopicPartition> partitions,
ProcessorTopology topology,
StreamingConfig config) {
- super(id, consumer, producer, restoreConsumer, partitions, topology, config);
+ super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
}
@Override
@@ -104,7 +105,7 @@ public class StreamThreadTest {
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, new SystemTime()) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
@@ -207,7 +208,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
@Override
public void maybeClean() {
super.maybeClean();
@@ -325,7 +326,7 @@ public class StreamThreadTest {
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("source1", "topic1");
- StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+ StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
@Override
public void maybeCommit() {
super.maybeCommit();
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 3fdfc82..c0b09f6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,10 +17,10 @@
package org.apache.kafka.test;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
@@ -83,13 +83,13 @@ public class MockProcessorContext implements ProcessorContext {
}
@Override
- public Metrics metrics() {
+ public StreamingMetrics metrics() {
throw new UnsupportedOperationException("metrics() not supported.");
}
@Override
- public void register(StateStore store, RestoreFunc func) {
- if (func != null) throw new UnsupportedOperationException("RestoreFunc not supported.");
+ public void register(StateStore store, StateRestoreCallback func) {
+ if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
storeMap.put(store.name(), store);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 75f8b4c..8eb2c62 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -23,10 +23,12 @@ import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -153,7 +155,24 @@ public class ProcessorTopologyTestDriver {
offsetsByTopicPartition.put(tp, new AtomicLong());
}
- task = new StreamTask(id, consumer, producer, restoreStateConsumer, partitionsByTopic.values(), topology, config);
+ task = new StreamTask(id,
+ consumer,
+ producer,
+ restoreStateConsumer,
+ partitionsByTopic.values(),
+ topology,
+ config,
+ new StreamingMetrics() {
+ @Override
+ public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+ return null;
+ }
+
+ @Override
+ public void recordLatency(Sensor sensor, long startNs, long endNs) {
+ // do nothing
+ }
+ });
}
/**