You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:34 UTC
[8/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
MINOR: remove Kafka Streams in 0.9.0
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jun Rao
Closes #474 from guozhangwang/removeKStream
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b382d6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b382d6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b382d6f
Branch: refs/heads/0.9.0
Commit: 2b382d6f92ed1d6f414529ce8d41089110ed560a
Parents: c258868
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Nov 9 16:33:23 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Nov 9 16:33:23 2015 -0800
----------------------------------------------------------------------
bin/kafka-run-class.sh | 5 -
build.gradle | 59 +-
checkstyle/import-control.xml | 14 -
settings.gradle | 2 +-
.../apache/kafka/streams/KafkaStreaming.java | 156 -----
.../apache/kafka/streams/StreamingConfig.java | 269 ---------
.../apache/kafka/streams/StreamingMetrics.java | 27 -
.../kafka/streams/examples/KStreamJob.java | 84 ---
.../kafka/streams/examples/ProcessorJob.java | 115 ----
.../examples/WallclockTimestampExtractor.java | 28 -
.../apache/kafka/streams/kstream/KStream.java | 180 ------
.../kafka/streams/kstream/KStreamBuilder.java | 74 ---
.../kafka/streams/kstream/KStreamWindowed.java | 38 --
.../apache/kafka/streams/kstream/KeyValue.java | 34 --
.../kafka/streams/kstream/KeyValueMapper.java | 23 -
.../apache/kafka/streams/kstream/Predicate.java | 30 -
.../streams/kstream/SlidingWindowSupplier.java | 266 ---------
.../kafka/streams/kstream/Transformer.java | 57 --
.../streams/kstream/TransformerSupplier.java | 24 -
.../kafka/streams/kstream/ValueJoiner.java | 23 -
.../kafka/streams/kstream/ValueMapper.java | 23 -
.../kafka/streams/kstream/ValueTransformer.java | 56 --
.../kstream/ValueTransformerSupplier.java | 24 -
.../apache/kafka/streams/kstream/Window.java | 36 --
.../kafka/streams/kstream/WindowSupplier.java | 25 -
.../kstream/internals/FilteredIterator.java | 63 --
.../kstream/internals/KStreamBranch.java | 52 --
.../kstream/internals/KStreamFilter.java | 48 --
.../kstream/internals/KStreamFlatMap.java | 47 --
.../kstream/internals/KStreamFlatMapValues.java | 47 --
.../streams/kstream/internals/KStreamImpl.java | 227 -------
.../streams/kstream/internals/KStreamJoin.java | 93 ---
.../streams/kstream/internals/KStreamMap.java | 46 --
.../kstream/internals/KStreamMapValues.java | 45 --
.../kstream/internals/KStreamPassThrough.java | 37 --
.../kstream/internals/KStreamTransform.java | 71 ---
.../internals/KStreamTransformValues.java | 69 ---
.../kstream/internals/KStreamWindow.java | 68 ---
.../kstream/internals/KStreamWindowedImpl.java | 67 ---
.../kstream/internals/WindowSupport.java | 159 -----
.../streams/processor/AbstractProcessor.java | 71 ---
.../processor/DefaultPartitionGrouper.java | 73 ---
.../streams/processor/PartitionGrouper.java | 53 --
.../kafka/streams/processor/Processor.java | 59 --
.../streams/processor/ProcessorContext.java | 101 ----
.../streams/processor/ProcessorSupplier.java | 23 -
.../streams/processor/StateRestoreCallback.java | 27 -
.../kafka/streams/processor/StateStore.java | 57 --
.../streams/processor/StateStoreSupplier.java | 25 -
.../apache/kafka/streams/processor/TaskId.java | 66 ---
.../streams/processor/TimestampExtractor.java | 34 --
.../streams/processor/TopologyBuilder.java | 498 ----------------
.../streams/processor/TopologyException.java | 38 --
.../KafkaStreamingPartitionAssignor.java | 135 -----
.../internals/MinTimestampTracker.java | 67 ---
.../processor/internals/PartitionGroup.java | 170 ------
.../internals/ProcessorContextImpl.java | 192 ------
.../processor/internals/ProcessorNode.java | 74 ---
.../internals/ProcessorStateManager.java | 231 --------
.../processor/internals/ProcessorTopology.java | 62 --
.../processor/internals/PunctuationQueue.java | 56 --
.../internals/PunctuationSchedule.java | 41 --
.../streams/processor/internals/Punctuator.java | 24 -
.../streams/processor/internals/QuickUnion.java | 67 ---
.../processor/internals/RecordCollector.java | 91 ---
.../processor/internals/RecordQueue.java | 140 -----
.../streams/processor/internals/SinkNode.java | 64 --
.../streams/processor/internals/SourceNode.java | 64 --
.../streams/processor/internals/Stamped.java | 38 --
.../processor/internals/StampedRecord.java | 52 --
.../streams/processor/internals/StreamTask.java | 378 ------------
.../processor/internals/StreamThread.java | 586 -------------------
.../processor/internals/TimestampTracker.java | 58 --
.../org/apache/kafka/streams/state/Entry.java | 42 --
.../state/InMemoryKeyValueStoreSupplier.java | 155 -----
.../state/InMemoryLRUCacheStoreSupplier.java | 195 ------
.../kafka/streams/state/KeyValueIterator.java | 29 -
.../kafka/streams/state/KeyValueStore.java | 86 ---
.../streams/state/MeteredKeyValueStore.java | 278 ---------
.../kafka/streams/state/OffsetCheckpoint.java | 172 ------
.../state/RocksDBKeyValueStoreSupplier.java | 298 ----------
.../org/apache/kafka/streams/state/Serdes.java | 161 -----
.../org/apache/kafka/streams/state/Stores.java | 278 ---------
.../streams/kstream/KStreamBuilderTest.java | 51 --
.../kstream/internals/FilteredIteratorTest.java | 94 ---
.../kstream/internals/KStreamBranchTest.java | 90 ---
.../kstream/internals/KStreamFilterTest.java | 85 ---
.../kstream/internals/KStreamFlatMapTest.java | 80 ---
.../internals/KStreamFlatMapValuesTest.java | 77 ---
.../kstream/internals/KStreamImplTest.java | 137 -----
.../kstream/internals/KStreamJoinTest.java | 195 ------
.../kstream/internals/KStreamMapTest.java | 73 ---
.../kstream/internals/KStreamMapValuesTest.java | 71 ---
.../kstream/internals/KStreamTransformTest.java | 93 ---
.../internals/KStreamTransformValuesTest.java | 92 ---
.../kstream/internals/KStreamWindowedTest.java | 91 ---
.../processor/DefaultPartitionGrouperTest.java | 84 ---
.../streams/processor/TopologyBuilderTest.java | 254 --------
.../internals/MinTimestampTrackerTest.java | 93 ---
.../processor/internals/PartitionGroupTest.java | 102 ----
.../internals/ProcessorStateManagerTest.java | 407 -------------
.../internals/ProcessorTopologyTest.java | 311 ----------
.../internals/PunctuationQueueTest.java | 85 ---
.../processor/internals/QuickUnionTest.java | 97 ---
.../processor/internals/RecordQueueTest.java | 116 ----
.../processor/internals/StreamTaskTest.java | 211 -------
.../processor/internals/StreamThreadTest.java | 464 ---------------
.../state/AbstractKeyValueStoreTest.java | 193 ------
.../state/InMemoryKeyValueStoreTest.java | 50 --
.../state/InMemoryLRUCacheStoreTest.java | 155 -----
.../streams/state/KeyValueStoreTestDriver.java | 464 ---------------
.../streams/state/RocksDBKeyValueStoreTest.java | 52 --
.../apache/kafka/streams/state/StateUtils.java | 77 ---
.../apache/kafka/test/KStreamTestDriver.java | 95 ---
.../apache/kafka/test/MockProcessorContext.java | 173 ------
.../kafka/test/MockProcessorSupplier.java | 59 --
.../org/apache/kafka/test/MockSourceNode.java | 46 --
.../kafka/test/MockStateStoreSupplier.java | 97 ---
.../kafka/test/MockTimestampExtractor.java | 30 -
.../kafka/test/ProcessorTopologyTestDriver.java | 338 -----------
.../apache/kafka/test/UnlimitedWindowDef.java | 104 ----
121 files changed, 2 insertions(+), 13604 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index b18a9cf..88d575b 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -57,11 +57,6 @@ do
CLASSPATH=$CLASSPATH:$file
done
-for file in $base_dir/stream/build/libs/kafka-streams*.jar;
-do
- CLASSPATH=$CLASSPATH:$file
-done
-
for file in $base_dir/tools/build/libs/kafka-tools*.jar;
do
CLASSPATH=$CLASSPATH:$file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 4f5e482..fc7dd79 100644
--- a/build.gradle
+++ b/build.gradle
@@ -229,7 +229,7 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
}
def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
-def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs
+def pkgs = ['clients', 'examples', 'log4j-appender', 'tools'] + connectPkgs
tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
@@ -543,63 +543,6 @@ project(':tools') {
test.dependsOn('checkstyleMain', 'checkstyleTest')
}
-project(':streams') {
- apply plugin: 'checkstyle'
- archivesBaseName = "kafka-streams"
-
- dependencies {
- compile project(':clients')
- compile "$slf4jlog4j"
- compile 'org.rocksdb:rocksdbjni:3.10.1'
-
- testCompile "$junit"
- testCompile project(path: ':clients', configuration: 'archives')
- }
-
- task testJar(type: Jar) {
- classifier = 'test'
- from sourceSets.test.output
- }
-
- test {
- testLogging {
- events "passed", "skipped", "failed"
- exceptionFormat = 'full'
- }
- }
-
- javadoc {
- include "**/org/apache/kafka/streams/*"
- }
-
- tasks.create(name: "copyDependantLibs", type: Copy) {
- from (configurations.testRuntime) {
- include('slf4j-log4j12*')
- }
- from (configurations.runtime) {
- exclude('kafka-clients*')
- }
- into "$buildDir/dependant-libs-${scalaVersion}"
- }
-
- jar {
- dependsOn 'copyDependantLibs'
- }
-
- artifacts {
- archives testJar
- }
-
- configurations {
- archives.extendsFrom (testCompile)
- }
-
- checkstyle {
- configFile = new File(rootDir, "checkstyle/checkstyle.xml")
- }
- test.dependsOn('checkstyleMain', 'checkstyleTest')
-}
-
project(':log4j-appender') {
apply plugin: 'checkstyle'
archivesBaseName = "kafka-log4j-appender"
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 95ea3b7..f3a1c38 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -115,20 +115,6 @@
<allow pkg="org.apache.log4j" />
</subpackage>
- <subpackage name="streams">
- <allow pkg="org.apache.kafka.common"/>
- <allow pkg="org.apache.kafka.test"/>
- <allow pkg="org.apache.kafka.clients"/>
- <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/>
- <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/>
-
- <allow pkg="org.apache.kafka.streams"/>
-
- <subpackage name="state">
- <allow pkg="org.rocksdb" />
- </subpackage>
- </subpackage>
-
<subpackage name="log4jappender">
<allow pkg="org.apache.log4j" />
<allow pkg="org.apache.kafka.clients" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 3d69fac..df602ef 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
// limitations under the License.
apply from: file('scala.gradle')
-include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender',
+include 'core', 'examples', 'clients', 'tools', 'log4j-appender',
'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
deleted file mode 100644
index d274fb9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.JmxReporter;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
- * sends output to zero or more output topics.
- * <p>
- * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify
- * the transformation.
- * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and
- * start one or more of these processors to process the Kafka partitions assigned to this particular instance.
- * <p>
- * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes
- * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being
- * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or
- * started in the appropriate processes to balance processing load.
- * <p>
- * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer}
- * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output.
- * <p>
- * A simple example might look like this:
- * <pre>
- * Map<String, Object> props = new HashMap<>();
- * props.put("bootstrap.servers", "localhost:4242");
- * props.put("key.deserializer", StringDeserializer.class);
- * props.put("value.deserializer", StringDeserializer.class);
- * props.put("key.serializer", StringSerializer.class);
- * props.put("value.serializer", IntegerSerializer.class);
- * props.put("timestamp.extractor", MyTimestampExtractor.class);
- * StreamingConfig config = new StreamingConfig(props);
- *
- * KStreamBuilder builder = new KStreamBuilder();
- * builder.from("topic1").mapValue(value -> value.length()).to("topic2");
- *
- * KafkaStreaming streaming = new KafkaStreaming(builder, config);
- * streaming.start();
- * </pre>
- *
- */
-public class KafkaStreaming {
-
- private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
- private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
- private static final String JMX_PREFIX = "kafka.streaming";
-
- private final Time time;
-
- // container states
- private static final int CREATED = 0;
- private static final int RUNNING = 1;
- private static final int STOPPED = 2;
- private int state = CREATED;
-
- private final StreamThread[] threads;
-
- private String clientId;
- private final Metrics metrics;
-
- public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
- // create the metrics
- this.time = new SystemTime();
-
- MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
- .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
- TimeUnit.MILLISECONDS);
- clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
- if (clientId.length() <= 0)
- clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
- List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
- MetricsReporter.class);
- reporters.add(new JmxReporter(JMX_PREFIX));
- this.metrics = new Metrics(metricConfig, reporters, time);
-
- this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
- for (int i = 0; i < this.threads.length; i++) {
- this.threads[i] = new StreamThread(builder, config, this.clientId, this.metrics, this.time);
- }
- }
-
- /**
- * Start the stream process by starting all its threads
- */
- public synchronized void start() {
- log.debug("Starting Kafka Stream process");
-
- if (state == CREATED) {
- for (StreamThread thread : threads)
- thread.start();
-
- state = RUNNING;
-
- log.info("Started Kafka Stream process");
- } else {
- throw new IllegalStateException("This process was already started.");
- }
- }
-
- /**
- * Shutdown this stream process by signaling the threads to stop,
- * wait for them to join and clean up the process instance.
- */
- public synchronized void close() {
- log.debug("Stopping Kafka Stream process");
-
- if (state == RUNNING) {
- // signal the threads to stop and wait
- for (StreamThread thread : threads)
- thread.close();
-
- for (StreamThread thread : threads) {
- try {
- thread.join();
- } catch (InterruptedException ex) {
- Thread.interrupted();
- }
- }
-
- state = STOPPED;
-
- log.info("Stopped Kafka Stream process");
- } else {
- throw new IllegalStateException("This process has not started yet.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
deleted file mode 100644
index 88bd844..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor;
-
-import java.util.Map;
-
-import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
-
-public class StreamingConfig extends AbstractConfig {
-
- private static final ConfigDef CONFIG;
-
- /** <code>state.dir</code> */
- public static final String STATE_DIR_CONFIG = "state.dir";
- private static final String STATE_DIR_DOC = "Directory location for state store.";
-
- /** <code>commit.interval.ms</code> */
- public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms";
- private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor.";
-
- /** <code>poll.ms</code> */
- public static final String POLL_MS_CONFIG = "poll.ms";
- private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input.";
-
- /** <code>num.stream.threads</code> */
- public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads";
- private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing.";
-
- /** <code>buffered.records.per.partition</code> */
- public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition";
- private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition.";
-
- /** <code>state.cleanup.delay</code> */
- public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms";
- private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated.";
-
- /** <code>total.records.to.process</code> */
- public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process";
- private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records.";
-
- /** <code>window.time.ms</code> */
- public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms";
- private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called "
- + "with this frequency even if there is no message.";
-
- /** <code>timestamp.extractor</code> */
- public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
- private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface.";
-
- /** <code>partition.grouper</code> */
- public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
- private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface.";
-
- /** <code>client.id</code> */
- public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
- /** <code>key.serializer</code> */
- public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
-
- /** <code>value.serializer</code> */
- public static final String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
-
- /** <code>key.deserializer</code> */
- public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>value.deserializer</code> */
- public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
- /** <code>metrics.sample.window.ms</code> */
- public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
- /** <code>metrics.num.samples</code> */
- public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
- /** <code>metric.reporters</code> */
- public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
- /**
- * <code>bootstrap.servers</code>
- */
- public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
- private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir");
-
- static {
- CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG,
- Type.STRING,
- "",
- Importance.MEDIUM,
- CommonClientConfigs.CLIENT_ID_DOC)
- .define(STATE_DIR_CONFIG,
- Type.STRING,
- SYSTEM_TEMP_DIRECTORY,
- Importance.MEDIUM,
- STATE_DIR_DOC)
- .define(COMMIT_INTERVAL_MS_CONFIG,
- Type.LONG,
- 30000,
- Importance.HIGH,
- COMMIT_INTERVAL_MS_DOC)
- .define(POLL_MS_CONFIG,
- Type.LONG,
- 100,
- Importance.LOW,
- POLL_MS_DOC)
- .define(NUM_STREAM_THREADS_CONFIG,
- Type.INT,
- 1,
- Importance.LOW,
- NUM_STREAM_THREADS_DOC)
- .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
- Type.INT,
- 1000,
- Importance.LOW,
- BUFFERED_RECORDS_PER_PARTITION_DOC)
- .define(STATE_CLEANUP_DELAY_MS_CONFIG,
- Type.LONG,
- 60000,
- Importance.LOW,
- STATE_CLEANUP_DELAY_MS_DOC)
- .define(TOTAL_RECORDS_TO_PROCESS,
- Type.LONG,
- -1L,
- Importance.LOW,
- TOTAL_RECORDS_TO_DOC)
- .define(WINDOW_TIME_MS_CONFIG,
- Type.LONG,
- -1L,
- Importance.MEDIUM,
- WINDOW_TIME_MS_DOC)
- .define(KEY_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.KEY_SERIALIZER_CLASS_DOC)
- .define(VALUE_SERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ProducerConfig.VALUE_SERIALIZER_CLASS_DOC)
- .define(KEY_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC)
- .define(VALUE_DESERIALIZER_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
- .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
- Type.CLASS,
- Importance.HIGH,
- TIMESTAMP_EXTRACTOR_CLASS_DOC)
- .define(PARTITION_GROUPER_CLASS_CONFIG,
- Type.CLASS,
- DefaultPartitionGrouper.class,
- Importance.HIGH,
- PARTITION_GROUPER_CLASS_DOC)
- .define(BOOTSTRAP_SERVERS_CONFIG,
- Type.STRING,
- Importance.HIGH,
- CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG,
- Type.LIST,
- "",
- Importance.LOW,
- CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
- .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
- Type.LONG,
- 30000,
- atLeast(0),
- Importance.LOW,
- CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
- .define(METRICS_NUM_SAMPLES_CONFIG,
- Type.INT,
- 2,
- atLeast(1),
- Importance.LOW,
- CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
- }
-
- public static class InternalConfig {
- public static final String PARTITION_GROUPER_INSTANCE = "__partition.grouper.instance__";
- }
-
- public StreamingConfig(Map<?, ?> props) {
- super(CONFIG, props);
- }
-
- public Map<String, Object> getConsumerConfigs(PartitionGrouper partitionGrouper) {
- Map<String, Object> props = getConsumerConfigs();
- props.put(StreamingConfig.InternalConfig.PARTITION_GROUPER_INSTANCE, partitionGrouper);
- props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
- return props;
- }
-
- public Map<String, Object> getConsumerConfigs() {
- Map<String, Object> props = this.originals();
-
- // set consumer default property values
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- // remove properties that are not required for consumers
- props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
- return props;
- }
-
- public Map<String, Object> getProducerConfigs() {
- Map<String, Object> props = this.originals();
-
- // set producer default property values
- props.put(ProducerConfig.LINGER_MS_CONFIG, "100");
-
- // remove properties that are not required for producers
- props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG);
-
- return props;
- }
-
- public Serializer keySerializer() {
- return getConfiguredInstance(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Serializer valueSerializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
- }
-
- public Deserializer keyDeserializer() {
- return getConfiguredInstance(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public Deserializer valueDeserializer() {
- return getConfiguredInstance(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
- }
-
- public static void main(String[] args) {
- System.out.println(CONFIG.toHtmlTable());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
deleted file mode 100644
index ebf80b3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.metrics.Sensor;
-
-public interface StreamingMetrics {
-
- Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
-
- void recordLatency(Sensor sensor, long startNs, long endNs);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
deleted file mode 100644
index 87368c1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-
-import java.util.Properties;
-
-public class KStreamJob {
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
-
- KStreamBuilder builder = new KStreamBuilder();
-
- KStream<String, String> stream1 = builder.from("topic1");
-
- KStream<String, Integer> stream2 =
- stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
- @Override
- public KeyValue<String, Integer> apply(String key, String value) {
- return new KeyValue<>(key, new Integer(value));
- }
- }).filter(new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
- }
- });
-
- KStream<String, Integer>[] streams = stream2.branch(
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return (value % 2) == 0;
- }
- },
- new Predicate<String, Integer>() {
- @Override
- public boolean test(String key, Integer value) {
- return true;
- }
- }
- );
-
- streams[0].to("topic2");
- streams[1].to("topic3");
-
- KafkaStreaming kstream = new KafkaStreaming(builder, config);
- kstream.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
deleted file mode 100644
index 3274aae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreaming;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.apache.kafka.streams.state.Entry;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-
-import java.util.Properties;
-
-public class ProcessorJob {
-
- private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
-
- @Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
- private ProcessorContext context;
- private KeyValueStore<String, Integer> kvStore;
-
- @Override
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
- this.context = context;
- this.context.schedule(1000);
- this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("local-state");
- }
-
- @Override
- public void process(String key, String value) {
- Integer oldValue = this.kvStore.get(key);
- Integer newValue = Integer.parseInt(value);
- if (oldValue == null) {
- this.kvStore.put(key, newValue);
- } else {
- this.kvStore.put(key, oldValue + newValue);
- }
-
- context.commit();
- }
-
- @Override
- public void punctuate(long timestamp) {
- KeyValueIterator<String, Integer> iter = this.kvStore.all();
-
- while (iter.hasNext()) {
- Entry<String, Integer> entry = iter.next();
-
- System.out.println("[" + entry.key() + ", " + entry.value() + "]");
-
- context.forward(entry.key(), entry.value());
- }
-
- iter.close();
- }
-
- @Override
- public void close() {
- this.kvStore.close();
- }
- };
- }
- }
-
- public static void main(String[] args) throws Exception {
- Properties props = new Properties();
- props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
- props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
- StreamingConfig config = new StreamingConfig(props);
-
- TopologyBuilder builder = new TopologyBuilder();
-
- builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
-
- builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE");
- builder.addStateStore(Stores.create("local-state", config).withStringKeys().withIntegerValues().inMemory().build());
- builder.connectProcessorAndStateStores("local-state", "PROCESS");
-
- builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");
-
- KafkaStreaming streaming = new KafkaStreaming(builder, config);
- streaming.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
deleted file mode 100644
index 26281d6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.examples;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-
-public class WallclockTimestampExtractor implements TimestampExtractor {
- @Override
- public long extract(ConsumerRecord<Object, Object> record) {
- return System.currentTimeMillis();
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
deleted file mode 100644
index 8f0794c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
-
-/**
- * KStream is an abstraction of a stream of key-value pairs.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- */
-public interface KStream<K, V> {
-
- /**
- * Creates a new stream consists of all elements of this stream which satisfy a predicate
- *
- * @param predicate the instance of Predicate
- * @return the stream with only those elements that satisfy the predicate
- */
- KStream<K, V> filter(Predicate<K, V> predicate);
-
- /**
- * Creates a new stream consists all elements of this stream which do not satisfy a predicate
- *
- * @param predicate the instance of Predicate
- * @return the stream with only those elements that do not satisfy the predicate
- */
- KStream<K, V> filterOut(Predicate<K, V> predicate);
-
- /**
- * Creates a new stream by applying transforming each element in this stream into a different element in the new stream.
- *
- * @param mapper the instance of KeyValueMapper
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return the mapped stream
- */
- <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
-
- /**
- * Creates a new stream by applying transforming each value in this stream into a different value in the new stream.
- *
- * @param mapper the instance of ValueMapper
- * @param <V1> the value type of the new stream
- * @return the mapped stream
- */
- <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
-
- /**
- * Creates a new stream by applying transforming each element in this stream into zero or more elements in the new stream.
- *
- * @param mapper the instance of KeyValueMapper
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return the mapped stream
- */
- <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
-
- /**
- * Creates a new stream by applying transforming each value in this stream into zero or more values in the new stream.
- *
- * @param processor the instance of Processor
- * @param <V1> the value type of the new stream
- * @return the mapped stream
- */
- <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
-
- /**
- * Creates a new windowed stream using a specified window instance.
- *
- * @param windowDef the instance of Window
- * @return the windowed stream
- */
- KStreamWindowed<K, V> with(WindowSupplier<K, V> windowDef);
-
- /**
- * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
- * supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
- * a corresponding stream for the first predicate is evaluated true.
- * An element will be dropped if none of the predicates evaluate true.
- *
- * @param predicates the ordered list of Predicate instances
- * @return the new streams that each contain those elements for which their Predicate evaluated to true.
- */
- KStream<K, V>[] branch(Predicate<K, V>... predicates);
-
- /**
- * Sends key-value to a topic, also creates a new stream from the topic.
- * This is equivalent to calling to(topic) and from(topic).
- *
- * @param topic the topic name
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return the new stream that consumes the given topic
- */
- <K1, V1> KStream<K1, V1> through(String topic);
-
- /**
- * Sends key-value to a topic, also creates a new stream from the topic.
- * This is equivalent to calling to(topic) and from(topic).
- *
- * @param topic the topic name
- * @param keySerializer key serializer used to send key-value pairs,
- * if not specified the default key serializer defined in the configuration will be used
- * @param valSerializer value serializer used to send key-value pairs,
- * if not specified the default value serializer defined in the configuration will be used
- * @param keyDeserializer key deserializer used to create the new KStream,
- * if not specified the default key deserializer defined in the configuration will be used
- * @param valDeserializer value deserializer used to create the new KStream,
- * if not specified the default value deserializer defined in the configuration will be used
- * @param <K1> the key type of the new stream
- * @param <V1> the value type of the new stream
- * @return the new stream that consumes the given topic
- */
- <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer);
-
- /**
- * Sends key-value to a topic using default serializers specified in the config.
- *
- * @param topic the topic name
- */
- void to(String topic);
-
- /**
- * Sends key-value to a topic.
- *
- * @param topic the topic name
- * @param keySerializer key serializer used to send key-value pairs,
- * if not specified the default serializer defined in the configs will be used
- * @param valSerializer value serializer used to send key-value pairs,
- * if not specified the default serializer defined in the configs will be used
- */
- void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer);
-
- /**
- * Applies a stateful transformation to all elements in this stream.
- *
- * @param transformerSupplier the class of TransformerDef
- * @param stateStoreNames the names of the state store used by the processor
- * @return KStream
- */
- <K1, V1> KStream<K1, V1> transform(TransformerSupplier<K, V, KeyValue<K1, V1>> transformerSupplier, String... stateStoreNames);
-
- /**
- * Applies a stateful transformation to all values in this stream.
- *
- * @param valueTransformerSupplier the class of TransformerDef
- * @param stateStoreNames the names of the state store used by the processor
- * @return KStream
- */
- <R> KStream<K, R> transformValues(ValueTransformerSupplier<V, R> valueTransformerSupplier, String... stateStoreNames);
-
- /**
- * Processes all elements in this stream by applying a processor.
- *
- * @param processorSupplier the supplier of the Processor to use
- * @param stateStoreNames the names of the state store used by the processor
- * @return the new stream containing the processed output
- */
- void process(ProcessorSupplier<K, V> processorSupplier, String... stateStoreNames);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
deleted file mode 100644
index c8a8bd3..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-
-import java.util.Collections;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * KStreamBuilder is the class to create KStream instances.
- */
-public class KStreamBuilder extends TopologyBuilder {
-
- private final AtomicInteger index = new AtomicInteger(0);
-
- public KStreamBuilder() {
- super();
- }
-
- /**
- * Creates a KStream instance for the specified topic.
- * The default deserializers specified in the config are used.
- *
- * @param topics the topic names, if empty default to all the topics in the config
- * @return KStream
- */
- public <K, V> KStream<K, V> from(String... topics) {
- String name = newName(KStreamImpl.SOURCE_NAME);
-
- addSource(name, topics);
-
- return new KStreamImpl<>(this, name, Collections.singleton(name));
- }
-
- /**
- * Creates a KStream instance for the specified topic.
- *
- * @param keyDeserializer key deserializer used to read this source KStream,
- * if not specified the default deserializer defined in the configs will be used
- * @param valDeserializer value deserializer used to read this source KStream,
- * if not specified the default deserializer defined in the configs will be used
- * @param topics the topic names, if empty default to all the topics in the config
- * @return KStream
- */
- public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) {
- String name = newName(KStreamImpl.SOURCE_NAME);
-
- addSource(name, keyDeserializer, valDeserializer, topics);
-
- return new KStreamImpl<>(this, name, Collections.singleton(name));
- }
-
- public String newName(String prefix) {
- return prefix + String.format("%010d", index.getAndIncrement());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
deleted file mode 100644
index 4d73128..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * KStreamWindowed is an abstraction of a stream of key-value pairs with a window.
- */
-public interface KStreamWindowed<K, V> extends KStream<K, V> {
-
- /**
- * Creates a new stream by joining this windowed stream with the other windowed stream.
- * Each element arrived from either of the streams is joined with elements in a window of each other.
- * The resulting values are computed by applying a joiner.
- *
- * @param other the other windowed stream
- * @param joiner ValueJoiner
- * @param <V1> the value type of the other stream
- * @param <V2> the value type of the new stream
- * @return KStream
- */
- <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
deleted file mode 100644
index f633f6e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public class KeyValue<K, V> {
-
- public final K key;
- public final V value;
-
- public KeyValue(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public static <K, V> KeyValue<K, V> pair(K key, V value) {
- return new KeyValue<>(key, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
deleted file mode 100644
index 62b07f6..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface KeyValueMapper<K, V, R> {
-
- R apply(K key, V value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
deleted file mode 100644
index c73622e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-/**
- * Represents a predicate (boolean-valued function) of two arguments.
- *
- * @param <K> the type of key
- * @param <V> the type of value
- */
-public interface Predicate<K, V> {
-
- boolean test(K key, V value);
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
deleted file mode 100644
index 0cf969f..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowSupplier.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.kstream.internals.WindowSupport;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-
-public class SlidingWindowSupplier<K, V> implements WindowSupplier<K, V> {
- private final String name;
- private final long duration;
- private final int maxCount;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
- private final Deserializer<K> keyDeserializer;
- private final Deserializer<V> valueDeserializer;
-
- public SlidingWindowSupplier(
- String name,
- long duration,
- int maxCount,
- Serializer<K> keySerializer,
- Serializer<V> valueSerializer,
- Deserializer<K> keyDeseriaizer,
- Deserializer<V> valueDeserializer) {
- this.name = name;
- this.duration = duration;
- this.maxCount = maxCount;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
- this.keyDeserializer = keyDeseriaizer;
- this.valueDeserializer = valueDeserializer;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public Window<K, V> get() {
- return new SlidingWindow();
- }
-
- public class SlidingWindow extends WindowSupport implements Window<K, V> {
- private final Object lock = new Object();
- private ProcessorContext context;
- private int partition;
- private int slotNum; // used as a key for Kafka log compaction
- private LinkedList<K> list = new LinkedList<K>();
- private HashMap<K, ValueList<V>> map = new HashMap<>();
-
- @Override
- public void init(ProcessorContext context) {
- this.context = context;
- this.partition = context.id().partition;
- SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
- context.register(this, restoreFunc);
-
- for (ValueList<V> valueList : map.values()) {
- valueList.clearDirtyValues();
- }
- this.slotNum = restoreFunc.slotNum;
- }
-
- @Override
- public Iterator<V> findAfter(K key, final long timestamp) {
- return find(key, timestamp, timestamp + duration);
- }
-
- @Override
- public Iterator<V> findBefore(K key, final long timestamp) {
- return find(key, timestamp - duration, timestamp);
- }
-
- @Override
- public Iterator<V> find(K key, final long timestamp) {
- return find(key, timestamp - duration, timestamp + duration);
- }
-
- /*
- * finds items in the window between startTime and endTime (both inclusive)
- */
- private Iterator<V> find(K key, final long startTime, final long endTime) {
- final ValueList<V> values = map.get(key);
-
- if (values == null) {
- return Collections.emptyIterator();
- } else {
- return new FilteredIterator<V, Value<V>>(values.iterator()) {
- @Override
- protected V filter(Value<V> item) {
- if (startTime <= item.timestamp && item.timestamp <= endTime)
- return item.value;
- else
- return null;
- }
- };
- }
- }
-
- @Override
- public void put(K key, V value, long timestamp) {
- synchronized (lock) {
- slotNum++;
-
- list.offerLast(key);
-
- ValueList<V> values = map.get(key);
- if (values == null) {
- values = new ValueList<>();
- map.put(key, values);
- }
-
- values.add(slotNum, value, timestamp);
- }
- evictExcess();
- evictExpired(timestamp - duration);
- }
-
- private void evictExcess() {
- while (list.size() > maxCount) {
- K oldestKey = list.pollFirst();
-
- ValueList<V> values = map.get(oldestKey);
- values.removeFirst();
-
- if (values.isEmpty()) map.remove(oldestKey);
- }
- }
-
- private void evictExpired(long cutoffTime) {
- while (true) {
- K oldestKey = list.peekFirst();
-
- ValueList<V> values = map.get(oldestKey);
- Stamped<V> oldestValue = values.first();
-
- if (oldestValue.timestamp < cutoffTime) {
- list.pollFirst();
- values.removeFirst();
-
- if (values.isEmpty()) map.remove(oldestKey);
- } else {
- break;
- }
- }
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public void flush() {
- IntegerSerializer intSerializer = new IntegerSerializer();
- ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
-
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-
- for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) {
- ValueList<V> values = entry.getValue();
- if (values.hasDirtyValues()) {
- K key = entry.getKey();
-
- byte[] keyBytes = keySerializer.serialize(name, key);
-
- Iterator<Value<V>> iterator = values.dirtyValueIterator();
- while (iterator.hasNext()) {
- Value<V> dirtyValue = iterator.next();
- byte[] slot = intSerializer.serialize("", dirtyValue.slotNum);
- byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value);
-
- byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length];
-
- int offset = 0;
- offset += putLong(combined, offset, dirtyValue.timestamp);
- offset += puts(combined, offset, keyBytes);
- offset += puts(combined, offset, valBytes);
-
- if (offset != combined.length)
- throw new IllegalStateException("serialized length does not match");
-
- collector.send(new ProducerRecord<>(name, partition, slot, combined), byteArraySerializer, byteArraySerializer);
- }
- values.clearDirtyValues();
- }
- }
- }
-
- @Override
- public void close() {
- // TODO
- }
-
- @Override
- public boolean persistent() {
- // TODO: should not be persistent, right?
- return false;
- }
-
- private class SlidingWindowRegistryCallback implements StateRestoreCallback {
-
- final IntegerDeserializer intDeserializer;
- int slotNum = 0;
-
- SlidingWindowRegistryCallback() {
- intDeserializer = new IntegerDeserializer();
- }
-
- @Override
- public void restore(byte[] slot, byte[] bytes) {
-
- slotNum = intDeserializer.deserialize("", slot);
-
- int offset = 0;
- // timestamp
- long timestamp = getLong(bytes, offset);
- offset += 8;
- // key
- int length = getInt(bytes, offset);
- offset += 4;
- K key = deserialize(bytes, offset, length, name, keyDeserializer);
- offset += length;
- // value
- length = getInt(bytes, offset);
- offset += 4;
- V value = deserialize(bytes, offset, length, name, valueDeserializer);
-
- put(key, value, timestamp);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
deleted file mode 100644
index b67f619..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public interface Transformer<K, V, R> {
-
- /**
- * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
- * that contains it is initialized.
- * <p>
- * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
- * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
- *
- * @param context the context; may not be null
- */
- void init(ProcessorContext context);
-
- /**
- * Transform the message with the given key and value.
- *
- * @param key the key for the message
- * @param value the value for the message
- * @return new value
- */
- R transform(K key, V value);
-
- /**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- *
- * @param timestamp the stream time when this method is being called
- */
- void punctuate(long timestamp);
-
- /**
- * Close this processor and clean up any resources.
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
deleted file mode 100644
index 2c2d8dd..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/TransformerSupplier.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface TransformerSupplier<K, V, R> {
-
- Transformer<K, V, R> get();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
deleted file mode 100644
index 93fc359..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueJoiner<V1, V2, R> {
-
- R apply(V1 value1, V2 value2);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
deleted file mode 100644
index a32423d..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueMapper<V1, V2> {
-
- V2 apply(V1 value);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
deleted file mode 100644
index 5b9e2ff..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-public interface ValueTransformer<V, R> {
-
- /**
- * Initialize this transformer with the given context. The framework ensures this is called once per processor when the topology
- * that contains it is initialized.
- * <p>
- * If this tranformer is to be {@link #punctuate(long) called periodically} by the framework, then this method should
- * {@link ProcessorContext#schedule(long) schedule itself} with the provided context.
- *
- * @param context the context; may not be null
- */
- void init(ProcessorContext context);
-
- /**
- * Transform the message with the given key and value.
- *
- * @param value the value for the message
- * @return new value
- */
- R transform(V value);
-
- /**
- * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context
- * during {@link #init(ProcessorContext) initialization}.
- *
- * @param timestamp the stream time when this method is being called
- */
- void punctuate(long timestamp);
-
- /**
- * Close this processor and clean up any resources.
- */
- void close();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
deleted file mode 100644
index 5c053c7..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream;
-
-public interface ValueTransformerSupplier<V, R> {
-
- ValueTransformer<V, R> get();
-
-}