Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 01:27:31 UTC
[5/8] kafka git commit: MINOR: remove Kafka Streams in 0.9.0
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
deleted file mode 100644
index febd938..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StampedRecord.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-public class StampedRecord extends Stamped<ConsumerRecord<Object, Object>> {
-
- public StampedRecord(ConsumerRecord<Object, Object> record, long timestamp) {
- super(record, timestamp);
- }
-
- public String topic() {
- return value.topic();
- }
-
- public int partition() {
- return value.partition();
- }
-
- public Object key() {
- return value.key();
- }
-
- public Object value() {
- return value.value();
- }
-
- public long offset() {
- return value.offset();
- }
-
- @Override
- public String toString() {
- return value.toString() + ", timestamp = " + timestamp;
- }
-}
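
For context, StampedRecord is just the Stamped wrapper specialized to a consumer record: it pairs the record with the timestamp assigned by the configured TimestampExtractor and delegates topic/partition/key/value/offset to the wrapped record. A minimal sketch of the same wrapper pattern, using an illustrative String payload instead of a ConsumerRecord (these types are stand-ins, not the deleted Kafka classes):

    public class StampedSketch {
        static class Stamped<V> {
            final V value;        // the wrapped payload
            final long timestamp; // the timestamp assigned to it

            Stamped(V value, long timestamp) {
                this.value = value;
                this.timestamp = timestamp;
            }
        }

        public static void main(String[] args) {
            Stamped<String> rec = new Stamped<>("payload", 1447115251000L);
            // accessors on a subclass would simply delegate to 'value'
            System.out.println(rec.value + ", timestamp = " + rec.timestamp);
        }
    }
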
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
deleted file mode 100644
index a9c14e5..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TimestampExtractor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
- */
-public class StreamTask implements Punctuator {
-
- private static final Logger log = LoggerFactory.getLogger(StreamTask.class);
-
- private final TaskId id;
- private final int maxBufferedSize;
-
- private final Consumer consumer;
- private final PartitionGroup partitionGroup;
- private final PartitionGroup.RecordInfo recordInfo = new PartitionGroup.RecordInfo();
- private final PunctuationQueue punctuationQueue;
- private final ProcessorContextImpl processorContext;
- private final ProcessorTopology topology;
-
- private final Map<TopicPartition, Long> consumedOffsets;
- private final RecordCollector recordCollector;
- private final ProcessorStateManager stateMgr;
-
- private boolean commitRequested = false;
- private boolean commitOffsetNeeded = false;
- private StampedRecord currRecord = null;
- private ProcessorNode currNode = null;
-
- private boolean requiresPoll = true;
-
- /**
- * Create a {@link StreamTask} with its assigned partitions
- *
- * @param id the ID of this task
- * @param consumer the instance of {@link Consumer}
- * @param producer the instance of {@link Producer}
- * @param restoreConsumer the instance of {@link Consumer} used when restoring state
- * @param partitions the collection of assigned {@link TopicPartition}
- * @param topology the instance of {@link ProcessorTopology}
- * @param config the {@link StreamingConfig} specified by the user
- * @param metrics the {@link StreamingMetrics} created by the thread
- */
- public StreamTask(TaskId id,
- Consumer<byte[], byte[]> consumer,
- Producer<byte[], byte[]> producer,
- Consumer<byte[], byte[]> restoreConsumer,
- Collection<TopicPartition> partitions,
- ProcessorTopology topology,
- StreamingConfig config,
- StreamingMetrics metrics) {
-
- this.id = id;
- this.consumer = consumer;
- this.punctuationQueue = new PunctuationQueue();
- this.maxBufferedSize = config.getInt(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
- this.topology = topology;
-
- // create queues for each assigned partition and associate them
- // to corresponding source nodes in the processor topology
- Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
-
- for (TopicPartition partition : partitions) {
- SourceNode source = topology.source(partition.topic());
- RecordQueue queue = createRecordQueue(partition, source);
- partitionQueues.put(partition, queue);
- }
-
- TimestampExtractor timestampExtractor = config.getConfiguredInstance(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
- this.partitionGroup = new PartitionGroup(partitionQueues, timestampExtractor);
-
- // initialize the consumed and produced offset cache
- this.consumedOffsets = new HashMap<>();
-
- // create the record collector that maintains the produced offsets
- this.recordCollector = new RecordCollector(producer);
-
- log.info("Creating restoration consumer client for stream task #" + id());
-
- // create the processor state manager
- try {
- File stateFile = new File(config.getString(StreamingConfig.STATE_DIR_CONFIG), id.toString());
- this.stateMgr = new ProcessorStateManager(id.partition, stateFile, restoreConsumer);
- } catch (IOException e) {
- throw new KafkaException("Error while creating the state manager", e);
- }
-
- // initialize the topology with its own context
- this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
-
- // initialize the state stores
- for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
- StateStore store = stateStoreSupplier.get();
- store.init(this.processorContext);
- }
-
- // initialize the task by initializing all its processor nodes in the topology
- for (ProcessorNode node : this.topology.processors()) {
- this.currNode = node;
- try {
- node.init(this.processorContext);
- } finally {
- this.currNode = null;
- }
- }
-
- this.processorContext.initialized();
- }
-
- public TaskId id() {
- return id;
- }
-
- public Set<TopicPartition> partitions() {
- return this.partitionGroup.partitions();
- }
-
- /**
- * Adds records to queues
- *
- * @param partition the partition
- * @param records the records
- */
- @SuppressWarnings("unchecked")
- public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
- int queueSize = partitionGroup.addRawRecords(partition, records);
-
- // if adding these records pushes the partition queue's buffered size beyond
- // the threshold, pause consumption on this partition
- if (queueSize > this.maxBufferedSize) {
- consumer.pause(partition);
- }
- }
-
- /**
- * Process one record
- *
- * @return number of records left in the buffer of this task's partition group after the processing is done
- */
- @SuppressWarnings("unchecked")
- public int process() {
- synchronized (this) {
- // get the next record to process
- StampedRecord record = partitionGroup.nextRecord(recordInfo);
-
- // if there is no record to process, return immediately
- if (record == null) {
- requiresPoll = true;
- return 0;
- }
-
- requiresPoll = false;
-
- try {
- // process the record by passing to the source node of the topology
- this.currRecord = record;
- this.currNode = recordInfo.node();
- TopicPartition partition = recordInfo.partition();
-
- log.debug("Start processing one record [" + currRecord + "]");
-
- this.currNode.process(currRecord.key(), currRecord.value());
-
- log.debug("Completed processing one record [" + currRecord + "]");
-
- // update the consumed offset map after processing is done
- consumedOffsets.put(partition, currRecord.offset());
- commitOffsetNeeded = true;
-
- // after processing this record, if its partition queue's buffered size has been
- // decreased to the threshold, we can then resume the consumption on this partition
- if (partitionGroup.numBuffered(partition) == this.maxBufferedSize) {
- consumer.resume(partition);
- requiresPoll = true;
- }
-
- if (partitionGroup.topQueueSize() <= this.maxBufferedSize) {
- requiresPoll = true;
- }
- } finally {
- this.currRecord = null;
- this.currNode = null;
- }
-
- return partitionGroup.numBuffered();
- }
- }
-
- public boolean requiresPoll() {
- return requiresPoll;
- }
-
- /**
- * Possibly trigger registered punctuation functions if
- * the current time has reached the defined stamp
- *
- * @param timestamp the current time
- * @return whether any punctuation functions were triggered
- */
- public boolean maybePunctuate(long timestamp) {
- return punctuationQueue.mayPunctuate(timestamp, this);
- }
-
- @Override
- public void punctuate(ProcessorNode node, long timestamp) {
- if (currNode != null)
- throw new IllegalStateException("Current node is not null");
-
- currNode = node;
- try {
- node.processor().punctuate(timestamp);
- } finally {
- currNode = null;
- }
- }
-
- public StampedRecord record() {
- return this.currRecord;
- }
-
- public ProcessorNode node() {
- return this.currNode;
- }
-
- public ProcessorTopology topology() {
- return this.topology;
- }
-
- /**
- * Commit the current task state
- */
- public void commit() {
- // 1) flush local state
- stateMgr.flush();
-
- // 2) flush produced records in the downstream and change logs of local states
- recordCollector.flush();
-
- // 3) commit consumed offsets if they are dirty
- if (commitOffsetNeeded) {
- Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());
- for (Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
- consumedOffsetsAndMetadata.put(entry.getKey(), new OffsetAndMetadata(entry.getValue() + 1L));
- }
- consumer.commitSync(consumedOffsetsAndMetadata);
- commitOffsetNeeded = false;
- }
-
- commitRequested = false;
- }
-
- /**
- * Whether or not a request has been made to commit the current state
- */
- public boolean commitNeeded() {
- return this.commitRequested;
- }
-
- /**
- * Request committing the current task's state
- */
- public void needCommit() {
- this.commitRequested = true;
- }
-
- /**
- * Schedules a punctuation for the processor
- *
- * @param interval the interval in milliseconds
- */
- public void schedule(long interval) {
- if (currNode == null)
- throw new IllegalStateException("Current node is null");
-
- punctuationQueue.schedule(new PunctuationSchedule(currNode, interval));
- }
-
- public void close() {
- this.partitionGroup.close();
- this.consumedOffsets.clear();
-
- // close the processors
- // make sure close() is called for each node even when there is a RuntimeException
- RuntimeException exception = null;
- for (ProcessorNode node : this.topology.processors()) {
- currNode = node;
- try {
- node.close();
- } catch (RuntimeException e) {
- exception = e;
- } finally {
- currNode = null;
- }
- }
-
- if (exception != null)
- throw exception;
-
- try {
- stateMgr.close(recordCollector.offsets());
- } catch (IOException e) {
- throw new KafkaException("Error while closing the state manager in processor context", e);
- }
- }
-
- private RecordQueue createRecordQueue(TopicPartition partition, SourceNode source) {
- return new RecordQueue(partition, source);
- }
-
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value) {
- ProcessorNode thisNode = currNode;
- for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
- currNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currNode = thisNode;
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- public <K, V> void forward(K key, V value, int childIndex) {
- ProcessorNode thisNode = currNode;
- ProcessorNode childNode = (ProcessorNode<K, V>) thisNode.children().get(childIndex);
- currNode = childNode;
- try {
- childNode.process(key, value);
- } finally {
- currNode = thisNode;
- }
- }
-
- public ProcessorContext context() {
- return processorContext;
- }
-
-}
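
The flow-control logic above is worth pulling out: addRecords() pauses a partition once its queue exceeds BUFFERED_RECORDS_PER_PARTITION_CONFIG, and process() resumes it once the queue drains back to that threshold. A self-contained sketch of that pause/resume backpressure (the queue and the paused flag below are stand-ins for the consumer client):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class BackpressureSketch {
        static final int MAX_BUFFERED = 3;           // stand-in for the config value
        static final Deque<String> queue = new ArrayDeque<>();
        static boolean paused = false;               // stand-in for consumer.pause()/resume()

        static void addRecord(String rec) {
            queue.addLast(rec);
            // pause consumption once the buffer grows past the threshold
            if (queue.size() > MAX_BUFFERED)
                paused = true;
        }

        static void processOne() {
            String rec = queue.pollFirst();
            if (rec == null)
                return;
            // ... process rec ...
            // resume once the buffer has drained back to the threshold
            if (queue.size() == MAX_BUFFERED)
                paused = false;
        }

        public static void main(String[] args) {
            for (int i = 0; i < 5; i++)
                addRecord("r" + i);
            System.out.println("paused = " + paused);  // true: 5 > 3
            processOne();
            processOne();
            System.out.println("paused = " + paused);  // false: drained back to 3
        }
    }
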
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
deleted file mode 100644
index ba81421..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ /dev/null
@@ -1,586 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamingConfig;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.PartitionGrouper;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.TopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.channels.FileLock;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class StreamThread extends Thread {
-
- private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
- private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
-
- private final AtomicBoolean running;
-
- protected final StreamingConfig config;
- protected final TopologyBuilder builder;
- protected final PartitionGrouper partitionGrouper;
- protected final Producer<byte[], byte[]> producer;
- protected final Consumer<byte[], byte[]> consumer;
- protected final Consumer<byte[], byte[]> restoreConsumer;
-
- private final Map<TaskId, StreamTask> tasks;
- private final String clientId;
- private final Time time;
- private final File stateDir;
- private final long pollTimeMs;
- private final long cleanTimeMs;
- private final long commitTimeMs;
- private final long totalRecordsToProcess;
- private final StreamingMetricsImpl sensors;
-
- private long lastClean;
- private long lastCommit;
- private long recordsProcessed;
-
- final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> assignment) {
- addPartitions(assignment);
- lastClean = time.milliseconds(); // start the cleaning cycle
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> assignment) {
- commitAll();
- removePartitions();
- lastClean = Long.MAX_VALUE; // stop the cleaning cycle until partitions are assigned
- }
- };
-
- public StreamThread(TopologyBuilder builder,
- StreamingConfig config,
- String clientId,
- Metrics metrics,
- Time time) throws Exception {
- this(builder, config, null , null, null, clientId, metrics, time);
- }
-
- StreamThread(TopologyBuilder builder,
- StreamingConfig config,
- Producer<byte[], byte[]> producer,
- Consumer<byte[], byte[]> consumer,
- Consumer<byte[], byte[]> restoreConsumer,
- String clientId,
- Metrics metrics,
- Time time) throws Exception {
- super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
-
- this.config = config;
- this.builder = builder;
- this.clientId = clientId;
- this.partitionGrouper = config.getConfiguredInstance(StreamingConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
- this.partitionGrouper.topicGroups(builder.topicGroups());
-
- // set the producer and consumer clients
- this.producer = (producer != null) ? producer : createProducer();
- this.consumer = (consumer != null) ? consumer : createConsumer();
- this.restoreConsumer = (restoreConsumer != null) ? restoreConsumer : createRestoreConsumer();
-
- // initialize the task list
- this.tasks = new HashMap<>();
-
- // read in task specific config values
- this.stateDir = new File(this.config.getString(StreamingConfig.STATE_DIR_CONFIG));
- this.stateDir.mkdir();
- this.pollTimeMs = config.getLong(StreamingConfig.POLL_MS_CONFIG);
- this.commitTimeMs = config.getLong(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG);
- this.cleanTimeMs = config.getLong(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
- this.totalRecordsToProcess = config.getLong(StreamingConfig.TOTAL_RECORDS_TO_PROCESS);
-
- this.lastClean = Long.MAX_VALUE; // the cleaning cycle won't start until partition assignment
- this.lastCommit = time.milliseconds();
- this.recordsProcessed = 0;
- this.time = time;
-
- this.sensors = new StreamingMetricsImpl(metrics);
-
- this.running = new AtomicBoolean(true);
- }
-
- private Producer<byte[], byte[]> createProducer() {
- log.info("Creating producer client for stream thread [" + this.getName() + "]");
- return new KafkaProducer<>(config.getProducerConfigs(),
- new ByteArraySerializer(),
- new ByteArraySerializer());
- }
-
- private Consumer<byte[], byte[]> createConsumer() {
- log.info("Creating consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(partitionGrouper),
- new ByteArrayDeserializer(),
- new ByteArrayDeserializer());
- }
-
- private Consumer<byte[], byte[]> createRestoreConsumer() {
- log.info("Creating restore consumer client for stream thread [" + this.getName() + "]");
- return new KafkaConsumer<>(config.getConsumerConfigs(),
- new ByteArrayDeserializer(),
- new ByteArrayDeserializer());
- }
-
- /**
- * Execute the stream processors
- */
- @Override
- public void run() {
- log.info("Starting stream thread [" + this.getName() + "]");
-
- try {
- runLoop();
- } catch (RuntimeException e) {
- log.error("Uncaught error during processing in thread [" + this.getName() + "]: ", e);
- throw e;
- } finally {
- shutdown();
- }
- }
-
- /**
- * Shutdown this streaming thread.
- */
- public void close() {
- running.set(false);
- }
-
- public Map<TaskId, StreamTask> tasks() {
- return Collections.unmodifiableMap(tasks);
- }
-
- private void shutdown() {
- log.info("Shutting down stream thread [" + this.getName() + "]");
-
- // Exceptions should not prevent this call from going through all shutdown steps.
- try {
- commitAll();
- } catch (Throwable e) {
- // already logged in commitAll()
- }
- try {
- producer.close();
- } catch (Throwable e) {
- log.error("Failed to close producer in thread [" + this.getName() + "]: ", e);
- }
- try {
- consumer.close();
- } catch (Throwable e) {
- log.error("Failed to close consumer in thread [" + this.getName() + "]: ", e);
- }
- try {
- restoreConsumer.close();
- } catch (Throwable e) {
- log.error("Failed to close restore consumer in thread [" + this.getName() + "]: ", e);
- }
- try {
- removePartitions();
- } catch (Throwable e) {
- // already logged in removePartitions()
- }
-
- log.info("Stream thread shutdown complete [" + this.getName() + "]");
- }
-
- private void runLoop() {
- try {
- int totalNumBuffered = 0;
- boolean requiresPoll = true;
-
- ensureCopartitioning(builder.copartitionGroups());
-
- consumer.subscribe(new ArrayList<>(builder.sourceTopics()), rebalanceListener);
-
- while (stillRunning()) {
- // try to fetch some records if necessary
- if (requiresPoll) {
- long startPoll = time.milliseconds();
-
- ConsumerRecords<byte[], byte[]> records = consumer.poll(totalNumBuffered == 0 ? this.pollTimeMs : 0);
-
- if (!records.isEmpty()) {
- for (StreamTask task : tasks.values()) {
- for (TopicPartition partition : task.partitions()) {
- task.addRecords(partition, records.records(partition));
- }
- }
- }
-
- long endPoll = time.milliseconds();
- sensors.pollTimeSensor.record(endPoll - startPoll);
- }
-
- totalNumBuffered = 0;
-
- if (!tasks.isEmpty()) {
- // try to process one record from each task
- requiresPoll = false;
-
- for (StreamTask task : tasks.values()) {
- long startProcess = time.milliseconds();
-
- totalNumBuffered += task.process();
- requiresPoll = requiresPoll || task.requiresPoll();
-
- sensors.processTimeSensor.record(time.milliseconds() - startProcess);
- }
-
- maybePunctuate();
- maybeCommit();
- } else {
- // even when no tasks are assigned, we must poll to get new task assignments.
- requiresPoll = true;
- }
-
- maybeClean();
- }
- } catch (Exception e) {
- throw new KafkaException(e);
- }
- }
-
- private boolean stillRunning() {
- if (!running.get()) {
- log.debug("Shutting down at user request.");
- return false;
- }
-
- if (totalRecordsToProcess >= 0 && recordsProcessed >= totalRecordsToProcess) {
- log.debug("Shutting down as we've reached the user configured limit of {} records to process.", totalRecordsToProcess);
- return false;
- }
-
- return true;
- }
-
- private void maybePunctuate() {
- for (StreamTask task : tasks.values()) {
- try {
- long now = time.milliseconds();
-
- if (task.maybePunctuate(now))
- sensors.punctuateTimeSensor.record(time.milliseconds() - now);
-
- } catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- }
- }
-
- protected void maybeCommit() {
- long now = time.milliseconds();
-
- if (commitTimeMs >= 0 && lastCommit + commitTimeMs < now) {
- log.trace("Committing processor instances because the commit interval has elapsed.");
-
- commitAll();
- lastCommit = now;
- } else {
- for (StreamTask task : tasks.values()) {
- try {
- if (task.commitNeeded())
- commitOne(task, time.milliseconds());
- } catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- }
- }
- }
-
- /**
- * Commit the states of all its tasks
- */
- private void commitAll() {
- for (StreamTask task : tasks.values()) {
- try {
- commitOne(task, time.milliseconds());
- } catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- }
- }
-
- /**
- * Commit the state of a task
- */
- private void commitOne(StreamTask task, long now) {
- try {
- task.commit();
- } catch (Exception e) {
- log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
-
- sensors.commitTimeSensor.record(time.milliseconds() - now);
- }
-
- /**
- * Clean up the state of any tasks that have been removed from this thread
- */
- protected void maybeClean() {
- long now = time.milliseconds();
-
- if (now > lastClean + cleanTimeMs) {
- File[] stateDirs = stateDir.listFiles();
- if (stateDirs != null) {
- for (File dir : stateDirs) {
- try {
- TaskId id = TaskId.parse(dir.getName());
-
- // try to acquire the exclusive lock on the state directory
- FileLock directoryLock = null;
- try {
- directoryLock = ProcessorStateManager.lockStateDirectory(dir);
- if (directoryLock != null) {
- log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs);
- Utils.delete(dir);
- }
- } catch (IOException e) {
- log.error("Failed to lock the state directory due to an unexpected exception", e);
- } finally {
- if (directoryLock != null) {
- try {
- directoryLock.release();
- } catch (IOException e) {
- log.error("Failed to release the state directory lock");
- }
- }
- }
- } catch (TaskId.TaskIdFormatException e) {
- // there may be some unknown files that sit in the same directory;
- // ignore these files instead of trying to delete them
- }
- }
- }
-
- lastClean = now;
- }
- }
-
- protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- sensors.taskCreationSensor.record();
-
- ProcessorTopology topology = builder.build(id.topicGroupId);
-
- return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, topology, config, sensors);
- }
-
- private void addPartitions(Collection<TopicPartition> assignment) {
-
- HashMap<TaskId, Set<TopicPartition>> partitionsForTask = new HashMap<>();
-
- for (TopicPartition partition : assignment) {
- Set<TaskId> taskIds = partitionGrouper.taskIds(partition);
- for (TaskId taskId : taskIds) {
- Set<TopicPartition> partitions = partitionsForTask.get(taskId);
- if (partitions == null) {
- partitions = new HashSet<>();
- partitionsForTask.put(taskId, partitions);
- }
- partitions.add(partition);
- }
- }
-
- // create the tasks
- for (TaskId taskId : partitionsForTask.keySet()) {
- try {
- tasks.put(taskId, createStreamTask(taskId, partitionsForTask.get(taskId)));
- } catch (Exception e) {
- log.error("Failed to create a task #" + taskId + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- }
-
- lastClean = time.milliseconds();
- }
-
- private void removePartitions() {
-
- // TODO: change this task-clearing behavior
- for (StreamTask task : tasks.values()) {
- log.info("Removing task {}", task.id());
- try {
- task.close();
- } catch (Exception e) {
- log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
- throw e;
- }
- sensors.taskDestructionSensor.record();
- }
- tasks.clear();
- }
-
- public PartitionGrouper partitionGrouper() {
- return partitionGrouper;
- }
-
- private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {
- for (Set<String> copartitionGroup : copartitionGroups) {
- ensureCopartitioning(copartitionGroup);
- }
- }
-
- private void ensureCopartitioning(Set<String> copartitionGroup) {
- int numPartitions = -1;
-
- for (String topic : copartitionGroup) {
- List<PartitionInfo> infos = consumer.partitionsFor(topic);
-
- if (infos == null)
- throw new KafkaException("topic not found: " + topic);
-
- if (numPartitions == -1) {
- numPartitions = infos.size();
- } else if (numPartitions != infos.size()) {
- String[] topics = copartitionGroup.toArray(new String[copartitionGroup.size()]);
- Arrays.sort(topics);
- throw new KafkaException("topics not copartitioned: [" + Utils.mkString(Arrays.asList(topics), ",") + "]");
- }
- }
- }
-
- private class StreamingMetricsImpl implements StreamingMetrics {
- final Metrics metrics;
- final String metricGrpName;
- final Map<String, String> metricTags;
-
- final Sensor commitTimeSensor;
- final Sensor pollTimeSensor;
- final Sensor processTimeSensor;
- final Sensor punctuateTimeSensor;
- final Sensor taskCreationSensor;
- final Sensor taskDestructionSensor;
-
- public StreamingMetricsImpl(Metrics metrics) {
-
- this.metrics = metrics;
- this.metricGrpName = "streaming-metrics";
- this.metricTags = new LinkedHashMap<>();
- this.metricTags.put("client-id", clientId + "-" + getName());
-
- this.commitTimeSensor = metrics.sensor("commit-time");
- this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
- this.commitTimeSensor.add(new MetricName("commit-time-max", metricGrpName, "The maximum commit time in ms", metricTags), new Max());
- this.commitTimeSensor.add(new MetricName("commit-calls-rate", metricGrpName, "The average per-second number of commit calls", metricTags), new Rate(new Count()));
-
- this.pollTimeSensor = metrics.sensor("poll-time");
- this.pollTimeSensor.add(new MetricName("poll-time-avg", metricGrpName, "The average poll time in ms", metricTags), new Avg());
- this.pollTimeSensor.add(new MetricName("poll-time-max", metricGrpName, "The maximum poll time in ms", metricTags), new Max());
- this.pollTimeSensor.add(new MetricName("poll-calls-rate", metricGrpName, "The average per-second number of record-poll calls", metricTags), new Rate(new Count()));
-
- this.processTimeSensor = metrics.sensor("process-time");
- this.processTimeSensor.add(new MetricName("process-time-avg-ms", metricGrpName, "The average process time in ms", metricTags), new Avg());
- this.processTimeSensor.add(new MetricName("process-time-max-ms", metricGrpName, "The maximum process time in ms", metricTags), new Max());
- this.processTimeSensor.add(new MetricName("process-calls-rate", metricGrpName, "The average per-second number of process calls", metricTags), new Rate(new Count()));
-
- this.punctuateTimeSensor = metrics.sensor("punctuate-time");
- this.punctuateTimeSensor.add(new MetricName("punctuate-time-avg", metricGrpName, "The average punctuate time in ms", metricTags), new Avg());
- this.punctuateTimeSensor.add(new MetricName("punctuate-time-max", metricGrpName, "The maximum punctuate time in ms", metricTags), new Max());
- this.punctuateTimeSensor.add(new MetricName("punctuate-calls-rate", metricGrpName, "The average per-second number of punctuate calls", metricTags), new Rate(new Count()));
-
- this.taskCreationSensor = metrics.sensor("task-creation");
- this.taskCreationSensor.add(new MetricName("task-creation-rate", metricGrpName, "The average per-second number of newly created tasks", metricTags), new Rate(new Count()));
-
- this.taskDestructionSensor = metrics.sensor("task-destruction");
- this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
- }
-
- @Override
- public void recordLatency(Sensor sensor, long startNs, long endNs) {
- sensor.record((endNs - startNs) / 1000000, endNs);
- }
-
- @Override
- public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
- // extract the additional tags if there are any
- Map<String, String> tagMap = new HashMap<>(this.metricTags);
- if ((tags.length % 2) != 0)
- throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
-
- for (int i = 0; i < tags.length; i += 2)
- tagMap.put(tags[i], tags[i + 1]);
-
- // first add the global operation metrics if not yet added, with the global tags only
- Sensor parent = metrics.sensor(operationName);
- addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
-
- // add the store operation metrics with additional tags
- Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
- addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
-
- return sensor;
- }
-
- private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
- maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
- "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
- maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
- "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
- maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
- "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
- }
-
- private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
- if (!metrics.metrics().containsKey(name))
- sensor.add(name, stat);
- }
- }
-}
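
At its core the thread alternates between polling and per-task processing: runLoop() polls when any task reports requiresPoll(), processes one record per task, then checks the punctuation, commit, and cleanup timers. A compressed, runnable skeleton of that loop (all method bodies are stand-ins):

    public class RunLoopSketch {
        volatile boolean running = true;
        boolean requiresPoll = true;

        void runLoop() {
            while (running) {
                if (requiresPoll)
                    poll();                     // fetch records into task buffers
                requiresPoll = processTasks();  // one record per task; true if buffers ran low
                maybePunctuate();               // time-based punctuation callbacks
                maybeCommit();                  // commit state/offsets on interval
                maybeClean();                   // delete obsolete state directories
            }
        }

        void poll() { }
        boolean processTasks() { return true; }
        void maybePunctuate() { }
        void maybeCommit() { }
        void maybeClean() { running = false; }  // stop after one pass, for the sketch

        public static void main(String[] args) {
            new RunLoopSketch().runLoop();
            System.out.println("loop exited");
        }
    }
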
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
deleted file mode 100644
index d8a012a..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampTracker.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals;
-
-/**
- * TimestampTracker is a helper class for a sliding window implementation.
- * It is assumed that stamped elements are added or removed in a FIFO manner.
- * It maintains a timestamp, such as the min or max timestamp, over the
- * stamped elements that were added but not yet removed.
- */
-public interface TimestampTracker<E> {
-
- static final long NOT_KNOWN = -1L;
-
- /**
- * Adds a stamped element to this tracker.
- *
- * @param elem the added element
- */
- void addElement(Stamped<E> elem);
-
- /**
- * Removes a stamped element from this tracker.
- *
- * @param elem the removed element
- */
- void removeElement(Stamped<E> elem);
-
- /**
- * Returns the current tracked timestamp
- *
- * @return timestamp, or {@link #NOT_KNOWN} when empty
- */
- long get();
-
- /**
- * Returns the size of the internal structure. The meaning of "size" depends on the implementation.
- *
- * @return size
- */
- int size();
-
-}
\ No newline at end of file
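
One way to implement this interface for the min timestamp under the FIFO assumption is the classic sliding-window-minimum trick: keep a deque of candidate timestamps in ascending order, drop tail entries that can no longer be the minimum, and pop the head when the minimum element leaves. A runnable sketch over raw longs (illustrative, not the deleted implementation):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class MinTimestampSketch {
        static final long NOT_KNOWN = -1L;
        // candidate timestamps in ascending order; the head is the current min
        private final Deque<Long> candidates = new ArrayDeque<>();

        void addElement(long ts) {
            // tail entries larger than ts can never be the min again:
            // ts is smaller and, removal being FIFO, will also outlive them
            while (!candidates.isEmpty() && candidates.peekLast() > ts)
                candidates.removeLast();
            candidates.addLast(ts);
        }

        void removeElement(long ts) {
            // FIFO removal only matters if the departing element was the min
            if (!candidates.isEmpty() && candidates.peekFirst() == ts)
                candidates.removeFirst();
        }

        long get() {
            return candidates.isEmpty() ? NOT_KNOWN : candidates.peekFirst();
        }

        public static void main(String[] args) {
            MinTimestampSketch tracker = new MinTimestampSketch();
            tracker.addElement(5);
            tracker.addElement(3);   // 5 is dropped from the deque here
            tracker.addElement(7);
            System.out.println(tracker.get());  // 3
            tracker.removeElement(5);           // FIFO: the element stamped 5 leaves first
            System.out.println(tracker.get());  // still 3
        }
    }
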
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java b/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
deleted file mode 100644
index 183b691..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/Entry.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-public class Entry<K, V> {
-
- private final K key;
- private final V value;
-
- public Entry(K key, V value) {
- this.key = key;
- this.value = value;
- }
-
- public K key() {
- return key;
- }
-
- public V value() {
- return value;
- }
-
- public String toString() {
- return "Entry(" + key() + ", " + value() + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
deleted file mode 100644
index d1f845c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * An in-memory key-value store based on a TreeMap.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- * @see Stores#create(String, org.apache.kafka.streams.StreamingConfig)
- */
-public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
- }
-
- private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final NavigableMap<K, V> map;
-
- public MemoryStore(String name) {
- super();
- this.name = name;
- this.map = new TreeMap<>();
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- return this.map.remove(key);
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<Map.Entry<K, V>> iter;
-
- public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
- this.iter = iter;
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- Map.Entry<K, V> entry = iter.next();
- return new Entry<>(entry.getKey(), entry.getValue());
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- }
-
- }
- }
-}
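
Since MemoryStore delegates to a NavigableMap, its range() semantics come straight from TreeMap.subMap(from, true, to, false): from is inclusive, to is exclusive. A stand-alone demonstration of exactly that call:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.TreeMap;

    public class MemoryStoreRangeSketch {
        public static void main(String[] args) {
            TreeMap<String, Integer> map = new TreeMap<>();
            map.put("a", 1);
            map.put("b", 2);
            map.put("c", 3);

            // mirrors MemoryStore.range(from, to) above
            Iterator<Map.Entry<String, Integer>> iter =
                    map.subMap("a", true, "c", false).entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, Integer> entry = iter.next();
                System.out.println(entry.getKey() + " -> " + entry.getValue());
            }
            // prints a -> 1 and b -> 2; "c" is excluded
        }
    }
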
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
deleted file mode 100644
index a346534..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryLRUCacheStoreSupplier.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
-/**
- * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
- *
- * @param <K> The key type
- * @param <V> The value type
- *
- */
-public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
-
- private final String name;
- private final int capacity;
- private final Serdes serdes;
- private final Time time;
-
- protected InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
- this.name = name;
- this.capacity = capacity;
- this.serdes = serdes;
- this.time = time;
- }
-
- public String name() {
- return name;
- }
-
- public StateStore get() {
- MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
- final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
- cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
- @Override
- public void apply(K key, V value) {
- store.removed(key);
- }
- });
- return store;
- }
-
- private static interface EldestEntryRemovalListener<K, V> {
- public void apply(K key, V value);
- }
-
- protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
- private final String name;
- private final Map<K, V> map;
- private final NavigableSet<K> keys;
- private EldestEntryRemovalListener<K, V> listener;
-
- public MemoryLRUCache(String name, final int maxCacheSize) {
- this.name = name;
- this.keys = new TreeSet<>();
- // leave room for one extra entry to handle adding an entry before the oldest can be removed
- this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
- private static final long serialVersionUID = 1L;
-
- @Override
- protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
- if (size() > maxCacheSize) {
- K key = eldest.getKey();
- keys.remove(key);
- if (listener != null) listener.apply(key, eldest.getValue());
- return true;
- }
- return false;
- }
- };
- }
-
- protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public String name() {
- return this.name;
- }
-
- @Override
- public void init(ProcessorContext context) {
- // do-nothing since it is in-memory
- }
-
- @Override
- public boolean persistent() {
- return false;
- }
-
- @Override
- public V get(K key) {
- return this.map.get(key);
- }
-
- @Override
- public void put(K key, V value) {
- this.map.put(key, value);
- this.keys.add(key);
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry : entries)
- put(entry.key(), entry.value());
- }
-
- @Override
- public V delete(K key) {
- V value = this.map.remove(key);
- this.keys.remove(key);
- return value;
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
- }
-
- @Override
- public void flush() {
- // do-nothing since it is in-memory
- }
-
- @Override
- public void close() {
- // do-nothing
- }
-
- private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
- private final Iterator<K> keys;
- private final Map<K, V> entries;
- private K lastKey;
-
- public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
- this.keys = keys;
- this.entries = entries;
- }
-
- @Override
- public boolean hasNext() {
- return keys.hasNext();
- }
-
- @Override
- public Entry<K, V> next() {
- lastKey = keys.next();
- return new Entry<>(lastKey, entries.get(lastKey));
- }
-
- @Override
- public void remove() {
- keys.remove();
- entries.remove(lastKey);
- }
-
- @Override
- public void close() {
- // do nothing
- }
- }
- }
-}
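
The eviction mechanism above is the standard access-ordered LinkedHashMap idiom: construct the map with accessOrder = true and override removeEldestEntry() to return true once the map outgrows its capacity. The same idiom in isolation:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruSketch {
        public static void main(String[] args) {
            final int maxCacheSize = 2;
            // accessOrder = true: iteration order is least- to most-recently used
            Map<String, Integer> cache =
                new LinkedHashMap<String, Integer>(maxCacheSize + 1, 1.01f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                        return size() > maxCacheSize;  // evict the LRU entry
                    }
                };
            cache.put("a", 1);
            cache.put("b", 2);
            cache.get("a");      // touch "a"; "b" is now the eldest
            cache.put("c", 3);   // evicts "b"
            System.out.println(cache.keySet());  // [a, c]
        }
    }
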
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
deleted file mode 100644
index 0fbd4ae..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import java.io.Closeable;
-import java.util.Iterator;
-
-public interface KeyValueIterator<K, V> extends Iterator<Entry<K, V>>, Closeable {
-
- @Override
- public void close();
-}
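
Note the override above: it narrows Closeable's close() so it no longer throws a checked IOException, which keeps caller loops clean, but callers still must close the iterator to release any underlying resources. A self-contained sketch of the same pattern (the names here are illustrative):

    import java.io.Closeable;
    import java.util.Iterator;

    public class CloseableIteratorSketch {
        interface CloseableIterator<T> extends Iterator<T>, Closeable {
            @Override
            void close();  // narrowed: no checked IOException
        }

        static CloseableIterator<Integer> countTo(final int n) {
            return new CloseableIterator<Integer>() {
                int i = 0;
                boolean closed = false;
                public boolean hasNext() { return !closed && i < n; }
                public Integer next() { return i++; }
                public void remove() { throw new UnsupportedOperationException(); }
                public void close() { closed = true; }  // release resources here
            };
        }

        public static void main(String[] args) {
            CloseableIterator<Integer> iter = countTo(3);
            try {
                while (iter.hasNext())
                    System.out.println(iter.next());
            } finally {
                iter.close();  // mirrors the MUST-close contract of KeyValueIterator
            }
        }
    }
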
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
deleted file mode 100644
index e4faed1..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.streams.processor.StateStore;
-
-import java.util.List;
-
-/**
- * A key-value store that supports put/get/delete and range queries.
- *
- * @param <K> The key type
- * @param <V> The value type
- */
-public interface KeyValueStore<K, V> extends StateStore {
-
- /**
- * Get the value corresponding to this key
- *
- * @param key The key to fetch
- * @return The value or null if no value is found.
- * @throws NullPointerException If null is used for key.
- */
- abstract public V get(K key);
-
- /**
- * Update the value associated with this key
- *
- * @param key The key to associate the value with
- * @param value The value
- * @throws NullPointerException If null is used for key or value.
- */
- abstract public void put(K key, V value);
-
- /**
- * Update all the given key/value pairs
- *
- * @param entries A list of entries to put into the store.
- * @throws NullPointerException If null is used for any key or value.
- */
- abstract public void putAll(List<Entry<K, V>> entries);
-
- /**
- * Delete the value from the store (if there is one)
- *
- * @param key The key
- * @return The old value or null if there is no such key.
- * @throws NullPointerException If null is used for key.
- */
- abstract public V delete(K key);
-
- /**
- * Get an iterator over a given range of keys. This iterator MUST be closed after use.
- *
- * @param from The first key that could be in the range
- * @param to The last key that could be in the range
- * @return The iterator for this range.
- * @throws NullPointerException If null is used for from or to.
- */
- abstract public KeyValueIterator<K, V> range(K from, K to);
-
- /**
- * Return an iterator over all keys in the store. This iterator MUST be closed after use.
- *
- * @return An iterator of all key/value pairs in the store.
- */
- abstract public KeyValueIterator<K, V> all();
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
deleted file mode 100644
index c1ccbd4..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
-
- protected final KeyValueStore<K, V> inner;
- protected final Serdes<K, V> serialization;
- protected final String metricGrp;
- protected final Time time;
-
- private final String topic;
-
- private Sensor putTime;
- private Sensor getTime;
- private Sensor deleteTime;
- private Sensor putAllTime;
- private Sensor allTime;
- private Sensor rangeTime;
- private Sensor flushTime;
- private Sensor restoreTime;
- private StreamingMetrics metrics;
-
- private final Set<K> dirty;
- private final Set<K> removed;
- private final int maxDirty;
- private final int maxRemoved;
-
- private int partition;
- private ProcessorContext context;
-
- // always wrap the logged store with the metered store
- public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricGrp, Time time) {
- this.inner = inner;
- this.serialization = serialization;
- this.metricGrp = metricGrp;
- this.time = time != null ? time : new SystemTime();
- this.topic = inner.name();
-
- this.dirty = new HashSet<K>();
- this.removed = new HashSet<K>();
- this.maxDirty = 100; // TODO: this needs to be configurable
- this.maxRemoved = 100; // TODO: this needs to be configurable
- }
-
- @Override
- public String name() {
- return inner.name();
- }
-
- @Override
- public void init(ProcessorContext context) {
- String name = name();
- this.metrics = context.metrics();
- this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
- this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
- this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
- this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
- this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
- this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
- this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
- this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
-
- this.context = context;
- this.partition = context.id().partition;
-
- // register and possibly restore the state from the logs
- long startNs = time.nanoseconds();
- inner.init(context);
- try {
- final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
- final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
- context.register(this, new StateRestoreCallback() {
- @Override
- public void restore(byte[] key, byte[] value) {
- inner.put(keyDeserializer.deserialize(topic, key),
- valDeserializer.deserialize(topic, value));
- }
- });
- } finally {
- this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public boolean persistent() {
- return inner.persistent();
- }
-
- @Override
- public V get(K key) {
- long startNs = time.nanoseconds();
- try {
- return this.inner.get(key);
- } finally {
- this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void put(K key, V value) {
- long startNs = time.nanoseconds();
- try {
- this.inner.put(key, value);
-
- this.dirty.add(key);
- this.removed.remove(key);
- maybeLogChange();
- } finally {
- this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public void putAll(List<Entry<K, V>> entries) {
- long startNs = time.nanoseconds();
- try {
- this.inner.putAll(entries);
-
- for (Entry<K, V> entry : entries) {
- K key = entry.key();
- this.dirty.add(key);
- this.removed.remove(key);
- }
-
- maybeLogChange();
- } finally {
- this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
- }
- }
-
- @Override
- public V delete(K key) {
- long startNs = time.nanoseconds();
- try {
- V value = this.inner.delete(key);
-
- this.dirty.remove(key);
- this.removed.add(key);
- maybeLogChange();
-
- return value;
- } finally {
- this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
- }
- }
-
- /**
- * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
- * store other than {@link #delete(Object)}.
- *
- * @param key the key for the entry that the inner store removed
- */
- protected void removed(K key) {
- this.dirty.remove(key);
- this.removed.add(key);
- maybeLogChange();
- }
-
- @Override
- public KeyValueIterator<K, V> range(K from, K to) {
- return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
- }
-
- @Override
- public KeyValueIterator<K, V> all() {
- return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
- }
-
- @Override
- public void close() {
- inner.close();
- }
-
- @Override
- public void flush() {
- long startNs = time.nanoseconds();
- try {
- this.inner.flush();
- logChange();
- } finally {
- this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
- }
- }
-
- private void maybeLogChange() {
- if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
- logChange();
- }
-
- private void logChange() {
- RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
- if (collector != null) {
- Serializer<K> keySerializer = serialization.keySerializer();
- Serializer<V> valueSerializer = serialization.valueSerializer();
-
- for (K k : this.removed) {
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
- }
- for (K k : this.dirty) {
- V v = this.inner.get(k);
- collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
- }
- this.removed.clear();
- this.dirty.clear();
- }
- }
-
- private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
- private final KeyValueIterator<K1, V1> iter;
- private final Sensor sensor;
- private final long startNs;
-
- public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
- this.iter = iter;
- this.sensor = sensor;
- this.startNs = time.nanoseconds();
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<K1, V1> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- iter.remove();
- }
-
- @Override
- public void close() {
- try {
- iter.close();
- } finally {
- metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
- }
- }
-
- }
-
-}
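
The wrapper being removed above follows one consistent pattern throughout: take a nanosecond timestamp, delegate to the inner store, and record the elapsed latency in a finally block so that failing calls are measured too. A minimal sketch of that timing pattern follows; LatencyRecorder is a stand-in for the Sensor/StreamingMetrics pair used above, not a real Kafka type.

    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    // Sketch of the metered-decorator pattern used by MeteredKeyValueStore:
    // time a delegated call and record the latency even when it throws.
    public class MeteredCall {

        interface LatencyRecorder {   // stand-in for Kafka's Sensor
            void record(long latencyNs);
        }

        static <T> T timed(Supplier<T> inner, LatencyRecorder recorder) {
            long startNs = System.nanoTime();
            try {
                return inner.get();
            } finally {
                // finally guarantees the measurement is recorded on failure too
                recorder.record(System.nanoTime() - startNs);
            }
        }

        public static void main(String[] args) {
            LatencyRecorder printRecorder = ns ->
                System.out.println("latency = " + TimeUnit.NANOSECONDS.toMicros(ns) + "us");
            System.out.println(timed(() -> "hello", printRecorder));
        }
    }

The same wrapper also batches change-log writes: put/delete mark keys dirty or removed, and once either set crosses its threshold (or on flush) the store replays them to the change-log topic via the RecordCollector, sending null values as tombstones for removed keys.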
http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
deleted file mode 100644
index e04de68..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class saves out a map of topic/partition=>offsets to a file. The format of the file is UTF-8 text containing the following:
- * <pre>
- * <version>
- * <n>
- * <topic_name_1> <partition_1> <offset_1>
- * .
- * .
- * .
- * <topic_name_n> <partition_n> <offset_n>
- * </pre>
- * The first line contains a number designating the format version (currently 0), and the second line
- * contains a number giving the total number of offsets. Each successive line gives a topic/partition/offset
- * triple separated by spaces.
- */
-public class OffsetCheckpoint {
-
- private static final int VERSION = 0;
-
- private final File file;
- private final Object lock;
-
- public OffsetCheckpoint(File file) throws IOException {
- new File(file + ".tmp").delete(); // try to delete any existing temp files for cleanliness
- this.file = file;
- this.lock = new Object();
- }
-
- public void write(Map<TopicPartition, Long> offsets) throws IOException {
- synchronized (lock) {
- // write to temp file and then swap with the existing file
- File temp = new File(file.getAbsolutePath() + ".tmp");
-
- FileOutputStream fileOutputStream = new FileOutputStream(temp);
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
- try {
- writeIntLine(writer, VERSION);
- writeIntLine(writer, offsets.size());
-
- // write the entries
- for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
- writeEntry(writer, entry.getKey(), entry.getValue());
-
- // flush the buffer and then fsync the underlying file
- writer.flush();
- fileOutputStream.getFD().sync();
- } finally {
- writer.close();
- }
-
- // swap new offset checkpoint file with previous one
- if (!temp.renameTo(file)) {
- // renameTo() fails on Windows if the destination file exists.
- file.delete();
- if (!temp.renameTo(file))
- throw new IOException(String.format("File rename from %s to %s failed.",
- temp.getAbsolutePath(),
- file.getAbsolutePath()));
- }
- }
- }
-
- private void writeIntLine(BufferedWriter writer, int number) throws IOException {
- writer.write(Integer.toString(number));
- writer.newLine();
- }
-
- private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
- writer.write(part.topic());
- writer.write(' ');
- writer.write(Integer.toString(part.partition()));
- writer.write(' ');
- writer.write(Long.toString(offset));
- writer.newLine();
- }
-
- public Map<TopicPartition, Long> read() throws IOException {
- synchronized (lock) {
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new FileReader(file));
- } catch (FileNotFoundException e) {
- return Collections.emptyMap();
- }
-
- try {
- int version = readInt(reader);
- switch (version) {
- case 0:
- int expectedSize = readInt(reader);
- Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
- String line = reader.readLine();
- while (line != null) {
- String[] pieces = line.split("\\s+");
- if (pieces.length != 3)
- throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
- line));
-
- String topic = pieces[0];
- int partition = Integer.parseInt(pieces[1]);
- long offset = Long.parseLong(pieces[2]);
- offsets.put(new TopicPartition(topic, partition), offset);
- line = reader.readLine();
- }
- if (offsets.size() != expectedSize)
- throw new IOException(String.format("Expected %d entries but found only %d",
- expectedSize,
- offsets.size()));
- return offsets;
-
- default:
- throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
- }
- } finally {
- if (reader != null)
- reader.close();
- }
- }
- }
-
- private int readInt(BufferedReader reader) throws IOException {
- String line = reader.readLine();
- if (line == null)
- throw new EOFException("File ended prematurely.");
- int val = Integer.parseInt(line);
- return val;
- }
-
- public void delete() throws IOException {
- file.delete();
- }
-
- @Override
- public String toString() {
- return this.file.getAbsolutePath();
- }
-
-}
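
The write path of the class removed above is worth noting: it writes to a .tmp file, flushes and fsyncs, then renames over the target so a reader never observes a partially written checkpoint. A self-contained sketch of the same file format and swap trick follows, using java.nio's Files.move in place of the renameTo-with-delete fallback above; the file name and sample offsets are made up for illustration.

    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;

    // Sketch of the checkpoint file layout documented above: version line,
    // entry count, then "topic partition offset" triples, written atomically
    // via a temp file, fsync, and rename.
    public class CheckpointSketch {
        public static void main(String[] args) throws IOException {
            File target = new File("offsets.checkpoint");
            File temp = new File(target.getAbsolutePath() + ".tmp");

            try (FileOutputStream fos = new FileOutputStream(temp);
                 BufferedWriter w = new BufferedWriter(
                         new OutputStreamWriter(fos, StandardCharsets.UTF_8))) {
                w.write("0");             w.newLine(); // format version
                w.write("2");             w.newLine(); // number of entries
                w.write("clicks 0 1042"); w.newLine();
                w.write("clicks 1 977");  w.newLine();
                w.flush();
                fos.getFD().sync();       // fsync before the rename
            }
            // Files.move with REPLACE_EXISTING is a modern alternative to the
            // renameTo()-then-delete() retry the removed class needed on Windows.
            Files.move(temp.toPath(), target.toPath(),
                       StandardCopyOption.REPLACE_EXISTING);

            System.out.println(new String(
                    Files.readAllBytes(target.toPath()), StandardCharsets.UTF_8));
        }
    }

On the read side, the class rejected files whose entry count did not match the header, which guards against truncation that the atomic rename alone cannot catch if a partially flushed temp file were ever promoted.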