You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/01 15:56:26 UTC
[3/8] storm git commit: STORM-1997: copy state/bolt from storm-kafka
to storm-kafka-client STORM-2225: change spout config to be simpler.
STORM-2228: removed ability to request a single topic go to multiple streams
STORM-2236: Reimplemented manual partit
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
new file mode 100644
index 0000000..6c5dcfb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Subscription} to an explicit, fixed list of topic names.
+ */
+public class NamedSubscription extends Subscription {
+    private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    /** Read-only snapshot of the topic names handed to the constructor. */
+    protected final Collection<String> topics;
+
+    /**
+     * Create a subscription for the given topic names.
+     * @param topics the names of the topics to subscribe to; the collection is copied
+     */
+    public NamedSubscription(Collection<String> topics) {
+        final Collection<String> snapshot = new ArrayList<>(topics);
+        this.topics = Collections.unmodifiableCollection(snapshot);
+    }
+
+    /**
+     * Create a subscription for the given topic names.
+     * @param topics the names of the topics to subscribe to
+     */
+    public NamedSubscription(String ... topics) {
+        this(Arrays.asList(topics));
+    }
+
+    /**
+     * Subscribe the consumer to the configured topic names and issue a first poll.
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener registered together with the subscription
+     * @param unused the topology context; not used by this implementation
+     */
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(this.topics, listener);
+        LOG.info("Kafka consumer subscribed topics {}", this.topics);
+
+        // A zero-timeout poll starts the consumer registration process; the
+        // rebalance listener is invoked after it, once partitions are registered.
+        consumer.poll(0);
+    }
+
+    /**
+     * @return the string form of the collection of subscribed topic names
+     */
+    @Override
+    public String getTopicsString() {
+        return this.topics.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
new file mode 100644
index 0000000..9a8de0f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Subscription} to every topic whose name matches a regular expression.
+ */
+public class PatternSubscription extends Subscription {
+    private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    /** The pattern that topic names are matched against. */
+    protected final Pattern pattern;
+
+    /**
+     * Create a subscription for all topics matching the pattern.
+     * @param pattern the pattern used to select topics
+     */
+    public PatternSubscription(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    /**
+     * Subscribe the consumer to the topics matching the pattern and issue a first poll.
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener registered together with the subscription
+     * @param unused the topology context; not used by this implementation
+     */
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(this.pattern, listener);
+        LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", this.pattern);
+
+        // A zero-timeout poll starts the consumer registration process; the
+        // rebalance listener is invoked after it, once partitions are registered.
+        consumer.poll(0);
+    }
+
+    /**
+     * @return the regular expression the subscription was created with
+     */
+    @Override
+    public String getTopicsString() {
+        final String regex = this.pattern.pattern();
+        return regex;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
new file mode 100644
index 0000000..2e72c99
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple.
+ */
+public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecord<K, V>, List<Object>> {
+ /** A single-element list holding only the stream name "default". */
+ public static final List<String> DEFAULT_STREAM = Collections.singletonList("default");
+
+ /**
+ * Translate the ConsumerRecord into a list of objects that can be emitted.
+ * @param record the record to translate
+ * @return the objects in the tuple. Return a {@link KafkaTuple}
+ * if you want to route the tuple to a non-default stream
+ */
+ List<Object> apply(ConsumerRecord<K,V> record);
+
+ /**
+ * Get the fields associated with a stream. The streams passed in are
+ * returned by the {@link RecordTranslator#streams()} method.
+ * @param stream the stream the fields are for
+ * @return the fields for that stream.
+ */
+ Fields getFieldsFor(String stream);
+
+ /**
+ * Get the streams this translator can emit to.
+ * @return the list of streams that this will handle.
+ */
+ List<String> streams();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..e23e2dc
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Hands out partitions round robin across every task of the spout component,
+ * whether or not that task is currently alive. Spout parallelism does not
+ * typically change while a topology runs, so counting all tasks keeps the
+ * assignment stable when individual spouts crash.
+ *
+ * With N tasks, the first task takes the 1st, (N+1)th, ... partitions, the
+ * second task takes the 2nd, (N+2)th, ... partitions, and so on.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+    /**
+     * Select the partitions that belong to the calling task.
+     * @param allPartitions every partition to distribute; positions in this list drive the assignment
+     * @param context supplies this task's index and the task list of its component
+     * @return the partitions at positions taskIndex, taskIndex + N, taskIndex + 2N, ...
+     *     collected through a hash set, so duplicates are dropped and list order is not kept
+     */
+    @Override
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+        int position = context.getThisTaskIndex();
+        final int stride = context.getComponentTasks(context.getThisComponentId()).size();
+        final int partitionCount = allPartitions.size();
+        final Set<TopicPartition> assigned = new HashSet<>(partitionCount / stride + 1);
+        while (position < partitionCount) {
+            assigned.add(allPartitions.get(position));
+            position += stride;
+        }
+        return new ArrayList<>(assigned);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
new file mode 100644
index 0000000..46c2849
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * A {@link RecordTranslator} that delegates the translation to a function and
+ * routes every resulting tuple to one configured stream.
+ */
+public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = 4678369144122009596L;
+    private final Fields fields;
+    private final Func<ConsumerRecord<K, V>, List<Object>> func;
+    private final String stream;
+
+    /**
+     * Create a translator that emits to the stream named "default".
+     * @param func turns a record into the values of the tuple
+     * @param fields the fields reported for the output stream
+     */
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(func, fields, "default");
+    }
+
+    /**
+     * Create a translator that emits to the given stream.
+     * @param func turns a record into the values of the tuple
+     * @param fields the fields reported for the output stream
+     * @param stream the stream every tuple is routed to
+     */
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this.stream = stream;
+        this.fields = fields;
+        this.func = func;
+    }
+
+    /**
+     * Translate the record with the wrapped function and route the result.
+     * @param record the record to translate
+     * @return a {@link KafkaTuple} holding the function's values, routed to the configured stream
+     */
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        final KafkaTuple tuple = new KafkaTuple();
+        final List<Object> values = this.func.apply(record);
+        tuple.addAll(values);
+        return tuple.routedTo(this.stream);
+    }
+
+    /**
+     * @param stream ignored; the same fields are returned for any stream name
+     * @return the fields given at construction
+     */
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return this.fields;
+    }
+
+    /**
+     * @return a one-element list holding the configured stream
+     */
+    @Override
+    public List<String> streams() {
+        return Arrays.asList(this.stream);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
new file mode 100644
index 0000000..db2a7bb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+ private static final long serialVersionUID = -216136367240198716L;
+
+ /**
+ * Subscribe the KafkaConsumer to the proper topics.
+ * @param <K> the key type of the consumer
+ * @param <V> the value type of the consumer
+ * @param consumer the consumer to subscribe.
+ * @param listener the rebalance listener to include in the subscription
+ * @param context the topology context supplied by the caller; implementations may ignore it
+ */
+ public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
+
+ /**
+ * Describe the subscribed topics in a human-readable form.
+ * @return a string representing the subscribed topics.
+ */
+ public abstract String getTopicsString();
+
+ /**
+ * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
+ * If you wish to do manual partition management, you must provide an implementation of this method
+ * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
+ * to inform the rest of the system of those changes.
+ */
+ public void refreshAssignment() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
new file mode 100644
index 0000000..dafb97c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Comparator;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Singleton comparator of TopicPartitions. Topics have precedence over partitions.
+ * Topics are compared through {@link String#compareTo(String)} and partitions are
+ * compared numerically.
+ *
+ * Use INSTANCE for all sorting.
+ */
+public class TopicPartitionComparator implements Comparator<TopicPartition> {
+    public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();
+
+    /**
+     * Private to make it a singleton.
+     */
+    private TopicPartitionComparator() {
+        //Empty
+    }
+
+    /**
+     * Order two partitions by topic name first and by partition number second.
+     * @param o1 the first partition
+     * @param o2 the second partition
+     * @return a negative value, zero, or a positive value when {@code o1} sorts
+     *     before, equal to, or after {@code o2}
+     */
+    @Override
+    public int compare(TopicPartition o1, TopicPartition o2) {
+        final int byTopic = o1.topic().compareTo(o2.topic());
+        if (byTopic != 0) {
+            return byTopic;
+        }
+        // Integer.compare instead of subtraction: a difference of two ints can
+        // overflow and flip the sign of the result.
+        return Integer.compare(o1.partition(), o2.partition());
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
new file mode 100644
index 0000000..d51104d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A polled timer: callers ask {@link #isExpiredResetOnTrue()} whether a period
+ * has elapsed, and the timer restarts each time the answer is yes.
+ */
+public class Timer {
+    private final long delay;
+    private final long period;
+    private final TimeUnit timeUnit;
+    private final long periodNanos;
+    private long start;
+
+    /**
+     * Creates a class that mimics a single threaded timer that expires periodically. When
+     * {@link #isExpiredResetOnTrue()} is called more than {@code period} after the timer was
+     * started or last reset, it returns true and the timer is reset. The first cycle begins
+     * {@code delay} after construction.
+     *
+     * @param delay the initial delay before the timer starts
+     * @param period the period between calls {@link #isExpiredResetOnTrue()}
+     * @param timeUnit the time unit of delay and period
+     */
+    public Timer(long delay, long period, TimeUnit timeUnit) {
+        this.delay = delay;
+        this.period = period;
+        this.timeUnit = timeUnit;
+        this.periodNanos = timeUnit.toNanos(period);
+        // Pushing the start point into the future implements the initial delay.
+        this.start = System.nanoTime() + timeUnit.toNanos(delay);
+    }
+
+    /**
+     * @return the period, expressed in {@link #getTimeUnit()}
+     */
+    public long period() {
+        return this.period;
+    }
+
+    /**
+     * @return the initial delay, expressed in {@link #getTimeUnit()}
+     */
+    public long delay() {
+        return this.delay;
+    }
+
+    /**
+     * @return the unit that delay and period are expressed in
+     */
+    public TimeUnit getTimeUnit() {
+        return this.timeUnit;
+    }
+
+    /**
+     * Reports whether more than {@code period} has passed since the timer was started or
+     * last reset. A true result also resets the timer, starting a new cycle.
+     *
+     * @return true if the time elapsed since the last call returning true is greater than
+     *     {@code period}; false otherwise
+     */
+    public boolean isExpiredResetOnTrue() {
+        final long elapsedNanos = System.nanoTime() - this.start;
+        if (elapsedNanos <= this.periodNanos) {
+            return false;
+        }
+        this.start = System.nanoTime();
+        return true;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 0218f07..19b4f01 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,33 +18,38 @@
package org.apache.storm.kafka.spout.trident;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
-
public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentSpout.Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>>, Serializable {
+ private static final long serialVersionUID = -7343927794834130435L;
+
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
// Kafka
@@ -53,18 +58,20 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
// Bookkeeping
private final KafkaTridentSpoutManager<K, V> kafkaManager;
// Declare some KafkaTridentSpoutManager references for convenience
- private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
private final long pollTimeoutMs;
private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+ private final RecordTranslator<K, V> translator;
+ private final Timer refreshSubscriptionTimer;
- public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager) {
+ public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext context) {
this.kafkaManager = kafkaManager;
- this.kafkaManager.subscribeKafkaConsumer();
+ this.kafkaManager.subscribeKafkaConsumer(context);
+ refreshSubscriptionTimer = new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
//must subscribeKafkaConsumer before this line
kafkaConsumer = kafkaManager.getKafkaConsumer();
+ translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
- tuplesBuilder = kafkaManager.getTuplesBuilder();
final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -88,6 +95,9 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
seek(topicPartition, lastBatch);
// poll
+ if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+ kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+ }
final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
LOG.debug("Polled [{}] records from Kafka.", records.count());
@@ -106,7 +116,7 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
for (ConsumerRecord<K, V> record : records) {
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ final List<Object> tuple = translator.apply(record);
collector.emit(tuple);
LOG.debug("Emitted tuple [{}] for record: [{}]", tuple, record);
}
@@ -131,18 +141,18 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
if (committedOffset != null) { // offset was committed for this TopicPartition
if (firstPollOffsetStrategy.equals(EARLIEST)) {
- kafkaConsumer.seekToBeginning(toArrayList(tp));
+ kafkaConsumer.seekToBeginning(Collections.singleton(tp));
} else if (firstPollOffsetStrategy.equals(LATEST)) {
- kafkaConsumer.seekToEnd(toArrayList(tp));
+ kafkaConsumer.seekToEnd(Collections.singleton(tp));
} else {
// By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
kafkaConsumer.seek(tp, committedOffset.offset() + 1);
}
} else { // no commits have ever been done, so start at the beginning or end depending on the strategy
if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
- kafkaConsumer.seekToBeginning(toArrayList(tp));
+ kafkaConsumer.seekToBeginning(Collections.singleton(tp));
} else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
- kafkaConsumer.seekToEnd(toArrayList(tp));
+ kafkaConsumer.seekToEnd(Collections.singleton(tp));
}
}
}
@@ -151,10 +161,6 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
return fetchOffset;
}
- private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
- return new ArrayList<TopicPartition>(1){{add(tp);}};
- }
-
// returns paused topic partitions
private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index d2c1700..4b60f33 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -18,23 +18,22 @@
package org.apache.storm.kafka.spout.trident;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
public class KafkaTridentSpoutManager<K, V> implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
@@ -44,29 +43,30 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
// Bookkeeping
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
// Declare some KafkaSpoutConfig references for convenience
- private KafkaSpoutStreams kafkaSpoutStreams; // Object that wraps all the logic to declare output fields and emit tuples
- private KafkaSpoutTuplesBuilder<K, V> tuplesBuilder; // Object that contains the logic to build tuples for each ConsumerRecord
+ private final Fields fields;
public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this.kafkaSpoutConfig = kafkaSpoutConfig;
- kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
- tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+ RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+ Fields fields = null;
+ for (String stream: translator.streams()) {
+ if (fields == null) {
+ fields = translator.getFieldsFor(stream);
+ } else {
+ if (!fields.equals(translator.getFieldsFor(stream))) {
+ throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+ }
+ }
+ }
+ this.fields = fields;
LOG.debug("Created {}", this);
}
- void subscribeKafkaConsumer() {
+ void subscribeKafkaConsumer(TopologyContext context) {
kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
- if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
- final List<String> subTopics = kafkaSpoutConfig.getSubscribedTopics();
- kafkaConsumer.subscribe(subTopics, new KafkaSpoutConsumerRebalanceListener());
- LOG.info("Kafka consumer subscribed topics {}", subTopics);
- } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
- final Pattern pattern = kafkaSpoutConfig.getTopicWildcardPattern();
- kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
- LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
- }
+ kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
// Initial poll to get the consumer registration process going.
// KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
@@ -93,16 +93,12 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
return kafkaConsumer;
}
- KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
- return tuplesBuilder;
- }
-
Set<TopicPartition> getTopicPartitions() {
return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
}
-
- KafkaSpoutStreams getKafkaSpoutStreams() {
- return kafkaSpoutStreams;
+
+ Fields getFields() {
+ return fields;
}
KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index b77ad11..5c5856c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -19,6 +19,7 @@
package org.apache.storm.kafka.spout.trident;
import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.tuple.Fields;
@@ -29,12 +30,19 @@ import java.util.List;
import java.util.Map;
public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> {
+ private static final long serialVersionUID = -8003272486566259640L;
+
private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
private KafkaTridentSpoutManager<K, V> kafkaManager;
private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
+
+ public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+ this(new KafkaTridentSpoutManager<>(conf));
+ }
+
public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
this.kafkaManager = kafkaManager;
LOG.debug("Created {}", this);
@@ -44,7 +52,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map conf, TopologyContext context) {
// Instance is created on first call rather than in constructor to avoid NotSerializableException caused by KafkaConsumer
if (kafkaTridentSpoutEmitter == null) {
- kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager);
+ kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager, context);
}
return kafkaTridentSpoutEmitter;
}
@@ -64,7 +72,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
@Override
public Fields getOutputFields() {
- final Fields outputFields = kafkaManager.getKafkaSpoutStreams().getOutputFields();
+ final Fields outputFields = kafkaManager.getFields();
LOG.debug("OutputFields = {}", outputFields);
return outputFields;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..f24fed5
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+/**
+ * Trident {@link State} that publishes every tuple of a batch to Kafka.
+ *
+ * <p>The destination topic of each tuple is chosen by the configured {@link KafkaTopicSelector}; the
+ * record key and message are extracted by the configured {@link TridentTupleToKafkaMapper}. Both must
+ * be set and {@link #prepare(Properties)} must be called before
+ * {@link #updateState(List, TridentCollector)} is used.
+ */
+public class TridentKafkaState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    // Created in prepare(); keys and values are whatever the (raw) mapper extracts from the tuple.
+    private KafkaProducer<Object, Object> producer;
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+
+    /**
+     * Sets the mapper used to extract the key and message of each tuple.
+     *
+     * @param mapper the mapper to use
+     * @return this state, for call chaining
+     */
+    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /**
+     * Sets the selector used to choose the destination topic of each tuple.
+     *
+     * @param selector the topic selector to use
+     * @return this state, for call chaining
+     */
+    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /** No-op apart from a debug log line. */
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    /** No-op apart from a debug log line. */
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    /**
+     * Checks that the mapper and topic selector have been set and creates the Kafka producer.
+     *
+     * @param options the properties handed to the {@link KafkaProducer} constructor
+     * @throws NullPointerException if the mapper or the topic selector has not been set
+     */
+    public void prepare(Properties options) {
+        Objects.requireNonNull(mapper, "mapper can not be null");
+        Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer<>(options);
+    }
+
+    /**
+     * Sends every tuple of the batch to Kafka and waits for all sends to complete.
+     *
+     * <p>Tuples for which the topic selector returns {@code null} are skipped with a warning.
+     *
+     * @param tuples the batch of tuples to publish
+     * @param collector not used by this method
+     * @throws FailedException if any record could not be sent, if waiting for the results was
+     *     interrupted, or if any other error occurred while sending
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        String topic = null;
+        try {
+            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
+            for (TridentTuple tuple : tuples) {
+                topic = topicSelector.getTopic(tuple);
+
+                if (topic != null) {
+                    futures.add(producer.send(new ProducerRecord<Object, Object>(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple))));
+                } else {
+                    LOG.warn("skipping key = {}, topic selector returned null.", mapper.getKeyFromTuple(tuple));
+                }
+            }
+
+            // Wait for every send so that all failures of the batch are reported together.
+            List<ExecutionException> exceptions = new ArrayList<>(futures.size());
+            for (Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    exceptions.add(e);
+                }
+            }
+
+            if (!exceptions.isEmpty()) {
+                StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ")
+                        .append(tuples).append(" from topic = ").append(topic)
+                        .append(" because of the following exceptions: \n");
+                for (ExecutionException exception : exceptions) {
+                    errorMsg.append(exception.getMessage()).append("\n");
+                }
+                LOG.error(errorMsg.toString());
+                throw new FailedException(errorMsg.toString(), exceptions.get(0));
+            }
+        } catch (FailedException ex) {
+            // Already logged with the per-record details above; do not re-wrap or log it twice.
+            throw ex;
+        } catch (InterruptedException ex) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw sendFailure(tuples, topic, ex);
+        } catch (Exception ex) {
+            throw sendFailure(tuples, topic, ex);
+        }
+    }
+
+    /** Logs a send failure and builds the {@link FailedException} that reports it to the caller. */
+    private static FailedException sendFailure(List<TridentTuple> tuples, String topic, Exception cause) {
+        String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
+        LOG.warn(errorMsg, cause);
+        return new FailedException(errorMsg, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..f564510
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link StateFactory} that creates {@link TridentKafkaState} instances configured with this
+ * factory's mapper, topic selector and producer properties.
+ */
+public class TridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    // Defaults to an empty Properties object until withProducerProperties is called.
+    private Properties producerProperties = new Properties();
+
+    /**
+     * Sets the mapper handed to every state created by this factory.
+     *
+     * @param mapper the mapper to use
+     * @return this factory, for call chaining
+     */
+    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /**
+     * Sets the topic selector handed to every state created by this factory.
+     *
+     * @param selector the topic selector to use
+     * @return this factory, for call chaining
+     */
+    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    /**
+     * Sets the properties passed to {@link TridentKafkaState#prepare(Properties)}.
+     *
+     * @param props the producer properties to use
+     * @return this factory, for call chaining
+     */
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    /**
+     * Creates and prepares a new {@link TridentKafkaState}.
+     *
+     * <p>Only {@code partitionIndex} and {@code numPartitions} are read here, and only for logging.
+     *
+     * @return the prepared state
+     */
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link TridentTupleToKafkaMapper} that reads the key and the message from two named tuple fields.
+ *
+ * @param <K> the type the key field value is cast to
+ * @param <V> the type the message field value is cast to
+ */
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    /**
+     * @param keyFieldName name of the tuple field holding the key
+     * @param msgFieldName name of the tuple field holding the message
+     */
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    /**
+     * Returns the value of the key field, cast to {@code K}.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the field value is untyped; the caller chooses K to match it
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    /**
+     * Returns the value of the message field, cast to {@code V}.
+     */
+    @Override
+    @SuppressWarnings("unchecked") // the field value is untyped; the caller chooses V to match it
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Extracts the key and the message of a Kafka record from a {@link TridentTuple}.
+ *
+ * @param <K> type of the extracted key
+ * @param <V> type of the extracted message
+ */
+public interface TridentTupleToKafkaMapper<K,V> extends Serializable {
+    /** Returns the key to use for the given tuple. */
+    K getKeyFromTuple(TridentTuple tuple);
+    /** Returns the message (record value) to use for the given tuple. */
+    V getMessageFromTuple(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..607c996
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+/**
+ * {@link KafkaTopicSelector} that returns the same, fixed topic name for every tuple.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -1172454882072591493L;
+    // Topic returned for every tuple; set once at construction.
+    private final String topicName;
+
+    /**
+     * @param topicName the topic name returned by {@link #getTopic(TridentTuple)}; stored as given
+     */
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * Returns the topic name supplied at construction; the tuple is not inspected.
+     */
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return topicName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+/**
+ * Chooses the Kafka topic a {@link TridentTuple} is written to.
+ */
+public interface KafkaTopicSelector extends Serializable {
+    /**
+     * Returns the topic for the given tuple. TridentKafkaState skips (with a warning) any tuple
+     * for which this returns {@code null}.
+     */
+    String getTopic(TridentTuple tuple);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..8c8a945
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test for {@link KafkaBolt} that runs against a mocked {@link KafkaProducer}.
+ */
+public class KafkaBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+    
+    /**
+     * Executes a single tuple and checks that one record with the expected topic, key and value is
+     * handed to the producer and that the tuple is acked.
+     */
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+        // The stubbed send() invokes the callback immediately with no metadata and no exception,
+        // then returns null instead of a Future.
+        when(producer.send((ProducerRecord<String,String>)any(), (Callback)any())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Callback c = (Callback)invocation.getArguments()[1];
+                c.onCompletion(null, null);
+                return null;
+            }
+        });
+        // Override the producer factory method so the bolt uses the mock.
+        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+            @Override
+            protected KafkaProducer<String, String> mkProducer(Properties props) {
+                return producer;
+            }
+        };
+        bolt.withTopicSelector("MY_TOPIC");
+        
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> conf = new HashMap<>();
+        bolt.prepare(conf, context, collector);
+        MkTupleParam param = new MkTupleParam();
+        // NOTE(review): "key" and "message" are presumably the field names the bolt's default
+        // mapper reads — confirm against KafkaBolt.
+        param.setFields("key", "message");
+        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+        bolt.execute(testTuple);
+        // The record passed to send() must carry the configured topic and the tuple's key and value.
+        verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() {
+            @Override
+            public boolean matches(Object argument) {
+                LOG.info("GOT {} ->", argument);
+                ProducerRecord<String, String> arg = (ProducerRecord<String, String>) argument;
+                LOG.info("  {} {} {}", arg.topic(), arg.key(), arg.value());
+                return "MY_TOPIC".equals(arg.topic()) &&
+                        "KEY".equals(arg.key()) &&
+                        "VALUE".equals(arg.value());
+            }
+        }), any(Callback.class));
+        verify(collector).ack(testTuple);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
new file mode 100644
index 0000000..ea0b6e7
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+/**
+ * Tests for {@link ByTopicRecordTranslator}: per-topic translation with a default fallback, and
+ * rejection of conflicting registrations.
+ */
+public class ByTopicRecordTranslatorTest {
+    /** Translates a record into a one-element tuple holding only the record key. */
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> rec) {
+            return new Values(rec.key());
+        }
+    };
+
+    /** Translates a record into a one-element tuple holding only the record value. */
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> rec) {
+            return new Values(rec.value());
+        }
+    };
+
+    /** Translates a record into a two-element tuple holding the record key and then its value. */
+    public static Func<ConsumerRecord<String, String>, List<Object>> KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> rec) {
+            return new Values(rec.key(), rec.value());
+        }
+    };
+
+    /** Builds the fixed test record ("THE KEY" / "THE VALUE", partition 100, offset 100) for a topic. */
+    private static ConsumerRecord<String, String> recordFor(String topic) {
+        return new ConsumerRecord<>(topic, 100, 100, "THE KEY", "THE VALUE");
+    }
+
+    /**
+     * Registers two topic-specific translations next to the default one and checks the declared
+     * streams, the fields of each stream and the tuple produced for a record of each topic.
+     */
+    @Test
+    public void testBasic() {
+        ByTopicRecordTranslator<String, String> translator =
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        translator.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), "value-stream");
+        translator.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), "key-value-stream");
+
+        HashSet<String> wantedStreams =
+                new HashSet<String>(Arrays.asList("default", "value-stream", "key-value-stream"));
+        assertEquals(wantedStreams, new HashSet<>(translator.streams()));
+
+        // A topic without its own registration falls back to the default translation.
+        ConsumerRecord<String, String> unmapped = recordFor("TOPIC OTHER");
+        assertEquals(new Fields("key"), translator.getFieldsFor("default"));
+        assertEquals(Arrays.asList("THE KEY"), translator.apply(unmapped));
+
+        ConsumerRecord<String, String> valueOnly = recordFor("TOPIC 1");
+        assertEquals(new Fields("value"), translator.getFieldsFor("value-stream"));
+        assertEquals(Arrays.asList("THE VALUE"), translator.apply(valueOnly));
+
+        ConsumerRecord<String, String> keyAndValue = recordFor("TOPIC 2");
+        assertEquals(new Fields("key", "value"), translator.getFieldsFor("key-value-stream"));
+        assertEquals(Arrays.asList("THE KEY", "THE VALUE"), translator.apply(keyAndValue));
+    }
+
+    /**
+     * Registering a topic without a stream name but with fields other than the default ones must
+     * be rejected. NOTE(review): presumably because it targets the default stream, which already
+     * declares different fields — confirm against ByTopicRecordTranslator.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testFieldCollision() {
+        ByTopicRecordTranslator<String, String> translator =
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        translator.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"));
+    }
+
+    /** Registering the same topic twice must be rejected. */
+    @Test(expected = IllegalStateException.class)
+    public void testTopicCollision() {
+        ByTopicRecordTranslator<String, String> translator =
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        translator.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1");
+        translator.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), "foo2");
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
new file mode 100644
index 0000000..f4275e4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
+
+/**
+ * Tests for {@link DefaultRecordTranslator}.
+ */
+public class DefaultRecordTranslatorTest {
+    /**
+     * Checks the single declared stream, its fields, and the tuple produced for one record:
+     * topic, partition, offset, key and value, in that order.
+     */
+    @Test
+    public void testBasic() {
+        DefaultRecordTranslator<String, String> trans = new DefaultRecordTranslator<>();
+        assertEquals(Arrays.asList("default"), trans.streams());
+        assertEquals(new Fields("topic", "partition", "offset", "key", "value"), trans.getFieldsFor("default"));
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+        // The offset is a long; the upper-case suffix keeps the literal from being misread as 1001.
+        assertEquals(Arrays.asList("TOPIC", 100, 100L, "THE KEY", "THE VALUE"), trans.apply(cr));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
new file mode 100644
index 0000000..08220dd
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.junit.Test;
+
+public class KafkaSpoutConfigTest {
+
+ @Test
+ public void testBasic() { // a config built from only bootstrap servers and a topic must expose these values
+ final HashMap<String, Object> expectedProps = new HashMap<>();
+ expectedProps.put("bootstrap.servers", "localhost:1234");
+ expectedProps.put("enable.auto.commit", "false");
+ final KafkaSpoutConfig<String, String> config = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+ assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, config.getFirstPollOffsetStrategy());
+ assertNull(config.getConsumerGroupId()); // no group id was set on the builder
+ assertTrue(config.getTranslator() instanceof DefaultRecordTranslator);
+ assertEquals(expectedProps, config.getKafkaProps()); // exactly these two consumer properties, nothing else
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 2d3eb56..9969d84 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,11 +15,25 @@
*/
package org.apache.storm.kafka.spout;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -27,37 +41,16 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
-
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
import org.mockito.Captor;
-
-import static org.mockito.Mockito.reset;
-
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
public class KafkaSpoutRebalanceTest {
@Captor
@@ -65,7 +58,7 @@ public class KafkaSpoutRebalanceTest {
private TopologyContext contextMock;
private SpoutOutputCollector collectorMock;
- private Map conf;
+ private Map<String, Object> conf;
private KafkaConsumer<String, String> consumerMock;
private KafkaConsumerFactory<String, String> consumerFactoryMock;
@@ -113,11 +106,11 @@ public class KafkaSpoutRebalanceTest {
//Emit the messages
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
+ verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForRevokedPartition.capture());
reset(collectorMock);
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
+ verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForAssignedPartition.capture());
//Now rebalance
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
@@ -132,7 +125,7 @@ public class KafkaSpoutRebalanceTest {
@Test
public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -161,7 +154,7 @@ public class KafkaSpoutRebalanceTest {
public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
//Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
deleted file mode 100644
index 723f5fd..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2016 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.tuple.Fields;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class KafkaSpoutStreamsNamedTopicsTest {
-
- @Test
- public void testGetOutputFields() {
- Fields outputFields = new Fields("b","a");
- String[] topics = new String[]{"testTopic"};
- String streamId = "test";
- KafkaSpoutStreamsNamedTopics build = new KafkaSpoutStreamsNamedTopics.Builder(outputFields, streamId, topics)
- .addStream(outputFields, streamId, topics)
- .build();
- Fields actualFields = build.getOutputFields();
- Assert.assertEquals(outputFields.get(0), actualFields.get(0));
- Assert.assertEquals(outputFields.get(1), actualFields.get(1));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 6983160..f457b59 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -70,7 +70,7 @@ public class SingleTopicKafkaSpoutTest {
SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
Map conf = mock(Map.class);
- KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort));
spout.open(conf, topology, collector);
spout.activate();
return new SpoutContext(spout, collector);
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 95b2199..99bd3de 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,21 @@
*/
package org.apache.storm.kafka.spout.builders;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.Config;
import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
public class SingleTopicKafkaSpoutConfiguration {
@@ -42,53 +46,41 @@ public class SingleTopicKafkaSpoutConfiguration {
public static StormTopology getTopologyKafkaSpout(int port) {
final TopologyBuilder tp = new TopologyBuilder();
- tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
return tp.createTopology();
}
- public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port) { // overload that passes 10_000 as offsetCommitPeriodMs
+ return getKafkaSpoutConfig(port, 10_000);
}
- public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
- return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs) { // overload that uses getRetryService() as the retry service
+ return getKafkaSpoutConfig(port, offsetCommitPeriodMs, getRetryService());
}
- public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
- return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
- .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .setPollTimeoutMs(1000)
- .build();
+ private static final Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() { // final: shared constant, never reassigned
+ @Override
+ public List<Object> apply(ConsumerRecord<String, String> r) {
+ return new Values(r.topic(), r.key(), r.value()); // same order as the Fields("topic", "key", "value") declared below
+ }
+ };
+
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) { // full form the shorter overloads delegate to
+ return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
+ .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+ new Fields("topic", "key", "value"), STREAM)
+ .setGroupId("kafkaSpoutTestGroup")
+ .setMaxPollRecords(5)
+ .setRetry(retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000)
+ .build();
}
-
+
protected static KafkaSpoutRetryService getRetryService() {
- return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
- KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
- }
-
- protected static Map<String, Object> getKafkaConsumerProps(int port) {
- Map<String, Object> props = new HashMap<>();
- props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
- props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
- props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("max.poll.records", "5");
- return props;
- }
-
- protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
- return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
- new TopicKeyValueTupleBuilder<String, String>(TOPIC))
- .build();
- }
-
- public static KafkaSpoutStreams getKafkaSpoutStreams() {
- final Fields outputFields = new Fields("topic", "key", "value");
- return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
- .build();
+ return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
deleted file mode 100644
index 4f20b58..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout.builders;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
- /**
- * @param topics list of topics that use this implementation to build tuples
- */
- public TopicKeyValueTupleBuilder(String... topics) {
- super(topics);
- }
-
- @Override
- public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
- return new Values(consumerRecord.topic(),
- consumerRecord.key(),
- consumerRecord.value());
- }
-}