You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:05 UTC
[04/14] storm git commit: STORM-2937: Overwrite storm-kafka-client
1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from
1.x-branch
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
new file mode 100644
index 0000000..afe8f74
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Computes the partition-level and topic-level offset metrics of the Kafka spout.
+ * <p>
+ * Partition level metrics are:
+ * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
+ * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
+ * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
+ * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
+ * topicName/partition_{number}/spoutLag // the delta between latestTimeOffset and latestCompletedOffset
+ * topicName/partition_{number}/recordsInPartition // number of records in the partition (latestTimeOffset - earliestTimeOffset)
+ * </p>
+ * <p>
+ * Topic level metrics are:
+ * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
+ * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
+ * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
+ * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
+ * topicName/totalSpoutLag // total spout lag of all the associated partitions of this spout
+ * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
+ * </p>
+ * <p>
+ * A partition for which the consumer reports no beginning or end offset is left out of both the
+ * partition-level and the topic-level metrics of that tick.
+ * </p>
+ */
+public class KafkaOffsetMetric implements IMetric {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private final Supplier<KafkaConsumer> consumerSupplier;
+
+    /**
+     * Creates the metric.
+     *
+     * @param offsetManagerSupplier supplies the spout's offset managers keyed by topic partition;
+     *     assumed to produce a {@code Map<TopicPartition, OffsetManager>}
+     * @param consumerSupplier supplies the spout's {@link KafkaConsumer}
+     */
+    @SuppressWarnings("unchecked") // the public signature takes raw Suppliers; their element types are assumed above
+    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
+    }
+
+    /**
+     * Builds the metric map for this tick.
+     *
+     * @return a map from metric path to value, or {@code null} when there are no offset managers
+     *     or no consumer yet
+     */
+    @Override
+    @SuppressWarnings("unchecked") // KafkaConsumer is held as a raw type, so its offset maps come back raw
+    public Object getValueAndReset() {
+
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+            return null;
+        }
+
+        Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        //map to hold partition level and topic level metrics
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            Long endOffset = endOffsets.get(topicPartition);
+            Long beginningOffset = beginningOffsets.get(topicPartition);
+            if (endOffset == null || beginningOffset == null) {
+                // Unboxing a missing offset would throw NPE and drop the whole tick; skip just this partition.
+                LOG.debug("Metrics Tick: no beginning/end offset reported for [{}], skipping it.", topicPartition);
+                continue;
+            }
+            long latestTimeOffset = endOffset;
+            long earliestTimeOffset = beginningOffset;
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+            long recordsInPartition = latestTimeOffset - earliestTimeOffset;
+
+            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+            result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);
+
+            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
+            }
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+            topicMetrics.totalRecordsInPartitions += recordsInPartition;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+            result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+            result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+            result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
+            result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
+        }
+
+        LOG.debug("Metrics Tick: value : {}", result);
+        return result;
+    }
+
+    /**
+     * Per-topic running totals accumulated over one tick. Static: it needs no reference to the enclosing metric.
+     */
+    private static class TopicMetrics {
+        long totalSpoutLag = 0;
+        long totalEarliestTimeOffset = 0;
+        long totalLatestTimeOffset = 0;
+        long totalLatestEmittedOffset = 0;
+        long totalLatestCompletedOffset = 0;
+        long totalRecordsInPartitions = 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
new file mode 100644
index 0000000..6fa81aa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Wraps transaction batch information: the topic partition a Trident batch was read from and the
+ * first and last offsets it covered.
+ */
+public class KafkaTridentSpoutBatchMetadata implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
+    private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
+
+    public static final String TOPIC_PARTITION_KEY = "tp";
+    public static final String FIRST_OFFSET_KEY = "firstOffset";
+    public static final String LAST_OFFSET_KEY = "lastOffset";
+
+    // topic partition of this batch
+    private final TopicPartition topicPartition;
+    // first offset of this batch
+    private final long firstOffset;
+    // last offset of this batch
+    private final long lastOffset;
+
+    /**
+     * Builds a metadata object.
+     * @param topicPartition The topic partition
+     * @param firstOffset The first offset for the batch
+     * @param lastOffset The last offset for the batch
+     */
+    public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
+        this.topicPartition = topicPartition;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    /**
+     * Builds a metadata object from a non-empty set of records.
+     * The first and last records of the partition are taken as the batch bounds; this assumes the
+     * partition's records are in ascending offset order.
+     * @param topicPartition The topic partition the records belong to.
+     * @param consumerRecords The non-empty set of records.
+     * @throws IllegalArgumentException if there are no records for {@code topicPartition}
+     */
+    public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
+        // Look the partition's records up once and validate that single result.
+        final List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
+        Validate.notNull(records, "Records for the topic partition must not be null");
+        Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata");
+
+        this.topicPartition = topicPartition;
+        firstOffset = records.get(0).offset();
+        lastOffset = records.get(records.size() - 1).offset();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    public long getFirstOffset() {
+        return firstOffset;
+    }
+
+    public long getLastOffset() {
+        return lastOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    /**
+     * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
+     * Offsets are read as any {@link Number} so both Integer and Long values are accepted.
+     * @param map The source map
+     * @return A new metadata object
+     */
+    @SuppressWarnings("unchecked") // the TOPIC_PARTITION_KEY entry is written by toMap() as a Map<String, Object>
+    public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
+        Map<String, Object> topicPartitionMap = (Map<String, Object>) map.get(TOPIC_PARTITION_KEY);
+        TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
+        return new KafkaTridentSpoutBatchMetadata(tp, ((Number) map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number) map.get(LAST_OFFSET_KEY)).longValue());
+    }
+
+    /**
+     * Writes this metadata object to a Map so Trident can read/write it to Zookeeper.
+     * @return a new mutable map with the topic partition and the first/last offsets
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
+        map.put(FIRST_OFFSET_KEY, firstOffset);
+        map.put(LAST_OFFSET_KEY, lastOffset);
+        return map;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+            "{topicPartition=" + topicPartition +
+            ", firstOffset=" + firstOffset +
+            ", lastOffset=" + lastOffset +
+            '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
new file mode 100644
index 0000000..3b4aa4b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+/**
+ * Opaque partitioned Trident emitter that reads each batch for one topic-partition from a shared
+ * {@link KafkaConsumer}, pausing the consumer's other assigned partitions while the batch is polled.
+ */
+public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
+        List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition,
+        Map<String, Object>>,
+        Serializable {
+
+    private static final long serialVersionUID = -7343927794834130435L;
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+
+    // Kafka
+    private final KafkaConsumer<K, V> kafkaConsumer;
+
+    // Bookkeeping
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+    // set of topic-partitions for which first poll has already occurred, and the first polled txid
+    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>();
+    // fetch offset resolved by the first poll of each topic-partition; always updated together with
+    // firstPollTransaction, so a partition present there is present here too
+    private final Map<TopicPartition, Long> firstPollOffsets = new HashMap<>();
+
+    // Declare some KafkaTridentSpoutManager references for convenience
+    private final long pollTimeoutMs;
+    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final RecordTranslator<K, V> translator;
+    private final Timer refreshSubscriptionTimer;
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+    private TopologyContext topologyContext;
+
+    /**
+     * Create a new Kafka spout emitter.
+     * @param kafkaManager The Kafka consumer manager to use
+     * @param topologyContext The topology context
+     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
+        this.kafkaManager = kafkaManager;
+        this.topologyContext = topologyContext;
+        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
+        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
+
+        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
+        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
+        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    /**
+     * Creates instance of this class with default 500 millisecond refresh subscription timer
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
+        this(kafkaManager, topologyContext, new Timer(500,
+            kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Emits one batch for {@code currBatchPartition}.
+     *
+     * @return metadata describing the records emitted by this batch; when nothing was emitted for the
+     *     partition, the previous batch's metadata (which may be {@code null})
+     */
+    @Override
+    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
+
+        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
+            tx, currBatchPartition, lastBatch, collector);
+
+        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
+        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
+        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
+
+        if (assignments == null || !assignments.contains(currBatchTp)) {
+            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                    "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
+                    "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
+                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+        } else {
+            try {
+                // pause other topic-partitions to only poll from current topic-partition
+                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
+
+                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+
+                // poll
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+                }
+
+                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
+                LOG.debug("Polled [{}] records from Kafka.", records.count());
+
+                // Partitions newly assigned by refreshAssignment() (or by a rebalance during poll) were not paused
+                // above, so the poll may return records for them as well. Only this batch's partition may be
+                // emitted here; other partitions' batches always seek explicitly, so dropping them is safe.
+                final List<ConsumerRecord<K, V>> currBatchRecords = records.records(currBatchTp);
+                if (!currBatchRecords.isEmpty()) {
+                    emitTuples(collector, currBatchRecords);
+                    // build new metadata
+                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
+                }
+            } finally {
+                kafkaConsumer.resume(pausedTopicPartitions);
+                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
+            }
+            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
+        }
+
+        return currentBatch == null ? null : currentBatch.toMap();
+    }
+
+    /**
+     * Translates each record to a tuple and emits it on {@code collector}.
+     */
+    private void emitTuples(TridentCollector collector, List<ConsumerRecord<K, V>> records) {
+        for (ConsumerRecord<K, V> record : records) {
+            final List<Object> tuple = translator.apply(record);
+            collector.emit(tuple);
+            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
+        }
+    }
+
+    /**
+     * Determines the offset of the next fetch. Will use the firstPollOffsetStrategy if this is the first poll for the topic partition.
+     * Otherwise the next offset will be one past the last batch, based on lastBatchMeta.
+     *
+     * <p>lastBatchMeta is null on the first poll of a partition (or a replay of that first txid) when no
+     * earlier batch metadata exists. It can also be null on a later poll: emitPartitionBatch returns the
+     * previous (null) metadata whenever a batch polls no records for its partition. In that case nothing
+     * has been emitted for the partition since its first poll, so the fetch resumes at the offset the
+     * first poll started from.
+     *
+     * @return the offset of the next fetch
+     */
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
+        final boolean firstPoll = isFirstPoll(tp, transactionId);
+        if (firstPoll) {
+            if (firstPollOffsetStrategy == EARLIEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == LATEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            } else if (lastBatchMeta != null) {
+                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+                kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            }
+        } else if (lastBatchMeta != null) {
+            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1); // seek next offset after last offset from previous batch
+            LOG.debug("Not first poll for topic partition [{}], using last batch metadata", tp);
+        } else {
+            // Not the first poll, so firstPollOffsets holds this partition (it is written together with firstPollTransaction).
+            final long initialFetchOffset = firstPollOffsets.get(tp);
+            kafkaConsumer.seek(tp, initialFetchOffset);
+            LOG.debug("Not first poll for topic partition [{}] and no last batch metadata, seeking to first poll offset [{}]",
+                tp, initialFetchOffset);
+        }
+
+        final long fetchOffset = kafkaConsumer.position(tp);
+        if (firstPoll) {
+            // Recorded only once position() has resolved the offset, so a failure here leaves this txid a first poll.
+            firstPollTransaction.put(tp, transactionId);
+            firstPollOffsets.put(tp, fetchOffset);
+        }
+        LOG.debug("Set [fetchOffset = {}]", fetchOffset);
+        return fetchOffset;
+    }
+
+    private boolean isFirstPoll(TopicPartition tp, long txid) {
+        // The first poll is either the "real" first transaction, or a replay of the first transaction
+        return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
+    }
+
+    // returns paused topic-partitions.
+    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
+        final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
+        LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
+        pausedTopicPartitions.remove(excludedTp);
+        kafkaConsumer.pause(pausedTopicPartitions);
+        LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
+        return pausedTopicPartitions;
+    }
+
+    @Override
+    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
+        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
+                "No action taken by this method for topic partitions {}", partitionResponsibilities);
+    }
+
+    /**
+     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
+     * for this task must be assigned to the Kafka consumer running on this task.
+     *
+     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
+     * @return ordered list of topic partitions for this task
+     */
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
+        List<TopicPartition> allTopicPartitions = new ArrayList<>();
+        for (Map<String, Object> map : allPartitionInfo) {
+            allTopicPartitions.add(tpSerializer.fromMap(map));
+        }
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
+        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
+                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
+        return allPartitions;
+    }
+
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
+            List<Map<String, Object>> allPartitionInfo) {
+        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
+        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
+        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
+        return taskTps;
+    }
+
+    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
+        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size());
+        if (tps != null) {
+            for (TopicPartition tp : tps) {
+                LOG.trace("Added topic-partition [{}]", tp);
+                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
+            }
+        }
+        return kttp;
+    }
+
+    private int getNumTasks() {
+        return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+    }
+
+    @Override
+    public void close() {
+        kafkaConsumer.close();
+        LOG.debug("Closed");
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
new file mode 100644
index 0000000..26db5c9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Holds the {@link KafkaSpoutConfig} of the Trident Kafka spout, creates and subscribes its
+ * {@link KafkaConsumer}, and resolves the single set of output {@link Fields} the spout declares.
+ */
+public class KafkaTridentSpoutManager<K, V> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
+
+    // Assigned by createAndSubscribeKafkaConsumer; transient, so never serialized with the manager
+    private transient KafkaConsumer<K, V> kafkaConsumer;
+
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    // Output fields common to every stream of the record translator; cached by getFields()
+    private Fields fields;
+
+    /**
+     * Create a KafkaConsumer manager for the trident spout.
+     *
+     * @param kafkaSpoutConfig The consumer config
+     * @throws IllegalArgumentException if the translator declares different fields on different streams
+     */
+    public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        // getFields() resolves and caches the output fields, so an inconsistent translator fails here
+        getFields();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    /**
+     * Creates a new consumer from the spout config's Kafka properties, stores it in this manager, and
+     * subscribes it through the configured subscription with a registry-updating rebalance listener.
+     */
+    KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
+        final KafkaConsumer<K, V> consumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+        // Stored before subscribing: the rebalance listener logs this field
+        kafkaConsumer = consumer;
+        kafkaSpoutConfig.getSubscription().subscribe(consumer, new KafkaSpoutConsumerRebalanceListener(), context);
+        return consumer;
+    }
+
+    KafkaConsumer<K, V> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    Set<TopicPartition> getTopicPartitions() {
+        return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
+    }
+
+    /**
+     * Returns the output fields of the spout, resolving and caching them on first use.
+     *
+     * @throws IllegalArgumentException if the translator declares different fields on different streams
+     */
+    final Fields getFields() {
+        if (fields == null) {
+            fields = resolveSingleOutputFields(kafkaSpoutConfig.getTranslator());
+        }
+        LOG.debug("OutputFields = {}", fields);
+        return fields;
+    }
+
+    /**
+     * Returns the fields shared by every stream of {@code translator}, or {@code null} if no stream yields fields.
+     */
+    private static Fields resolveSingleOutputFields(RecordTranslator<?, ?> translator) {
+        Fields shared = null;
+        for (String stream : translator.streams()) {
+            final Fields streamFields = translator.getFieldsFor(stream);
+            if (shared == null) {
+                shared = streamFields;
+            } else if (!shared.equals(streamFields)) {
+                throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+            }
+        }
+        return shared;
+    }
+
+    KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
+        return kafkaSpoutConfig;
+    }
+
+    @Override
+    public final String toString() {
+        return new StringBuilder(super.toString())
+            .append("{kafkaConsumer=").append(kafkaConsumer)
+            .append(", kafkaSpoutConfig=").append(kafkaSpoutConfig)
+            .append('}')
+            .toString();
+    }
+
+    /**
+     * Keeps {@link KafkaTridentSpoutTopicPartitionRegistry} in step with the consumer's partition assignment.
+     */
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
+            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
new file mode 100644
index 0000000..ecc9219
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Opaque partitioned Trident spout that reads from Kafka. The emitter and coordinator it creates
+  * share a single {@link KafkaTridentSpoutManager}.
+  *
+  * @param <K> Kafka record key type
+  * @param <V> Kafka record value type
+  */
+ public class KafkaTridentSpoutOpaque<K, V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+         KafkaTridentSpoutTopicPartition, Map<String, Object>> {
+     private static final long serialVersionUID = -8003272486566259640L;
+
+     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
+
+     private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
+     /**
+      * Creates a spout backed by a new {@link KafkaTridentSpoutManager} built from {@code conf}.
+      *
+      * @param conf the Kafka spout configuration
+      */
+     public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+         this(new KafkaTridentSpoutManager<K, V>(conf));
+     }
+
+     /**
+      * Creates a spout that delegates to an existing manager.
+      *
+      * @param kafkaManager the manager shared by the emitter and coordinator
+      */
+     public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
+         this.kafkaManager = kafkaManager;
+         LOG.debug("Created {}", this.toString());
+     }
+
+     /** Returns a new {@link KafkaTridentSpoutEmitter} bound to the shared manager. */
+     @Override
+     public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
+             Map conf, TopologyContext context) {
+         return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
+     }
+
+     /** Returns a new {@link KafkaTridentSpoutOpaqueCoordinator} bound to the shared manager. */
+     @Override
+     public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
+         return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
+     }
+
+     /** This spout contributes no component-specific configuration; returns {@code null}. */
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+         return null;
+     }
+
+     /** Returns the output fields as reported by the manager. */
+     @Override
+     public Fields getOutputFields() {
+         final Fields fields = kafkaManager.getFields();
+         LOG.debug("OutputFields = {}", fields);
+         return fields;
+     }
+
+     @Override
+     public final String toString() {
+         return new StringBuilder(super.toString())
+                 .append("{kafkaManager=").append(kafkaManager).append('}')
+                 .toString();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
new file mode 100644
index 0000000..449e24b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * Coordinator for {@link KafkaTridentSpoutOpaque}. Every batch is always ready, and the batch partitions
+  * are the manager's current topic partitions, each converted to a map via {@link TopicPartitionSerializer}.
+  *
+  * @param <K> Kafka record key type
+  * @param <V> Kafka record value type
+  */
+ public class KafkaTridentSpoutOpaqueCoordinator<K, V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+         Serializable {
+     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
+
+     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+     private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
+     /**
+      * @param kafkaManager source of the topic partitions reported for each batch
+      */
+     public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
+         this.kafkaManager = kafkaManager;
+         LOG.debug("Created {}", this.toString());
+     }
+
+     /** Always {@code true}; mirrors the behaviour of the older storm-kafka Trident spout. */
+     @Override
+     public boolean isReady(long txid) {
+         LOG.debug("isReady = true");
+         return true;
+     }
+
+     /**
+      * Snapshots the manager's topic partitions and converts each one to its map form.
+      *
+      * @return one map per topic partition, in the snapshot's iteration order
+      */
+     @Override
+     public List<Map<String, Object>> getPartitionsForBatch() {
+         final List<TopicPartition> snapshot = new ArrayList<>(kafkaManager.getTopicPartitions());
+         LOG.debug("TopicPartitions for batch {}", snapshot);
+         final List<Map<String, Object>> serialized = new ArrayList<>(snapshot.size());
+         for (final TopicPartition partition : snapshot) {
+             serialized.add(tpSerializer.toMap(partition));
+         }
+         return serialized;
+     }
+
+     /** No-op apart from a debug log, like the older storm-kafka Trident spout. */
+     @Override
+     public void close() {
+         LOG.debug("Closed");
+     }
+
+     @Override
+     public final String toString() {
+         return new StringBuilder(super.toString())
+                 .append("{kafkaManager=").append(kafkaManager).append('}')
+                 .toString();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
new file mode 100644
index 0000000..b020bea
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+import java.io.Serializable;
+
+/**
+ * {@link ISpoutPartition} that wraps {@link TopicPartition} information
+ */
+public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable {
+ private TopicPartition topicPartition;
+
+ public KafkaTridentSpoutTopicPartition(String topic, int partition) {
+ this(new TopicPartition(topic, partition));
+ }
+
+ public KafkaTridentSpoutTopicPartition(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ }
+
+ public TopicPartition getTopicPartition() {
+ return topicPartition;
+ }
+
+ @Override
+ public String getId() {
+ return topicPartition.topic() + "@" + topicPartition.partition();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ KafkaTridentSpoutTopicPartition that = (KafkaTridentSpoutTopicPartition) o;
+
+ return topicPartition != null ? topicPartition.equals(that.topicPartition) : that.topicPartition == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return topicPartition != null ? topicPartition.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return topicPartition.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
new file mode 100644
index 0000000..2d50ca7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+ /**
+  * Shared (enum singleton) set of Kafka topic partitions, kept in insertion order.
+  *
+  * <p>Consumer rebalance callbacks add and remove partitions, and readers may run on other executor
+  * threads in the same JVM. All access is therefore synchronized on the singleton. Readers receive a
+  * snapshot rather than a live view, so iterating the result cannot race with a rebalance.
+  */
+ public enum KafkaTridentSpoutTopicPartitionRegistry {
+     INSTANCE;
+
+     // Guarded by "this".
+     private final Set<TopicPartition> topicPartitions;
+
+     KafkaTridentSpoutTopicPartitionRegistry() {
+         this.topicPartitions = new LinkedHashSet<>();
+     }
+
+     /**
+      * Returns an unmodifiable snapshot of the registered partitions in insertion order. Later
+      * additions and removals are not reflected in the returned set.
+      */
+     public synchronized Set<TopicPartition> getTopicPartitions() {
+         return Collections.unmodifiableSet(new LinkedHashSet<TopicPartition>(topicPartitions));
+     }
+
+     /** Registers every partition in {@code topicPartitions}. */
+     public synchronized void addAll(Collection<? extends TopicPartition> topicPartitions) {
+         this.topicPartitions.addAll(topicPartitions);
+     }
+
+     /** Unregisters every partition in {@code topicPartitions}. */
+     public synchronized void removeAll(Collection<? extends TopicPartition> topicPartitions) {
+         this.topicPartitions.removeAll(topicPartitions);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
new file mode 100644
index 0000000..6e1c587
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Map;
+
+ // TODO implement the transactional (non-opaque) Kafka Trident spout.
+ /**
+  * Placeholder for a transactional Kafka Trident spout. It is not implemented: the coordinator,
+  * emitter and output-field accessors throw {@link UnsupportedOperationException} instead of
+  * returning {@code null}, which would otherwise fail later with an unexplained NullPointerException.
+  * Use {@link KafkaTridentSpoutOpaque} instead.
+  */
+ public class KafkaTridentSpoutTransactional implements IPartitionedTridentSpout {
+     private static final String NOT_IMPLEMENTED =
+         "KafkaTridentSpoutTransactional is not implemented yet; use KafkaTridentSpoutOpaque instead";
+
+     @Override
+     public Coordinator getCoordinator(Map conf, TopologyContext context) {
+         throw new UnsupportedOperationException(NOT_IMPLEMENTED);
+     }
+
+     @Override
+     public Emitter getEmitter(Map conf, TopologyContext context) {
+         throw new UnsupportedOperationException(NOT_IMPLEMENTED);
+     }
+
+     /** No component-specific configuration; returns {@code null}. */
+     @Override
+     public Map<String, Object> getComponentConfiguration() {
+         return null;
+     }
+
+     @Override
+     public Fields getOutputFields() {
+         throw new UnsupportedOperationException(NOT_IMPLEMENTED);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
new file mode 100644
index 0000000..50e78f0
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+ /**
+  * Converts Kafka {@link TopicPartition}s to and from plain maps, so that Trident can store them as JSON.
+  *
+  * <p>Instances are held in fields of {@link java.io.Serializable} Trident components (for example
+  * {@code KafkaTridentSpoutOpaqueCoordinator}). The class therefore implements Serializable itself;
+  * otherwise serializing those components would throw {@link java.io.NotSerializableException}.
+  * It has no instance state, so serialization is trivial.
+  */
+ public class TopicPartitionSerializer implements java.io.Serializable {
+
+     private static final long serialVersionUID = 1L;
+
+     public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+     public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+     /**
+      * Serializes the given TopicPartition to Map so Trident can serialize it to JSON.
+      *
+      * @param topicPartition the partition to convert
+      * @return a new mutable map with the topic under {@link #TOPIC_PARTITION_TOPIC_KEY} and the
+      *     partition number under {@link #TOPIC_PARTITION_PARTITION_KEY}
+      */
+     public Map<String, Object> toMap(TopicPartition topicPartition) {
+         Map<String, Object> topicPartitionMap = new HashMap<>();
+         topicPartitionMap.put(TOPIC_PARTITION_TOPIC_KEY, topicPartition.topic());
+         topicPartitionMap.put(TOPIC_PARTITION_PARTITION_KEY, topicPartition.partition());
+         return topicPartitionMap;
+     }
+
+     /**
+      * Deserializes the given map into a TopicPartition. The map keys are expected to be those produced by
+      * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+      *
+      * <p>The partition is read as any {@link Number}, because a JSON round trip may change the boxed
+      * integer type (e.g. to {@code Long}).
+      *
+      * @param map a map produced by {@link #toMap(TopicPartition)}, possibly after a JSON round trip
+      * @return the corresponding TopicPartition
+      */
+     public TopicPartition fromMap(Map<String, Object> map) {
+         return new TopicPartition((String) map.get(TOPIC_PARTITION_TOPIC_KEY),
+             ((Number) map.get(TOPIC_PARTITION_PARTITION_KEY)).intValue());
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..2e2c13b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState<K, V> implements State {
+ private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+ private KafkaProducer<K, V> producer;
+
+ private TridentTupleToKafkaMapper<K, V> mapper;
+ private KafkaTopicSelector topicSelector;
+
+ public TridentKafkaState<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+ this.mapper = mapper;
+ return this;
+ }
+
+ public TridentKafkaState<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+ this.topicSelector = selector;
+ return this;
+ }
+
+ @Override
+ public void beginCommit(Long txid) {
+ LOG.debug("beginCommit is Noop.");
+ }
+
+ @Override
+ public void commit(Long txid) {
+ LOG.debug("commit is Noop.");
+ }
+
+ /**
+ * Prepare this State.
+ * @param options The KafkaProducer config.
+ */
+ public void prepare(Properties options) {
+ Objects.requireNonNull(mapper, "mapper can not be null");
+ Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+ producer = new KafkaProducer<>(options);
+ }
+
+ /**
+ * Write the given tuples to Kafka.
+ * @param tuples The tuples to write.
+ * @param collector The Trident collector.
+ */
+ public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+ String topic = null;
+ try {
+ long startTime = System.currentTimeMillis();
+ int numberOfRecords = tuples.size();
+ List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords);
+ for (TridentTuple tuple : tuples) {
+ topic = topicSelector.getTopic(tuple);
+ V messageFromTuple = mapper.getMessageFromTuple(tuple);
+ K keyFromTuple = mapper.getKeyFromTuple(tuple);
+
+ if (topic != null) {
+ if (messageFromTuple != null) {
+ Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, keyFromTuple, messageFromTuple));
+ futures.add(result);
+ } else {
+ LOG.warn("skipping Message with Key {} as message was null", keyFromTuple);
+ }
+
+ } else {
+ LOG.warn("skipping key = {}, topic selector returned null.", keyFromTuple);
+ }
+ }
+
+ int emittedRecords = futures.size();
+ List<ExecutionException> exceptions = new ArrayList<>(emittedRecords);
+ for (Future<RecordMetadata> future : futures) {
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ exceptions.add(e);
+ }
+ }
+
+ if (exceptions.size() > 0) {
+ StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
+ errorMsg.append(tuples).append(" from topic = ").append(topic)
+ .append(" because of the following exceptions:").append(System.lineSeparator());
+
+ for (ExecutionException exception : exceptions) {
+ errorMsg = errorMsg.append(exception.getMessage()).append(System.lineSeparator());
+ }
+ String message = errorMsg.toString();
+ LOG.error(message);
+ throw new FailedException(message);
+ }
+ long latestTime = System.currentTimeMillis();
+ LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", emittedRecords, latestTime - startTime, topic);
+
+ } catch (Exception ex) {
+ String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
+ LOG.warn(errorMsg, ex);
+ throw new FailedException(errorMsg, ex);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..35620de
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+ /**
+  * {@link StateFactory} that creates and prepares a {@link TridentKafkaState} per state partition. Each
+  * state uses the mapper, topic selector and producer properties supplied through the {@code with*} methods.
+  */
+ public class TridentKafkaStateFactory<K, V> implements StateFactory {
+
+     private static final long serialVersionUID = -3613240970062343385L;
+     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+     private TridentTupleToKafkaMapper<K, V> mapper;
+     private KafkaTopicSelector topicSelector;
+     private Properties producerProperties = new Properties();
+
+     /** Sets the tuple-to-record mapper; returns {@code this} for chaining. */
+     public TridentKafkaStateFactory<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+         this.mapper = mapper;
+         return this;
+     }
+
+     /** Sets the topic selector; returns {@code this} for chaining. */
+     public TridentKafkaStateFactory<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+         this.topicSelector = selector;
+         return this;
+     }
+
+     /**
+      * Sets the Kafka producer configuration passed to every created state.
+      *
+      * @param props producer properties
+      * @return {@code this} for chaining
+      * @throws NullPointerException if {@code props} is null; rejected here rather than when the state is
+      *     prepared on a worker
+      */
+     public TridentKafkaStateFactory<K, V> withProducerProperties(Properties props) {
+         if (props == null) {
+             throw new NullPointerException("producer properties can not be null");
+         }
+         this.producerProperties = props;
+         return this;
+     }
+
+     /**
+      * Creates a {@link TridentKafkaState} and prepares it; preparing creates its Kafka producer.
+      */
+     @Override
+     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+         LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+         TridentKafkaState<K, V> state = new TridentKafkaState<>();
+         state.withKafkaTopicSelector(this.topicSelector)
+             .withTridentTupleToKafkaMapper(this.mapper);
+         state.prepare(producerProperties);
+         return state;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
new file mode 100644
index 0000000..19e3d33
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import java.util.List;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class TridentKafkaStateUpdater<K, V> extends BaseStateUpdater<TridentKafkaState<K, V>> {
+
+ private static final long serialVersionUID = 3352659585225274402L;
+
+ @Override
+ public void updateState(TridentKafkaState<K, V> state, List<TridentTuple> tuples, TridentCollector collector) {
+ state.updateState(tuples, collector);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+ /**
+  * {@link TridentTupleToKafkaMapper} that reads the record key and value from two named tuple fields.
+  *
+  * @param <K> type of the value stored in the key field
+  * @param <V> type of the value stored in the message field
+  */
+ public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+     public final String keyFieldName;
+     public final String msgFieldName;
+
+     /**
+      * @param keyFieldName name of the tuple field that holds the record key
+      * @param msgFieldName name of the tuple field that holds the record value
+      */
+     public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+         this.keyFieldName = keyFieldName;
+         this.msgFieldName = msgFieldName;
+     }
+
+     // Unchecked: the field's runtime type is not verified against K. Because of erasure, a mismatch
+     // surfaces as a ClassCastException where the caller uses the value.
+     @SuppressWarnings("unchecked")
+     @Override
+     public K getKeyFromTuple(TridentTuple tuple) {
+         return (K) tuple.getValueByField(keyFieldName);
+     }
+
+     // Unchecked for the same reason as getKeyFromTuple.
+     @SuppressWarnings("unchecked")
+     @Override
+     public V getMessageFromTuple(TridentTuple tuple) {
+         return (V) tuple.getValueByField(msgFieldName);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+ /**
+  * Extracts a Kafka record key and value from a Trident tuple. Implementations are
+  * {@link Serializable} because this interface extends it.
+  *
+  * @param <K> Kafka record key type
+  * @param <V> Kafka record value type
+  */
+ public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
+
+     /** Returns the record key for {@code tuple}; may be {@code null}. */
+     K getKeyFromTuple(TridentTuple tuple);
+
+     /** Returns the record value for {@code tuple}; TridentKafkaState skips tuples whose value is {@code null}. */
+     V getMessageFromTuple(TridentTuple tuple);
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..607c996
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link KafkaTopicSelector} that routes every tuple to one fixed topic.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -1172454882072591493L;
+
+    /** Topic name returned for every tuple. */
+    private final String topicName;
+
+    /**
+     * @param topicName the topic name that {@link #getTopic(TridentTuple)} will always return
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * Ignores the tuple contents and returns the topic given at construction time.
+     */
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return this.topicName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Chooses the Kafka topic name for a Trident tuple. Implementations must be {@link Serializable}.
+ */
+public interface KafkaTopicSelector extends Serializable {
+
+    /**
+     * @param tuple the Trident tuple being written
+     * @return the name of the topic for this tuple
+     */
+    String getTopic(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
new file mode 100644
index 0000000..93b1040
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+/**
+ * Test helper that runs an embedded ZooKeeper, a single Kafka broker listening on
+ * {@code 127.0.0.1:9092}, and a producer that tests can use to publish records.
+ */
+public class KafkaUnit {
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkUtils zkUtils;
+    private KafkaProducer<String, String> producer;
+    private static final String ZK_HOST = "127.0.0.1";
+    private static final String KAFKA_HOST = "127.0.0.1";
+    private static final int KAFKA_PORT = 9092;
+
+    public KafkaUnit() {
+    }
+
+    /**
+     * Starts embedded ZooKeeper, then a broker (id 0, log dir in a fresh temporary directory),
+     * then a default producer with String key/value serializers.
+     *
+     * @throws IOException if the broker's temporary log directory cannot be created
+     */
+    public void setUp() throws IOException {
+        // setup ZK
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZK_HOST + ":" + zkServer.port();
+        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        MockTime mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // setup default Producer
+        createProducer();
+    }
+
+    /**
+     * Closes the producer, then shuts down the broker, the ZooKeeper client and the embedded
+     * ZooKeeper, in that order.
+     */
+    public void tearDown() {
+        // Each later step runs in a finally so a failure in an earlier one cannot leave the
+        // broker (bound to a fixed port) or ZooKeeper running into the next test.
+        try {
+            closeProducer();
+        } finally {
+            try {
+                kafkaServer.shutdown();
+            } finally {
+                try {
+                    zkUtils.close();
+                } finally {
+                    zkServer.shutdown();
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a topic with 1 partition and replication factor 1, with rack awareness disabled.
+     *
+     * @param topicName name of the topic to create
+     */
+    public void createTopic(String topicName) {
+        AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    }
+
+    /**
+     * @return the fixed port the embedded broker listens on
+     */
+    public int getKafkaPort() {
+        return KAFKA_PORT;
+    }
+
+    /** Installs a producer that uses String serializers for keys and values. */
+    private void createProducer() {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+        producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        KafkaProducer<String, String> previous = producer;
+        producer = new KafkaProducer<>(producerProps);
+        close(previous);
+    }
+
+    /**
+     * Replaces the current producer with one that uses the given serializers.
+     *
+     * <p>The previously installed producer (for example, the default one created by
+     * {@link #setUp()}) is closed once the replacement has been built.
+     *
+     * <p>NOTE(review): the serializers are raw-typed. The producer may therefore carry
+     * non-String keys and values even though the field is declared {@code KafkaProducer<String, String>}.
+     *
+     * @param keySerializer serializer for record keys
+     * @param valueSerializer serializer for record values
+     */
+    public void createProducer(Serializer keySerializer, Serializer valueSerializer) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+        KafkaProducer<String, String> previous = producer;
+        producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
+        close(previous);
+    }
+
+    /**
+     * Sends a record and blocks for up to 10 seconds waiting for the send to complete.
+     *
+     * @param producerRecord the record to send
+     * @throws InterruptedException if interrupted while waiting
+     * @throws ExecutionException if the send failed
+     * @throws TimeoutException if the send did not complete within 10 seconds
+     */
+    public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
+        producer.send(producerRecord).get(10, TimeUnit.SECONDS);
+    }
+
+    /** Closes the current producer, if any, and clears the reference. */
+    private void closeProducer() {
+        KafkaProducer<String, String> current = producer;
+        producer = null;
+        close(current);
+    }
+
+    /** Closes the given producer; a {@code null} argument is ignored. */
+    private static void close(KafkaProducer<String, String> toClose) {
+        if (toClose != null) {
+            toClose.close();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
new file mode 100644
index 0000000..6e90c9d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.rules.ExternalResource;
+
+import java.io.IOException;
+
+
+/**
+ * Exposes {@link KafkaUnit} as a JUnit {@link ExternalResource}. {@link KafkaUnit#setUp()} is
+ * called from {@code before()} and {@link KafkaUnit#tearDown()} from {@code after()}.
+ */
+public class KafkaUnitRule extends ExternalResource {
+
+    private final KafkaUnit kafkaUnit = new KafkaUnit();
+
+    @Override
+    public void before() throws IOException {
+        this.kafkaUnit.setUp();
+    }
+
+    @Override
+    public void after() {
+        this.kafkaUnit.tearDown();
+    }
+
+    /**
+     * @return the {@link KafkaUnit} instance managed by this rule
+     */
+    public KafkaUnit getKafkaUnit() {
+        return kafkaUnit;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..8c8a945
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+
+ @SuppressWarnings({ "unchecked", "serial" })
+ @Test
+ public void testSimple() {
+ final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+ when(producer.send((ProducerRecord<String,String>)any(), (Callback)any())).thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Callback c = (Callback)invocation.getArguments()[1];
+ c.onCompletion(null, null);
+ return null;
+ }
+ });
+ KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+ @Override
+ protected KafkaProducer<String, String> mkProducer(Properties props) {
+ return producer;
+ }
+ };
+ bolt.withTopicSelector("MY_TOPIC");
+
+ OutputCollector collector = mock(OutputCollector.class);
+ TopologyContext context = mock(TopologyContext.class);
+ Map<String, Object> conf = new HashMap<>();
+ bolt.prepare(conf, context, collector);
+ MkTupleParam param = new MkTupleParam();
+ param.setFields("key", "message");
+ Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+ bolt.execute(testTuple);
+ verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() {
+ @Override
+ public boolean matches(Object argument) {
+ LOG.info("GOT {} ->", argument);
+ ProducerRecord<String, String> arg = (ProducerRecord<String, String>) argument;
+ LOG.info(" {} {} {}", arg.topic(), arg.key(), arg.value());
+ return "MY_TOPIC".equals(arg.topic()) &&
+ "KEY".equals(arg.key()) &&
+ "VALUE".equals(arg.value());
+ }
+ }), any(Callback.class));
+ verify(collector).ack(testTuple);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
new file mode 100644
index 0000000..abc58f0
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ByTopicRecordTranslator}. Covers routing records to per-topic translators and
+ * streams, and rejecting conflicting registrations.
+ */
+public class ByTopicRecordTranslatorTest {
+    // Shared fixtures are final so that no test can reassign them and affect other tests.
+
+    /** Translates a record to a single-element tuple containing its key. */
+    public static final Func<ConsumerRecord<String, String>, List<Object>> JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key());
+        }
+    };
+
+    /** Translates a record to a single-element tuple containing its value. */
+    public static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+
+    /** Translates a record to a two-element tuple: key, then value. */
+    public static final Func<ConsumerRecord<String, String>, List<Object>> KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key(), record.value());
+        }
+    };
+
+    /**
+     * Records from unregistered topics use the default translator on the {@code default} stream.
+     * Records from registered topics use the translator, fields and stream given to {@code forTopic}.
+     */
+    @Test
+    public void testBasic() {
+        ByTopicRecordTranslator<String, String> trans =
+            new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), "value-stream");
+        trans.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), "key-value-stream");
+        HashSet<String> expectedStreams = new HashSet<>();
+        expectedStreams.add("default");
+        expectedStreams.add("value-stream");
+        expectedStreams.add("key-value-stream");
+        assertEquals(expectedStreams, new HashSet<>(trans.streams()));
+
+        ConsumerRecord<String, String> cr1 = new ConsumerRecord<>("TOPIC OTHER", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key"), trans.getFieldsFor("default"));
+        assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1));
+
+        ConsumerRecord<String, String> cr2 = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("value"), trans.getFieldsFor("value-stream"));
+        assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2));
+
+        ConsumerRecord<String, String> cr3 = new ConsumerRecord<>("TOPIC 2", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream"));
+        assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
+    }
+
+    /**
+     * Registering a topic with fields ({@code value}) that differ from the default translator's
+     * fields ({@code key}) must throw IllegalArgumentException. Presumably this is because the
+     * stream-less {@code forTopic} overload targets the default stream.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testFieldCollision() {
+        ByTopicRecordTranslator<String, String> trans =
+            new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"));
+    }
+
+    /** Registering the same topic twice must throw IllegalStateException, even for different streams. */
+    @Test(expected = IllegalStateException.class)
+    public void testTopicCollision() {
+        ByTopicRecordTranslator<String, String> trans =
+            new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1");
+        trans.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), "foo2");
+    }
+
+}