Posted to commits@kafka.apache.org by gw...@apache.org on 2016/01/22 22:00:12 UTC
[4/4] kafka git commit: KAFKA-3136: Rename KafkaStreaming to KafkaStreams
KAFKA-3136: Rename KafkaStreaming to KafkaStreams
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Gwen Shapira
Closes #800 from guozhangwang/KRename
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/21c6cfe5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/21c6cfe5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/21c6cfe5
Branch: refs/heads/trunk
Commit: 21c6cfe50dbe818a392c28f48ce8891f7f99aaf6
Parents: 91ba074
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Jan 22 13:00:00 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Jan 22 13:00:00 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/KafkaStreaming.java | 167 ------
.../org/apache/kafka/streams/KafkaStreams.java | 171 +++++++
.../java/org/apache/kafka/streams/KeyValue.java | 37 ++
.../apache/kafka/streams/StreamingConfig.java | 314 ------------
.../apache/kafka/streams/StreamingMetrics.java | 27 -
.../org/apache/kafka/streams/StreamsConfig.java | 303 +++++++++++
.../apache/kafka/streams/StreamsMetrics.java | 27 +
.../kafka/streams/examples/KStreamJob.java | 24 +-
.../kafka/streams/examples/ProcessorJob.java | 34 +-
.../apache/kafka/streams/kstream/KStream.java | 1 +
.../apache/kafka/streams/kstream/KTable.java | 1 +
.../apache/kafka/streams/kstream/KeyValue.java | 34 --
.../kstream/internals/KStreamAggregate.java | 2 +-
.../kstream/internals/KStreamFlatMap.java | 2 +-
.../streams/kstream/internals/KStreamImpl.java | 10 +-
.../kstream/internals/KStreamKStreamJoin.java | 2 +-
.../streams/kstream/internals/KStreamMap.java | 2 +-
.../kstream/internals/KStreamReduce.java | 2 +-
.../kstream/internals/KStreamTransform.java | 2 +-
.../streams/kstream/internals/KTableImpl.java | 2 +-
.../kstream/internals/KTableRepartitionMap.java | 2 +-
.../internals/WindowedStreamPartitioner.java | 52 --
.../internals/WindowedStreamsPartitioner.java | 52 ++
.../streams/processor/ProcessorContext.java | 6 +-
.../streams/processor/StreamPartitioner.java | 59 ---
.../streams/processor/StreamsPartitioner.java | 59 +++
.../streams/processor/TopologyBuilder.java | 76 +--
.../processor/internals/AbstractTask.java | 6 +-
.../KafkaStreamingPartitionAssignor.java | 483 ------------------
.../internals/ProcessorContextImpl.java | 12 +-
.../processor/internals/RecordCollector.java | 4 +-
.../streams/processor/internals/SinkNode.java | 6 +-
.../processor/internals/StandbyContextImpl.java | 12 +-
.../processor/internals/StandbyTask.java | 12 +-
.../internals/StreamPartitionAssignor.java | 483 ++++++++++++++++++
.../streams/processor/internals/StreamTask.java | 16 +-
.../processor/internals/StreamThread.java | 48 +-
.../org/apache/kafka/streams/state/Entry.java | 42 --
.../kafka/streams/state/KeyValueIterator.java | 4 +-
.../kafka/streams/state/KeyValueStore.java | 3 +-
.../streams/state/WindowStoreIterator.java | 2 +-
.../InMemoryKeyValueStoreSupplier.java | 12 +-
.../InMemoryLRUCacheStoreSupplier.java | 12 +-
.../state/internals/MeteredKeyValueStore.java | 14 +-
.../state/internals/MeteredWindowStore.java | 6 +-
.../streams/state/internals/RocksDBStore.java | 16 +-
.../state/internals/RocksDBWindowStore.java | 9 +-
.../kafka/streams/StreamingConfigTest.java | 75 ---
.../apache/kafka/streams/StreamsConfigTest.java | 76 +++
.../kstream/internals/KStreamFlatMapTest.java | 2 +-
.../internals/KStreamKTableLeftJoinTest.java | 2 +-
.../kstream/internals/KStreamMapTest.java | 2 +-
.../kstream/internals/KStreamTransformTest.java | 2 +-
.../kstream/internals/KTableKTableJoinTest.java | 2 +-
.../internals/KTableKTableLeftJoinTest.java | 2 +-
.../internals/KTableKTableOuterJoinTest.java | 2 +-
.../WindowedStreamPartitionerTest.java | 84 ---
.../WindowedStreamsPartitionerTest.java | 84 +++
.../KafkaStreamingPartitionAssignorTest.java | 508 ------------------
.../internals/ProcessorTopologyTest.java | 27 +-
.../processor/internals/StandbyTaskTest.java | 31 +-
.../internals/StreamPartitionAssignorTest.java | 509 +++++++++++++++++++
.../processor/internals/StreamTaskTest.java | 27 +-
.../processor/internals/StreamThreadTest.java | 37 +-
.../streams/state/KeyValueStoreTestDriver.java | 45 +-
.../internals/AbstractKeyValueStoreTest.java | 22 +-
.../state/internals/RocksDBWindowStoreTest.java | 38 +-
.../apache/kafka/test/KStreamTestDriver.java | 4 +-
.../apache/kafka/test/MockProcessorContext.java | 14 +-
.../apache/kafka/test/NoOpKeyValueMapper.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 32 +-
71 files changed, 2133 insertions(+), 2168 deletions(-)
----------------------------------------------------------------------
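
The user-facing surface of the rename is small: KafkaStreaming becomes KafkaStreams, StreamingConfig becomes StreamsConfig, StreamingMetrics becomes StreamsMetrics, and KeyValue moves from the kstream package up to org.apache.kafka.streams. A minimal migration sketch, patterned on the updated KStreamJob example further below (the job id and topic names are illustrative, not part of this commit):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;    // was: org.apache.kafka.streams.KafkaStreaming
    import org.apache.kafka.streams.StreamsConfig;   // was: org.apache.kafka.streams.StreamingConfig
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class RenameSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.JOB_ID_CONFIG, "rename-sketch");   // was: StreamingConfig.JOB_ID_CONFIG
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

            KStreamBuilder builder = new KStreamBuilder();
            builder.from("topic1").to("topic2");   // pass-through topology, for illustration only

            // was: KafkaStreaming streaming = new KafkaStreaming(builder, new StreamingConfig(props));
            KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(props));
            streams.start();
        }
    }
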
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index 0d99739..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sending output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will coordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shut down or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains the {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instances that are used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- * Map<String, Object> props = new HashMap<>();
- * props.put("bootstrap.servers", "localhost:4242");
- * props.put("key.deserializer", StringDeserializer.class);
- * props.put("value.deserializer", StringDeserializer.class);
- * props.put("key.serializer", StringSerializer.class);
- * props.put("value.serializer", IntegerSerializer.class);
- * props.put("timestamp.extractor", MyTimestampExtractor.class);
- * StreamingConfig config = new StreamingConfig(props);
- *
- * KStreamBuilder builder = new KStreamBuilder();
- * builder.from("topic1").mapValues(value -> value.length()).to("topic2");
- *
- * KafkaStreaming streaming = new KafkaStreaming(builder, config);
- * streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
- private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
- private static final String JMX_PREFIX = "kafka.streams";
-
- // container states
- private static final int CREATED = 0;
- private static final int RUNNING = 1;
- private static final int STOPPED = 2;
- private int state = CREATED;
-
- private final StreamThread[] threads;
-
- // processId is expected to be unique across JVMs and to be used
- // in the userData of the subscription request, to allow the assignor to be
- // aware of the co-location of stream threads' consumers. It is for internal
- // usage only and should not be exposed to users at all.
- private final UUID processId;
-
- public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
- // create the metrics
- Time time = new SystemTime();
-
- this.processId = UUID.randomUUID();
-
- String jobId = config.getString(StreamingConfig.JOB_ID_CONFIG);
- if (jobId.length() <= 0)
- jobId = "kafka-streams";
-
- String clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
- if (clientId.length() <= 0)
- clientId = jobId + "-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
-
- List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- reporters.add(new JmxReporter(JMX_PREFIX));
-
- MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
- TimeUnit.MILLISECONDS);
-
- Metrics metrics = new Metrics(metricConfig, reporters, time);
-
- this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
- for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
- }
- }
-
- /**
- * Start the stream process by starting all its threads
- */
- public synchronized void start() {
- log.debug("Starting Kafka Stream process");
-
- if (state == CREATED) {
- for (StreamThread thread : threads)
- thread.start();
-
- state = RUNNING;
-
- log.info("Started Kafka Stream process");
- } else {
- throw new IllegalStateException("This process was already started.");
- }
- }
-
- /**
- * Shutdown this stream process by signaling the threads to stop,
- * wait for them to join and clean up the process instance.
- */
- public synchronized void close() {
- log.debug("Stopping Kafka Stream process");
-
- if (state == RUNNING) {
- // signal the threads to stop and wait
- for (StreamThread thread : threads)
- thread.close();
-
- for (StreamThread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException ex) {
- Thread.interrupted();
- }
- }
-
- state = STOPPED;
-
- log.info("Stopped Kafka Stream process");
- } else {
- throw new IllegalStateException("This process has not started yet.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
new file mode 100644
index 0000000..071cef6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Kafka Streams allows for performing continuous computation on input coming from one or more input topics and
+ * sending output to zero or more output topics.
+ * <p>
+ * This processing is defined by using the {@link TopologyBuilder} class or its subclass KStreamBuilder to specify
+ * the transformation.
+ * The {@link KafkaStreams} instance will be responsible for the lifecycle of these processors. It will instantiate and
+ * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
+ * <p>
+ * This {@link KafkaStreams} instance will coordinate with any other instances (whether in this same process, on other processes
+ * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
+ * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shut down or
+ * started in the appropriate processes to balance processing load.
+ * <p>
+ * Internally the {@link KafkaStreams} instance contains the {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
+ * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instances that are used for reading input and writing output.
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ * Map<String, Object> props = new HashMap<>();
+ * props.put("bootstrap.servers", "localhost:4242");
+ * props.put("key.deserializer", StringDeserializer.class);
+ * props.put("value.deserializer", StringDeserializer.class);
+ * props.put("key.serializer", StringSerializer.class);
+ * props.put("value.serializer", IntegerSerializer.class);
+ * props.put("timestamp.extractor", MyTimestampExtractor.class);
+ * StreamsConfig config = new StreamsConfig(props);
+ *
+ * KStreamBuilder builder = new KStreamBuilder();
+ * builder.from("topic1").mapValues(value -> value.length()).to("topic2");
+ *
+ * KafkaStreams streams = new KafkaStreams(builder, config);
+ * streams.start();
+ * </pre>
+ *
+ */
+public class KafkaStreams {
+
+ private static final Logger log = LoggerFactory.getLogger(KafkaStreams.class);
+ private static final AtomicInteger STREAM_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+ private static final String JMX_PREFIX = "kafka.streams";
+
+ // container states
+ private static final int CREATED = 0;
+ private static final int RUNNING = 1;
+ private static final int STOPPED = 2;
+ private int state = CREATED;
+
+ private final StreamThread[] threads;
+
+ // processId is expected to be unique across JVMs and to be used
+ // in the userData of the subscription request, to allow the assignor to be
+ // aware of the co-location of stream threads' consumers. It is for internal
+ // usage only and should not be exposed to users at all.
+ private final UUID processId;
+
+ public KafkaStreams(TopologyBuilder builder, Properties props) {
+ this(builder, new StreamsConfig(props));
+ }
+
+ public KafkaStreams(TopologyBuilder builder, StreamsConfig config) {
+ // create the metrics
+ Time time = new SystemTime();
+
+ this.processId = UUID.randomUUID();
+
+ // job.id is a required config and hence should always have a value
+ String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG);
+
+ String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
+ if (clientId.length() <= 0)
+ clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
+
+ List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+ MetricsReporter.class);
+ reporters.add(new JmxReporter(JMX_PREFIX));
+
+ MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+ TimeUnit.MILLISECONDS);
+
+ Metrics metrics = new Metrics(metricConfig, reporters, time);
+
+ this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+ for (int i = 0; i < this.threads.length; i++) {
+ this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time);
+ }
+ }
+
+ /**
+ * Start the stream process by starting all its threads
+ */
+ public synchronized void start() {
+ log.debug("Starting Kafka Stream process");
+
+ if (state == CREATED) {
+ for (StreamThread thread : threads)
+ thread.start();
+
+ state = RUNNING;
+
+ log.info("Started Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process was already started.");
+ }
+ }
+
+ /**
+ * Shutdown this stream process by signaling the threads to stop,
+ * wait for them to join and clean up the process instance.
+ */
+ public synchronized void close() {
+ log.debug("Stopping Kafka Stream process");
+
+ if (state == RUNNING) {
+ // signal the threads to stop and wait
+ for (StreamThread thread : threads)
+ thread.close();
+
+ for (StreamThread thread : threads) {
+ try {
+ thread.join();
+ } catch (InterruptedException ex) {
+ Thread.interrupted();
+ }
+ }
+
+ state = STOPPED;
+
+ log.info("Stopped Kafka Stream process");
+ } else {
+ throw new IllegalStateException("This process has not started yet.");
+ }
+ }
+}
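
Note the new convenience constructor above: a KafkaStreams instance can now be built directly from a Properties object, and the old constructor's `throws Exception` declaration is gone. A lifecycle sketch, assuming `builder` and `props` are populated as in the class javadoc:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.processor.TopologyBuilder;

    public class LifecycleSketch {
        // builder and props are assumed populated as in the class javadoc example.
        static void runBriefly(TopologyBuilder builder, Properties props) throws InterruptedException {
            // New in this commit: the Properties overload, equivalent to
            // new KafkaStreams(builder, new StreamsConfig(props)).
            KafkaStreams streams = new KafkaStreams(builder, props);

            streams.start();         // CREATED -> RUNNING; a second start() throws IllegalStateException
            Thread.sleep(60_000L);   // let the stream threads process for a while
            streams.close();         // signals each StreamThread to stop, joins it, then RUNNING -> STOPPED
        }
    }
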
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
new file mode 100644
index 0000000..472e677
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/KeyValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+public class KeyValue<K, V> {
+
+ public final K key;
+ public final V value;
+
+ public KeyValue(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public static <K, V> KeyValue<K, V> pair(K key, V value) {
+ return new KeyValue<>(key, value);
+ }
+
+ public String toString() {
+ return "KeyValue(" + key + ", " + value + ")";
+ }
+}
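
This unified KeyValue replaces both the old kstream.KeyValue (deleted below) and the state.Entry type: stores and iterators now hand back KeyValue objects whose key and value are public final fields rather than key()/value() accessors, as the ProcessorJob hunk further down shows. A tiny sketch:

    import org.apache.kafka.streams.KeyValue;

    public class KeyValueSketch {
        public static void main(String[] args) {
            // The static factory mirrors the constructor:
            KeyValue<String, Integer> kv = KeyValue.pair("apples", 3);
            // Public final fields replace the old Entry.key()/value() accessors:
            System.out.println(kv.key + " -> " + kv.value);   // prints: apples -> 3
            System.out.println(kv);                           // prints: KeyValue(apples, 3)
        }
    }
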
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index e89d030..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
- private static final ConfigDef CONFIG;
-
- /** <code>state.dir</code> */
- public static final String STATE_DIR_CONFIG = "state.dir";
- private static final String STATE_DIR_DOC = "Directory location for state store.";
-
- /** <code>zookeeper.connect</code> */
- public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
- private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
-
- /** <code>commit.interval.ms</code> */
- public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
- private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
- /** <code>poll.ms</code> */
- public static final String POLL_MS_CONFIG = "poll.ms";
- private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
- /** <code>num.stream.threads</code> */
- public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
- private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
- /** <code>num.standby.replicas</code> */
- public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
- private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
-
- /** <code>buffered.records.per.partition</code> */
- public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
- private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
- /** <code>state.cleanup.delay</code> */
- public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
- private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
- /** <code>total.records.to.process</code> */
- public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
- private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
- /** <code>window.time.ms</code> */
- public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
- private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
- + "with this frequency even if there is no message.";
-
- /** <code>timestamp.extractor</code> */
- public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
- private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
- /** <code>partition.grouper</code> */
- public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
- private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
- /** <code>job.id</code> */
- public static final String JOB_ID_CONFIG = "job.id";
- public static final String JOB_ID_DOC = "An id string to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
-
- /** <code>key.serializer</code> */
- public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
- /** <code>value.serializer</code> */
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
- /** <code>key.deserializer</code> */
- public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>value.deserializer</code> */
- public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>metrics.sample.window.ms</code> */
- public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
- /** <code>metrics.num.samples</code> */
- public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
- /** <code>metric.reporters</code> */
- public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
- /** <code>bootstrap.servers</code> */
- public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
- /** <code>client.id</code> */
- public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
- private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
- static {
- CONFIG = new ConfigDef().define(JOB_ID_CONFIG,
- Type.STRING,
- "",
- Importance.MEDIUM,
- StreamingConfig.JOB_ID_DOC)
- .define(CLIENT_ID_CONFIG,
- Type.STRING,
- "",
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_ID_DOC)
- .define(ZOOKEEPER_CONNECT_CONFIG,
- Type.STRING,
- "",
- Importance.HIGH,
- StreamingConfig.ZOOKEEPER_CONNECT_DOC)
- .define(STATE_DIR_CONFIG,
- Type.STRING,
- SYSTEM_TEMP_DIRECTORY,
- Importance.MEDIUM,
- STATE_DIR_DOC)
- .define(COMMIT_INTERVAL_MS_CONFIG,
- Type.LONG,
- 30000,
- Importance.HIGH,
- COMMIT_INTERVAL_MS_DOC)
- .define(POLL_MS_CONFIG,
- Type.LONG,
- 100,
- Importance.LOW,
- POLL_MS_DOC)
- .define(NUM_STREAM_THREADS_CONFIG,
- Type.INT,
- 1,
- Importance.LOW,
- NUM_STREAM_THREADS_DOC)
- .define(NUM_STANDBY_REPLICAS_CONFIG,
- Type.INT,
- 0,
- Importance.LOW,
- NUM_STANDBY_REPLICAS_DOC)
- .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
- Type.INT,
- 1000,
- Importance.LOW,
- BUFFERED_RECORDS_PER_PARTITION_DOC)
- .define(STATE_CLEANUP_DELAY_MS_CONFIG,
- Type.LONG,
- 60000,
- Importance.LOW,
- STATE_CLEANUP_DELAY_MS_DOC)
- .define(TOTAL_RECORDS_TO_PROCESS,
- Type.LONG,
- -1L,
- Importance.LOW,
- TOTAL_RECORDS_TO_DOC)
- .define(WINDOW_TIME_MS_CONFIG,
- Type.LONG,
- -1L,
- Importance.MEDIUM,
- WINDOW_TIME_MS_DOC)
- .define(KEY_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
- .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- TIMESTAMP_EXTRACTOR_CLASS_DOC)
- .define(PARTITION_GROUPER_CLASS_CONFIG,
- Type.CLASS,
- DefaultPartitionGrouper.class,
- Importance.HIGH,
- PARTITION_GROUPER_CLASS_DOC)
- .define(BOOTSTRAP_SERVERS_CONFIG,
- Type.STRING,
- Importance.HIGH,
- CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG,
- Type.LIST,
- "",
- Importance.LOW,
- CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
- .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
- Type.LONG,
- 30000,
- atLeast(0),
- Importance.LOW,
- CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG,
- Type.INT,
- 2,
- atLeast(1),
- Importance.LOW,
- CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
- }
-
- public static class InternalConfig {
- public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
- }
-
- public StreamingConfig(Map<?, ?> props) {
- super(CONFIG, props);
- }
-
- public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
- Map<String, Object> props = getBaseConsumerConfigs();
-
- props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
- props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
-
- props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
-
- return props;
- }
-
- public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
- Map<String, Object> props = getBaseConsumerConfigs();
-
- // no need to set group id for a restore consumer
- props.remove(ConsumerConfig.GROUP_ID_CONFIG);
-
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
-
- return props;
- }
-
- private Map<String, Object> getBaseConsumerConfigs() {
- Map<String, Object> props = this.originals();
-
- // set consumer default property values
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- // remove properties that are not required for consumers
- props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
- props.remove(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG);
-
- return props;
- }
-
- public Map<String, Object> getProducerConfigs(String clientId) {
- Map<String, Object> props = this.originals();
-
- // set producer default property values
- props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
- // remove properties that are not required for producers
- props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
- props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
-
- return props;
- }
-
- public Serializer keySerializer() {
- return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Serializer valueSerializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Deserializer keyDeserializer() {
- return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public Deserializer valueDeserializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public static void main(String[] args) {
- System.out.println(CONFIG.toHtmlTable());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
- Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
- void recordLatency(Sensor sensor, long startNs, long endNs);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
new file mode 100644
index 0000000..3843b1d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class StreamsConfig extends AbstractConfig {
+
+ private static final ConfigDef CONFIG;
+
+ /** <code>state.dir</code> */
+ public static final String STATE_DIR_CONFIG = "state.dir";
+ private static final String STATE_DIR_DOC = "Directory location for state store.";
+
+ /** <code>zookeeper.connect</code> */
+ public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
+ private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management.";
+
+ /** <code>commit.interval.ms</code> */
+ public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
+ private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
+
+ /** <code>poll.ms</code> */
+ public static final String POLL_MS_CONFIG = "poll.ms";
+ private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
+
+ /** <code>num.stream.threads</code> */
+ public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
+ private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
+
+ /** <code>num.standby.replicas</code> */
+ public static final String NUM_STANDBY_REPLICAS_CONFIG = "num.standby.replicas";
+ private static final String NUM_STANDBY_REPLICAS_DOC = "The number of standby replicas for each task.";
+
+ /** <code>buffered.records.per.partition</code> */
+ public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
+ private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
+
+ /** <code>state.cleanup.delay</code> */
+ public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
+ private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
+
+ /** <code>total.records.to.process</code> */
+ public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
+ private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
+
+ /** <code>timestamp.extractor</code> */
+ public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
+ private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
+
+ /** <code>partition.grouper</code> */
+ public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
+ private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
+
+ /** <code>job.id</code> */
+ public static final String JOB_ID_CONFIG = "job.id";
+ public static final String JOB_ID_DOC = "An id string to identify the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, and 3) the changelog topic prefix.";
+
+ /** <code>key.serializer</code> */
+ public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+
+ /** <code>value.serializer</code> */
+ public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+ /** <code>key.deserializer</code> */
+ public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+
+ /** <code>value.deserializer</code> */
+ public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+ /** <code>metrics.sample.window.ms</code> */
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+ /** <code>metrics.num.samples</code> */
+ public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+ /** <code>metric.reporters</code> */
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
+ /** <code>bootstrap.servers</code> */
+ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+ /** <code>client.id</code> */
+ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+ private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
+
+ static {
+ CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value
+ Type.STRING,
+ Importance.HIGH,
+ StreamsConfig.JOB_ID_DOC)
+ .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value
+ Type.STRING,
+ Importance.HIGH,
+ CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+ .define(CLIENT_ID_CONFIG,
+ Type.STRING,
+ "",
+ Importance.HIGH,
+ CommonClientConfigs.CLIENT_ID_DOC)
+ .define(ZOOKEEPER_CONNECT_CONFIG,
+ Type.STRING,
+ "",
+ Importance.HIGH,
+ StreamsConfig.ZOOKEEPER_CONNECT_DOC)
+ .define(STATE_DIR_CONFIG,
+ Type.STRING,
+ SYSTEM_TEMP_DIRECTORY,
+ Importance.HIGH,
+ STATE_DIR_DOC)
+ .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
+ .define(VALUE_SERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
+ .define(KEY_DESERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
+ .define(VALUE_DESERIALIZER_CLASS_CONFIG, // required with no default value
+ Type.CLASS,
+ Importance.HIGH,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
+ .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+ Type.CLASS,
+ Importance.MEDIUM,
+ TIMESTAMP_EXTRACTOR_CLASS_DOC)
+ .define(PARTITION_GROUPER_CLASS_CONFIG,
+ Type.CLASS,
+ DefaultPartitionGrouper.class,
+ Importance.MEDIUM,
+ PARTITION_GROUPER_CLASS_DOC)
+ .define(COMMIT_INTERVAL_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ Importance.LOW,
+ COMMIT_INTERVAL_MS_DOC)
+ .define(POLL_MS_CONFIG,
+ Type.LONG,
+ 100,
+ Importance.LOW,
+ POLL_MS_DOC)
+ .define(NUM_STREAM_THREADS_CONFIG,
+ Type.INT,
+ 1,
+ Importance.LOW,
+ NUM_STREAM_THREADS_DOC)
+ .define(NUM_STANDBY_REPLICAS_CONFIG,
+ Type.INT,
+ 0,
+ Importance.LOW,
+ NUM_STANDBY_REPLICAS_DOC)
+ .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+ Type.INT,
+ 1000,
+ Importance.LOW,
+ BUFFERED_RECORDS_PER_PARTITION_DOC)
+ .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+ Type.LONG,
+ 60000,
+ Importance.LOW,
+ STATE_CLEANUP_DELAY_MS_DOC)
+ .define(TOTAL_RECORDS_TO_PROCESS,
+ Type.LONG,
+ -1L,
+ Importance.LOW,
+ TOTAL_RECORDS_TO_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+ .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+ Type.LONG,
+ 30000,
+ atLeast(0),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+ .define(METRICS_NUM_SAMPLES_CONFIG,
+ Type.INT,
+ 2,
+ atLeast(1),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
+ }
+
+ public static class InternalConfig {
+ public static final String STREAM_THREAD_INSTANCE = "__stream.thread.instance__";
+ }
+
+ public StreamsConfig(Map<?, ?> props) {
+ super(CONFIG, props);
+ }
+
+ public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) {
+ Map<String, Object> props = getBaseConsumerConfigs();
+
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG));
+ props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
+
+ props.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
+
+ return props;
+ }
+
+ public Map<String, Object> getRestoreConsumerConfigs(String clientId) {
+ Map<String, Object> props = getBaseConsumerConfigs();
+
+ // no need to set group id for a restore consumer
+ props.remove(ConsumerConfig.GROUP_ID_CONFIG);
+
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-restore-consumer");
+
+ return props;
+ }
+
+ private Map<String, Object> getBaseConsumerConfigs() {
+ Map<String, Object> props = this.originals();
+
+ // set consumer default property values
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ // remove properties that are not required for consumers
+ props.remove(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+ props.remove(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
+
+ return props;
+ }
+
+ public Map<String, Object> getProducerConfigs(String clientId) {
+ Map<String, Object> props = this.originals();
+
+ // set producer default property values
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
+
+ // remove properties that are not required for producers
+ props.remove(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ props.remove(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
+
+ props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-producer");
+
+ return props;
+ }
+
+ public Serializer keySerializer() {
+ return getConfiguredInstance(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Serializer valueSerializer() {
+ return getConfiguredInstance(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ }
+
+ public Deserializer keyDeserializer() {
+ return getConfiguredInstance(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public Deserializer valueDeserializer() {
+ return getConfiguredInstance(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ }
+
+ public static void main(String[] args) {
+ System.out.println(CONFIG.toHtmlTable());
+ }
+}
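
Beyond the rename, note the definitions flagged "// required with no default value" above: job.id previously defaulted to the empty string (with KafkaStreaming falling back to a "kafka-streams" client-id prefix) and is now required with no default, and the window.time.ms and key/value serde importance settings were reshuffled. A sketch of the new failure mode, assuming only job.id is omitted:

    import java.util.Properties;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

    public class RequiredConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // job.id intentionally omitted; everything else required is supplied
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
            try {
                new StreamsConfig(props);
            } catch (ConfigException e) {
                // Under StreamingConfig this succeeded, since job.id defaulted to "";
                // StreamsConfig defines it with no default, so parsing now fails.
                System.out.println("expected: " + e.getMessage());
            }
        }
    }
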
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
new file mode 100644
index 0000000..a151392
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamsMetrics {
+
+ Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+ void recordLatency(Sensor sensor, long startNs, long endNs);
+}
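
StreamsMetrics is the renamed StreamingMetrics interface, otherwise unchanged. A sketch of how a caller records latency through it; the metrics instance is assumed to be supplied by the runtime (for example via the processor context), not constructed here:

    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.streams.StreamsMetrics;

    public class LatencySketch {
        static void timedOperation(StreamsMetrics metrics) {
            // Register (or look up) a latency sensor; additional tags omitted here.
            Sensor sensor = metrics.addLatencySensor("my-scope", "my-entity", "my-operation");
            long startNs = System.nanoTime();
            // ... the work being measured goes here ...
            long endNs = System.nanoTime();
            metrics.recordLatency(sensor, startNs, endNs);
        }
    }
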
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
index 88a8955..a234395 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
@@ -20,11 +20,11 @@ package org.apache.kafka.streams.examples;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
@@ -34,14 +34,14 @@ public class KStreamJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-kstream");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
+ props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
@@ -78,7 +78,7 @@ public class KStreamJob {
streams[0].to("topic2");
streams[1].to("topic3");
- KafkaStreaming kstream = new KafkaStreaming(builder, config);
+ KafkaStreams kstream = new KafkaStreams(builder, config);
kstream.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
index 2d0b79f..e17c16b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
@@ -21,13 +21,13 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -70,11 +70,11 @@ public class ProcessorJob {
KeyValueIterator<String, Integer> iter = this.kvStore.all();
while (iter.hasNext()) {
- Entry<String, Integer> entry = iter.next();
+ KeyValue<String, Integer> entry = iter.next();
- System.out.println("[" + entry.key() + ", " + entry.value() + "]");
+ System.out.println("[" + entry.key + ", " + entry.value + "]");
- context.forward(entry.key(), entry.value());
+ context.forward(entry.key, entry.value);
}
iter.close();
@@ -90,15 +90,15 @@ public class ProcessorJob {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
- props.put(StreamingConfig.JOB_ID_CONFIG, "example-processor");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
+ props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
+ props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
+ StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = new TopologyBuilder();
@@ -109,7 +109,7 @@ public class ProcessorJob {
builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
- KafkaStreaming streaming = new KafkaStreaming(builder, config);
- streaming.start();
+ KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.start();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index dfed661..26f04f0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index 87298d1..feb28ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.KeyValue;
/**
* KTable is an abstraction of a change log stream.
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
- public final K key;
- public final V value;
-
- public KeyValue(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public static <K, V> KeyValue<K, V> pair(K key, V value) {
- return new KeyValue<>(key, value);
- }
-
-}
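
Per the diffstat, this deletion is paired with a new org.apache.kafka.streams.KeyValue, i.e. the class moves from the kstream subpackage up to the streams root. Assuming the relocated class keeps the shape shown in the deleted version above (public final fields plus a static pair factory), call sites only need to change their import, as in this small sketch with illustrative sample data:

import org.apache.kafka.streams.KeyValue;  // was org.apache.kafka.streams.kstream.KeyValue

public class KeyValueDemo {
    public static void main(String[] args) {
        // Public final fields, not accessor methods: note kv.key / kv.value,
        // matching the entry.key / entry.value change in the ProcessorJob hunk.
        KeyValue<String, Integer> kv = KeyValue.pair("alice", 42);  // illustrative data
        System.out.println("[" + kv.key + ", " + kv.value + "]");
    }
}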
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 91bfa9e..26002f5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
index daef8b1..ff7f9ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index ce89220..98e50c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
import org.apache.kafka.streams.state.Serdes;
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
@Override
public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) {
String name = topology.newName(SINK_NAME);
- StreamPartitioner<K, V> streamPartitioner = null;
+ StreamsPartitioner<K, V> streamsPartitioner = null;
if (keySerializer != null && keySerializer instanceof WindowedSerializer) {
WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer;
- streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer);
+ streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer);
}
- topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name);
+ topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 01e3325..a4ac9b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
index 57f1431..a40449b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.ProcessorSupplier;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index 7d6eb27..c484c7b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
index a9d8f97..4299c66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java
@@ -17,7 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 8ee557c..d046090 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Reducer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 12fcc17..499f721 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -18,7 +18,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
deleted file mode 100644
index 10e69cc..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.StreamPartitioner;
-
-public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> {
-
- private final WindowedSerializer<K> serializer;
-
- public WindowedStreamPartitioner(WindowedSerializer<K> serializer) {
- this.serializer = serializer;
- }
-
- /**
- * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value
- * and the current number of partitions. The partition number id determined by the original key of the windowed key
- * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
- *
- * @param windowedKey the key of the message
- * @param value the value of the message
- * @param numPartitions the total number of partitions
- * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
- */
- public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
- byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
-
- // hash the keyBytes to choose a partition
- return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
- }
-
- private static int toPositive(int number) {
- return number & 0x7fffffff;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
new file mode 100644
index 0000000..ff1fa2c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StreamsPartitioner;
+
+public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> {
+
+ private final WindowedSerializer<K> serializer;
+
+ public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value
+ * and the current number of partitions. The partition number is determined by the original key of the windowed key
+ * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key.
+ *
+ * @param windowedKey the key of the message
+ * @param value the value of the message
+ * @param numPartitions the total number of partitions
+ * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used
+ */
+ public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) {
+ byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey);
+
+ // hash the keyBytes to choose a partition
+ return toPositive(Utils.murmur2(keyBytes)) % numPartitions;
+ }
+
+ private static int toPositive(int number) {
+ return number & 0x7fffffff;
+ }
+
+}
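
The arithmetic in partition() above is worth spelling out: murmur2 returns a plain int that may be negative, and masking with 0x7fffffff clears the sign bit (unlike Math.abs, which stays negative on Integer.MIN_VALUE), so the modulo always lands in [0, numPartitions-1]. A dependency-light sketch of the same computation; the sample key bytes stand in for serializeBaseKey output and are illustrative only:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionMathDemo {
    public static void main(String[] args) {
        byte[] keyBytes = "user-42".getBytes(StandardCharsets.UTF_8); // stand-in for the serialized base key
        int numPartitions = 8;
        int hash = Utils.murmur2(keyBytes);                   // may be negative
        int partition = (hash & 0x7fffffff) % numPartitions;  // mask the sign bit, then mod
        System.out.println("partition = " + partition);       // always in [0, numPartitions-1]
    }
}

Because this mirrors DefaultPartitioner's hashing of the base key, a windowed topic ends up partitioned the same way as an unwindowed topic keyed on the original key, which is the property the Javadoc promises.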
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index fa19ed7..41e2235 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.StreamsMetrics;
import java.io.File;
@@ -70,9 +70,9 @@ public interface ProcessorContext {
/**
* Returns Metrics instance
*
- * @return StreamingMetrics
+ * @return StreamsMetrics
*/
- StreamingMetrics metrics();
+ StreamsMetrics metrics();
/**
* Registers and possibly restores the specified storage engine.