You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/19 03:35:00 UTC
[4/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
deleted file mode 100644
index 38958b2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.apache.storm.kafka.trident.IBrokerReader;
-import org.apache.storm.kafka.trident.StaticBrokerReader;
-import org.apache.storm.kafka.trident.ZkBrokerReader;
-import org.apache.storm.metric.api.IMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class KafkaUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
- private static final int NO_OFFSET = -5;
-
- //suppress default constructor for noninstantiablility
- private KafkaUtils() {
- throw new AssertionError();
- }
-
- public static IBrokerReader makeBrokerReader(Map<String, Object> topoConf, KafkaConfig conf) {
- if (conf.hosts instanceof StaticHosts) {
- return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
- } else {
- return new ZkBrokerReader(topoConf, conf.topic, (ZkHosts) conf.hosts);
- }
- }
-
-
- public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
- long startOffsetTime = config.startOffsetTime;
- return getOffset(consumer, topic, partition, startOffsetTime);
- }
-
- public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
- OffsetRequest request = new OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-
- long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
- if (offsets.length > 0) {
- return offsets[0];
- } else {
- return NO_OFFSET;
- }
- }
-
- public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
- throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
- ByteBufferMessageSet msgs = null;
- String topic = partition.topic;
- int partitionId = partition.partition;
- FetchRequestBuilder builder = new FetchRequestBuilder();
- FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
- clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
- FetchResponse fetchResponse;
- try {
- fetchResponse = consumer.fetch(fetchRequest);
- } catch (Exception e) {
- if (e instanceof ConnectException ||
- e instanceof SocketTimeoutException ||
- e instanceof IOException ||
- e instanceof UnresolvedAddressException
- ) {
- LOG.warn("Network error when fetching messages:", e);
- throw new FailedFetchException(e);
- } else {
- throw new RuntimeException(e);
- }
- }
- if (fetchResponse.hasError()) {
- KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
- if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
- String msg = partition + " Got fetch request with offset out of range: [" + offset + "]";
- LOG.warn(msg);
- throw new TopicOffsetOutOfRangeException(msg);
- } else {
- String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
- LOG.error(message);
- throw new FailedFetchException(message);
- }
- } else {
- msgs = fetchResponse.messageSet(topic, partitionId);
- }
- LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", config, consumer,
- partition, offset, msgs);
- return msgs;
- }
-
- public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
- Iterable<List<Object>> tups;
- ByteBuffer payload = msg.payload();
- if (payload == null) {
- return null;
- }
- ByteBuffer key = msg.key();
- if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
- tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
- } else {
- if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
- tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(topic, payload);
- } else {
- tups = kafkaConfig.scheme.deserialize(payload);
- }
- }
- return tups;
- }
-
- public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition,
- long offset) {
- ByteBuffer payload = msg.payload();
- if (payload == null) {
- return null;
- }
- return scheme.deserializeMessageWithMetadata(payload, partition, offset);
- }
-
- public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
- int totalTasks, int taskIndex, int taskId) {
- Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
- List<Partition> taskPartitions = new ArrayList<Partition>();
- List<Partition> partitions = new ArrayList<Partition>();
- for (GlobalPartitionInformation partitionInformation : partitons) {
- partitions.addAll(partitionInformation.getOrderedPartitions());
- }
- int numPartitions = partitions.size();
- if (numPartitions < totalTasks) {
- LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions +
- "), some tasks will be idle");
- }
- for (int i = taskIndex; i < numPartitions; i += totalTasks) {
- Partition taskPartition = partitions.get(i);
- taskPartitions.add(taskPartition);
- }
- logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
- return taskPartitions;
- }
-
- private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
- String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
- if (taskPartitions.isEmpty()) {
- LOG.warn(taskPrefix + " no partitions assigned");
- } else {
- LOG.info(taskPrefix + " assigned " + taskPartitions);
- }
- }
-
- public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
- return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
- }
-
- public static class KafkaOffsetMetric implements IMetric {
- Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>();
- Set<Partition> _partitions;
- DynamicPartitionConnections _connections;
-
- public KafkaOffsetMetric(DynamicPartitionConnections connections) {
- _connections = connections;
- }
-
- public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) {
- _partitionToOffset.put(partition, offsetData);
- }
-
- @Override
- public Object getValueAndReset() {
- try {
- HashMap<String, Long> ret = new HashMap<>();
- if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
- Map<String, TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
- for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) {
- Partition partition = e.getKey();
- SimpleConsumer consumer = _connections.getConnection(partition);
- if (consumer == null) {
- LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
- return null;
- }
- long latestTimeOffset =
- getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
- long earliestTimeOffset =
- getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
- if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
- LOG.warn("No data found in Kafka Partition " + partition.getId());
- return null;
- }
- long latestEmittedOffset = e.getValue().latestEmittedOffset;
- long latestCompletedOffset = e.getValue().latestCompletedOffset;
- long spoutLag = latestTimeOffset - latestCompletedOffset;
- String topic = partition.topic;
- String metricPath = partition.getId();
- //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition
- if (!metricPath.startsWith(topic + "/")) {
- metricPath = topic + "/" + metricPath;
- }
- ret.put(metricPath + "/" + "spoutLag", spoutLag);
- ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
- ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
- ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
- ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
- if (!topicMetricsMap.containsKey(partition.topic)) {
- topicMetricsMap.put(partition.topic, new TopicMetrics());
- }
-
- TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic);
- topicMetrics.totalSpoutLag += spoutLag;
- topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
- topicMetrics.totalLatestTimeOffset += latestTimeOffset;
- topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
- topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
- }
-
- for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
- String topic = e.getKey();
- TopicMetrics topicMetrics = e.getValue();
- ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
- ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
- ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
- ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
- ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
- }
-
- return ret;
- } else {
- LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
- }
- } catch (Throwable t) {
- LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
- }
- return null;
- }
-
- public void refreshPartitions(Set<Partition> partitions) {
- _partitions = partitions;
- Iterator<Partition> it = _partitionToOffset.keySet().iterator();
- while (it.hasNext()) {
- if (!partitions.contains(it.next())) {
- it.remove();
- }
- }
- }
-
- private class TopicMetrics {
- long totalSpoutLag = 0;
- long totalEarliestTimeOffset = 0;
- long totalLatestTimeOffset = 0;
- long totalLatestEmittedOffset = 0;
- long totalLatestCompletedOffset = 0;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
deleted file mode 100644
index 6bb1dc5..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-
-public interface KeyValueScheme extends Scheme {
- List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
deleted file mode 100644
index 00983cc..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
-
- public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
- super(scheme);
- }
-
- public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
- List<Object> o = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, value);
- if (o == null) {
- return null;
- } else {
- return Arrays.asList(o);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
deleted file mode 100644
index f77f419..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-
-public interface MessageMetadataScheme extends Scheme {
- List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
deleted file mode 100644
index f52a772..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
- private static final long serialVersionUID = -7172403703813625116L;
-
- public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) {
- super(scheme);
- }
-
- public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
- List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
- if (o == null) {
- return null;
- } else {
- return Arrays.asList(o);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
deleted file mode 100644
index 9edf28b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Objects;
-import java.io.Serializable;
-import org.apache.storm.trident.spout.ISpoutPartition;
-
-
-public class Partition implements ISpoutPartition, Serializable {
-
- public Broker host;
- public int partition;
- public String topic;
-
- //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition
- private Boolean bUseTopicNameForPartitionPathId;
-
- // for kryo compatibility
- private Partition() {
-
- }
-
- public Partition(Broker host, String topic, int partition) {
- this.topic = topic;
- this.host = host;
- this.partition = partition;
- this.bUseTopicNameForPartitionPathId = false;
- }
-
- public Partition(Broker host, String topic, int partition, Boolean bUseTopicNameForPartitionPathId) {
- this.topic = topic;
- this.host = host;
- this.partition = partition;
- this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(host, topic, partition);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final Partition other = (Partition) obj;
- return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) &&
- Objects.equal(this.partition, other.partition);
- }
-
- @Override
- public String toString() {
- return "Partition{" +
- "host=" + host +
- ", topic=" + topic +
- ", partition=" + partition +
- '}';
- }
-
- @Override
- public String getId() {
- if (bUseTopicNameForPartitionPathId) {
- return topic + "/partition_" + partition;
- } else {
- //Keep the Partition Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition
- return "partition_" + partition;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
deleted file mode 100644
index 4dba709..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.List;
-
-public interface PartitionCoordinator {
- List<PartitionManager> getMyManagedPartitions();
-
- PartitionManager getManager(Partition partition);
-
- void refresh();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
deleted file mode 100644
index 5805c21..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.KafkaSpout.EmitState;
-import org.apache.storm.kafka.trident.MaxMetric;
-import org.apache.storm.metric.api.CombinedMetric;
-import org.apache.storm.metric.api.CountMetric;
-import org.apache.storm.metric.api.MeanReducer;
-import org.apache.storm.metric.api.ReducedMetric;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PartitionManager {
- private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
-
- private final CombinedMetric _fetchAPILatencyMax;
- private final ReducedMetric _fetchAPILatencyMean;
- private final CountMetric _fetchAPICallCount;
- private final CountMetric _fetchAPIMessageCount;
- // Count of messages which could not be emitted or retried because they were deleted from kafka
- private final CountMetric _lostMessageCount;
- // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
- // retry
- private final CountMetric _messageIneligibleForRetryCount;
- private final FailedMsgRetryManager _failedMsgRetryManager;
- Long _emittedToOffset;
- // retryRecords key = Kafka offset, value = retry info for the given message
- Long _committedTo;
- LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
- Partition _partition;
- SpoutConfig _spoutConfig;
- String _topologyInstanceId;
- SimpleConsumer _consumer;
- DynamicPartitionConnections _connections;
- ZkState _state;
- Map _topoConf;
- long numberFailed, numberAcked;
- // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
- private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>();
-
- public PartitionManager(
- DynamicPartitionConnections connections,
- String topologyInstanceId,
- ZkState state,
- Map<String, Object> topoConf,
- SpoutConfig spoutConfig,
- Partition id) {
- this(connections, topologyInstanceId, state, topoConf, spoutConfig, id, null);
- }
-
- /**
- * @param previousManager previous partition manager if manager for partition is being recreated
- */
- public PartitionManager(
- DynamicPartitionConnections connections,
- String topologyInstanceId,
- ZkState state,
- Map<String, Object> topoConf,
- SpoutConfig spoutConfig,
- Partition id,
- PartitionManager previousManager) {
- _partition = id;
- _connections = connections;
- _spoutConfig = spoutConfig;
- _topologyInstanceId = topologyInstanceId;
- _consumer = connections.register(id.host, id.topic, id.partition);
- _state = state;
- _topoConf = topoConf;
- numberAcked = numberFailed = 0;
-
- if (previousManager != null) {
- _failedMsgRetryManager = previousManager._failedMsgRetryManager;
- _committedTo = previousManager._committedTo;
- _emittedToOffset = previousManager._emittedToOffset;
- _waitingToEmit = previousManager._waitingToEmit;
- _pending = previousManager._pending;
- LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
- _waitingToEmit.size(),
- _pending.size());
- } else {
- try {
- _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
- _failedMsgRetryManager.prepare(spoutConfig, _topoConf);
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
- FailedMsgRetryManager.class,
- spoutConfig.failedMsgRetryManagerClass), e);
- }
-
- String jsonTopologyId = null;
- Long jsonOffset = null;
- String path = committedPath();
- try {
- Map<Object, Object> json = _state.readJSON(path);
- LOG.info("Read partition information from: " + path + " --> " + json);
- if (json != null) {
- jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
- jsonOffset = (Long) json.get("offset");
- }
- } catch (Throwable e) {
- LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
- }
-
- String topic = _partition.topic;
- Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
-
- if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
- _committedTo = currentOffset;
- LOG.info("No partition information found, using configuration to determine offset");
- } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
- _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
- LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
- } else {
- _committedTo = jsonOffset;
- LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId +
- " - new topology_id: " + topologyInstanceId);
- }
-
- if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
- LOG.info("Last commit offset from zookeeper: " + _committedTo);
- Long lastCommittedOffset = _committedTo;
- _committedTo = currentOffset;
- LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
- spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" +
- spoutConfig.startOffsetTime);
- }
-
- LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
- _emittedToOffset = _committedTo;
- }
-
- _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
- _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
- _fetchAPICallCount = new CountMetric();
- _fetchAPIMessageCount = new CountMetric();
- _lostMessageCount = new CountMetric();
- _messageIneligibleForRetryCount = new CountMetric();
- }
-
- public Map<String, Object> getMetricsDataMap() {
- String metricPrefix = _partition.getId();
-
- Map<String, Object> ret = new HashMap<>();
- ret.put(metricPrefix + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
- ret.put(metricPrefix + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
- ret.put(metricPrefix + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
- ret.put(metricPrefix + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
- ret.put(metricPrefix + "/lostMessageCount", _lostMessageCount.getValueAndReset());
- ret.put(metricPrefix + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
- return ret;
- }
-
- //returns false if it's reached the end of current batch
- public EmitState next(SpoutOutputCollector collector) {
- if (_waitingToEmit.isEmpty()) {
- fill();
- }
- while (true) {
- MessageAndOffset toEmit = _waitingToEmit.pollFirst();
- if (toEmit == null) {
- return EmitState.NO_EMITTED;
- }
-
- Iterable<List<Object>> tups;
- if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
- tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition,
- toEmit.offset());
- } else {
- tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
- }
-
- if ((tups != null) && tups.iterator().hasNext()) {
- if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
- for (List<Object> tup : tups) {
- collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
- }
- } else {
- for (List<Object> tup : tups) {
- collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
- }
- }
- break;
- } else {
- ack(toEmit.offset());
- }
- }
- if (!_waitingToEmit.isEmpty()) {
- return EmitState.EMITTED_MORE_LEFT;
- } else {
- return EmitState.EMITTED_END;
- }
- }
-
-
- private void fill() {
- long start = System.currentTimeMillis();
- Long offset;
-
- // Are there failed tuples? If so, fetch those first.
- offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
- final boolean processingNewTuples = (offset == null);
- if (processingNewTuples) {
- offset = _emittedToOffset;
- }
-
- ByteBufferMessageSet msgs = null;
- try {
- msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
- } catch (TopicOffsetOutOfRangeException e) {
- offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
- // fetch failed, so don't update the fetch metrics
-
- //fix bug [STORM-643] : remove outdated failed offsets
- if (!processingNewTuples) {
- // For the case of EarliestTime it would be better to discard
- // all the failed offsets, that are earlier than actual EarliestTime
- // offset, since they are anyway not there.
- // These calls to broker API will be then saved.
- Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
-
- // Omitted messages have not been acked and may be lost
- if (null != omitted) {
- _lostMessageCount.incrBy(omitted.size());
- }
-
- _pending.headMap(offset).clear();
-
- LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
- }
-
- if (offset > _emittedToOffset) {
- _lostMessageCount.incrBy(offset - _emittedToOffset);
- _emittedToOffset = offset;
- LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
- }
-
- return;
- }
- long millis = System.currentTimeMillis() - start;
- _fetchAPILatencyMax.update(millis);
- _fetchAPILatencyMean.update(millis);
- _fetchAPICallCount.incr();
- if (msgs != null) {
- int numMessages = 0;
-
- for (MessageAndOffset msg : msgs) {
- final Long cur_offset = msg.offset();
- if (cur_offset < offset) {
- // Skip any old offsets.
- continue;
- }
- if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
- numMessages += 1;
- if (!_pending.containsKey(cur_offset)) {
- _pending.put(cur_offset, System.currentTimeMillis());
- }
- _waitingToEmit.add(msg);
- _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
- if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
- this._failedMsgRetryManager.retryStarted(cur_offset);
- }
- }
- }
- _fetchAPIMessageCount.incrBy(numMessages);
- }
- }
-
- public void ack(Long offset) {
- if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
- // Too many things pending!
- _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
- }
- _pending.remove(offset);
- this._failedMsgRetryManager.acked(offset);
- numberAcked++;
- }
-
- public void fail(Long offset) {
- if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
- LOG.info(
- "Skipping failed tuple at offset={}" +
- " because it's more than maxOffsetBehind={}" +
- " behind _emittedToOffset={} for {}",
- offset,
- _spoutConfig.maxOffsetBehind,
- _emittedToOffset,
- _partition
- );
- } else {
- LOG.debug("Failing at offset={} with _pending.size()={} pending and _emittedToOffset={} for {}", offset, _pending.size(),
- _emittedToOffset, _partition);
- numberFailed++;
- if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
- throw new RuntimeException("Too many tuple failures");
- }
-
- // Offset may not be considered for retry by failedMsgRetryManager
- if (this._failedMsgRetryManager.retryFurther(offset)) {
- this._failedMsgRetryManager.failed(offset);
- } else {
- // state for the offset should be cleaned up
- LOG.warn("Will not retry failed kafka offset {} further", offset);
- _messageIneligibleForRetryCount.incr();
- this._failedMsgRetryManager.cleanOffsetAfterRetries(_partition, offset);
- _pending.remove(offset);
- this._failedMsgRetryManager.acked(offset);
- }
- }
- }
-
- public void commit() {
- long lastCompletedOffset = lastCompletedOffset();
- if (_committedTo != lastCompletedOffset) {
- LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition,
- _topologyInstanceId);
- Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
- .put("topology", ImmutableMap.of("id", _topologyInstanceId,
- "name", _topoConf
- .get(Config.TOPOLOGY_NAME)))
- .put("offset", lastCompletedOffset)
- .put("partition", _partition.partition)
- .put("broker", ImmutableMap.of("host", _partition.host.host,
- "port", _partition.host.port))
- .put("topic", _partition.topic).build();
- _state.writeJSON(committedPath(), data);
-
- _committedTo = lastCompletedOffset;
- LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition,
- _topologyInstanceId);
- } else {
- LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId);
- }
- }
-
- protected String committedPath() {
- return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
- }
-
- public long lastCompletedOffset() {
- if (_pending.isEmpty()) {
- return _emittedToOffset;
- } else {
- return _pending.firstKey();
- }
- }
-
- public OffsetData getOffsetData() {
- return new OffsetData(_emittedToOffset, lastCompletedOffset());
- }
-
- public Partition getPartition() {
- return _partition;
- }
-
- public void close() {
- commit();
- _connections.unregister(_partition.host, _partition.topic, _partition.partition);
- }
-
- static class KafkaMessageId implements Serializable {
- public Partition partition;
- public long offset;
-
- public KafkaMessageId(Partition partition, long offset) {
- this.partition = partition;
- this.offset = offset;
- }
- }
-
- public static class OffsetData {
- public long latestEmittedOffset;
- public long latestCompletedOffset;
-
- public OffsetData(long latestEmittedOffset, long latestCompletedOffset) {
- this.latestEmittedOffset = latestEmittedOffset;
- this.latestCompletedOffset = latestCompletedOffset;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
deleted file mode 100644
index 74a4a3b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-
-public class SpoutConfig extends KafkaConfig implements Serializable {
- private static final long serialVersionUID = -1247769246497567352L;
- public List<String> zkServers = null;
- public Integer zkPort = null;
- public String zkRoot = null;
- public String id = null;
-
- public String outputStreamId;
-
- // setting for how often to save the current kafka offset to ZooKeeper
- public long stateUpdateIntervalMs = 2000;
-
- // Retry strategy for failed messages
- public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
-
- // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
- // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
- public long retryInitialDelayMs = 0;
- public double retryDelayMultiplier = 1.0;
- public long retryDelayMaxMs = 60 * 1000;
- public int retryLimit = -1;
-
- /**
- * Create a SpoutConfig without setting client.id, which can make the source application ambiguous when tracing Kafka calls.
- */
- public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
- super(hosts, topic);
- this.zkRoot = zkRoot;
- this.id = id;
- }
-
- /**
- * Create a SpoutConfig with a client.id value.
- */
- public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id) {
- super(hosts, topic, clientId);
- this.zkRoot = zkRoot;
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
deleted file mode 100644
index 8d12ee1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-
-public class StaticCoordinator implements PartitionCoordinator {
- Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
- List<PartitionManager> _allManagers = new ArrayList<>();
-
- public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state,
- int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
- StaticHosts hosts = (StaticHosts) config.hosts;
- List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
- partitions.add(hosts.getPartitionInformation());
- List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
- for (Partition myPartition : myPartitions) {
- _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition));
- }
- _allManagers = new ArrayList<>(_managers.values());
- }
-
- @Override
- public List<PartitionManager> getMyManagedPartitions() {
- return _allManagers;
- }
-
- public PartitionManager getManager(Partition partition) {
- return _managers.get(partition);
- }
-
- @Override
- public void refresh() { return; }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
deleted file mode 100644
index 1f8f903..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-/**
- * Date: 11/05/2013
- * Time: 14:43
- */
-public class StaticHosts implements BrokerHosts {
-
-
- private GlobalPartitionInformation partitionInformation;
-
- public StaticHosts(GlobalPartitionInformation partitionInformation) {
- this.partitionInformation = partitionInformation;
- }
-
- public GlobalPartitionInformation getPartitionInformation() {
- return partitionInformation;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
deleted file mode 100644
index 2c9d4f2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.HashMap;
-import java.util.Map;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class StaticPartitionConnections {
- Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
- KafkaConfig _config;
- StaticHosts hosts;
-
- public StaticPartitionConnections(KafkaConfig conf) {
- _config = conf;
- if (!(conf.hosts instanceof StaticHosts)) {
- throw new RuntimeException("Must configure with static hosts");
- }
- this.hosts = (StaticHosts) conf.hosts;
- }
-
- public SimpleConsumer getConsumer(int partition) {
- if (!_kafka.containsKey(partition)) {
- Broker hp = hosts.getPartitionInformation().getBrokerFor(partition);
- _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
-
- }
- return _kafka.get(partition);
- }
-
- public void close() {
- for (SimpleConsumer consumer : _kafka.values()) {
- consumer.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
deleted file mode 100644
index 3d62961..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.collect.ImmutableMap;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.tuple.Values;
-
-public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
-
- @Override
- public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
- if (key == null) {
- return deserialize(value);
- }
- String keyString = StringScheme.deserializeString(key);
- String valueString = StringScheme.deserializeString(value);
- return new Values(ImmutableMap.of(keyString, valueString));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
deleted file mode 100644
index ab6e500..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme {
- public static final String STRING_SCHEME_PARTITION_KEY = "partition";
- public static final String STRING_SCHEME_OFFSET = "offset";
- private static final long serialVersionUID = -5441841920447947374L;
-
- @Override
- public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
- String stringMessage = StringScheme.deserializeString(message);
- return new Values(stringMessage, partition.partition, offset);
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
deleted file mode 100644
index 061b30a..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import org.apache.storm.spout.MultiScheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class StringMultiSchemeWithTopic
- implements MultiScheme {
- public static final String STRING_SCHEME_KEY = "str";
-
- public static final String TOPIC_KEY = "topic";
-
- @Override
- public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
- throw new UnsupportedOperationException();
- }
-
- public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
- List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
- return Collections.singletonList(items);
- }
-
- public Fields getOutputFields() {
- return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
deleted file mode 100644
index bcbc058..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-public class StringScheme implements Scheme {
- public static final String STRING_SCHEME_KEY = "str";
- private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
-
- public static String deserializeString(ByteBuffer string) {
- if (string.hasArray()) {
- int base = string.arrayOffset();
- return new String(string.array(), base + string.position(), string.remaining(), UTF8_CHARSET);
- } else {
- return new String(Utils.toByteArray(string), UTF8_CHARSET);
- }
- }
-
- public List<Object> deserialize(ByteBuffer bytes) {
- return new Values(deserializeString(bytes));
- }
-
- public Fields getOutputFields() {
- return new Fields(STRING_SCHEME_KEY);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
deleted file mode 100644
index 613a62e..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-public class TopicOffsetOutOfRangeException extends RuntimeException {
-
- public TopicOffsetOutOfRangeException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
deleted file mode 100644
index bc9ebd5..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
-
-public class ZkCoordinator implements PartitionCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
-
- SpoutConfig _spoutConfig;
- int _taskIndex;
- int _totalTasks;
- int _taskId;
- String _topologyInstanceId;
- Map<Partition, PartitionManager> _managers = new HashMap();
- List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
- Long _lastRefreshTime = null;
- int _refreshFreqMs;
- DynamicPartitionConnections _connections;
- DynamicBrokersReader _reader;
- ZkState _state;
- Map _topoConf;
-
- public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
- int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
- this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId,
- buildReader(topoConf, spoutConfig));
- }
-
- public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
- int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
- _spoutConfig = spoutConfig;
- _connections = connections;
- _taskIndex = taskIndex;
- _totalTasks = totalTasks;
- _taskId = taskId;
- _topologyInstanceId = topologyInstanceId;
- _topoConf = topoConf;
- _state = state;
- ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
- _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
- _reader = reader;
- }
-
- private static DynamicBrokersReader buildReader(Map<String, Object> topoConf, SpoutConfig spoutConfig) {
- ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
- return new DynamicBrokersReader(topoConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
- }
-
- @Override
- public List<PartitionManager> getMyManagedPartitions() {
- if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
- refresh();
- _lastRefreshTime = System.currentTimeMillis();
- }
- return _cachedList;
- }
-
- @Override
- public void refresh() {
- try {
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
- List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
- List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
-
- Set<Partition> curr = _managers.keySet();
- Set<Partition> newPartitions = new HashSet<Partition>(mine);
- newPartitions.removeAll(curr);
-
- Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
- deletedPartitions.removeAll(mine);
-
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
-
- Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
- for (Partition id : deletedPartitions) {
- deletedManagers.put(id.partition, _managers.remove(id));
- }
- for (PartitionManager manager : deletedManagers.values()) {
- if (manager != null) manager.close();
- }
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
-
- for (Partition id : newPartitions) {
- PartitionManager man = new PartitionManager(
- _connections,
- _topologyInstanceId,
- _state,
- _topoConf,
- _spoutConfig,
- id,
- deletedManagers.get(id.partition));
- _managers.put(id, man);
- }
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- _cachedList = new ArrayList<PartitionManager>(_managers.values());
- LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
- }
-
- @Override
- public PartitionManager getManager(Partition partition) {
- return _managers.get(partition);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
deleted file mode 100644
index 9c6b29d..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-
-/**
- * {@link BrokerHosts} implementation that carries ZooKeeper-related broker settings as public, mutable fields.
- *
- * <p>{@code brokerZkStr} and {@code brokerZkPath} are set by the constructors; {@code brokerZkPath} falls back to
- * {@code "/brokers"} when the single-argument constructor is used. {@code refreshFreqSecs} starts at 60 and can be
- * changed by assigning the field directly.
- */
-public class ZkHosts implements BrokerHosts {
-    private static final String DEFAULT_ZK_PATH = "/brokers";
-
-    public String brokerZkStr;
-    public String brokerZkPath; // e.g., /kafka/brokers
-    public int refreshFreqSecs = 60;
-
-    /**
-     * Creates an instance that uses the default ZooKeeper path ({@code "/brokers"}).
-     *
-     * @param brokerZkStr value stored in {@link #brokerZkStr}
-     */
-    public ZkHosts(String brokerZkStr) {
-        this(brokerZkStr, DEFAULT_ZK_PATH);
-    }
-
-    /**
-     * Creates an instance with an explicit ZooKeeper path.
-     *
-     * @param brokerZkStr value stored in {@link #brokerZkStr}
-     * @param brokerZkPath value stored in {@link #brokerZkPath}
-     */
-    public ZkHosts(String brokerZkStr, String brokerZkPath) {
-        this.brokerZkStr = brokerZkStr;
-        this.brokerZkPath = brokerZkPath;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
deleted file mode 100644
index 3d27173..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.storm.Config;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkState {
- private static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
- CuratorFramework _curator;
-
- public ZkState(Map<String, Object> stateConf) {
- stateConf = new HashMap<>(stateConf);
-
- try {
- _curator = newCurator(stateConf);
- _curator.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private CuratorFramework newCurator(final Map<String, Object> stateConf)
- throws Exception {
- Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
- String serverPorts = "";
- for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
- serverPorts = serverPorts + server + ":" + port + ",";
- }
- return CuratorFrameworkFactory.newClient(serverPorts,
- ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
- new RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- ObjectReader
- .getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- }
-
- public CuratorFramework getCurator() {
- assert _curator != null;
- return _curator;
- }
-
- public void writeJSON(String path, Map<Object, Object> data) {
- LOG.debug("Writing {} the data {}", path, data.toString());
- writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
- }
-
- public void writeBytes(String path, byte[] bytes) {
- try {
- if (_curator.checkExists().forPath(path) == null) {
- _curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, bytes);
- } else {
- _curator.setData().forPath(path, bytes);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public Map<Object, Object> readJSON(String path) {
- try {
- byte[] b = readBytes(path);
- if (b == null) {
- return null;
- }
- return (Map<Object, Object>) JSONValue.parseWithException(new String(b, "UTF-8"));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public byte[] readBytes(String path) {
- try {
- if (_curator.checkExists().forPath(path) != null) {
- return _curator.getData().forPath(path);
- } else {
- return null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- _curator = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
deleted file mode 100644
index e0b94f3..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
-import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
-import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
-import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Bolt implementation that can send Tuple data to Kafka.
- *
- * <p>The producer configuration is supplied through {@link #withProducerProperties(Properties)}. The destination
- * topic comes from the {@link KafkaTopicSelector} set through {@link #withTopicSelector(KafkaTopicSelector)}; when
- * none is set, the topic is read from the storm config under the key 'topic' for backward compatibility.
- *
- * <p>This bolt uses 0.8.2 Kafka Producer API.
- *
- * <p>It works for sending tuples to older Kafka version (0.8.1).
- *
- * @param <K> type of the Kafka record key
- * @param <V> type of the Kafka record value
- * @deprecated Please use the KafkaBolt in storm-kafka-client
- */
-@Deprecated
-public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
-
-    public static final String TOPIC = "topic";
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
-    private KafkaProducer<K, V> producer;
-    private OutputCollector collector;
-    private TupleToKafkaMapper<K, V> mapper;
-    private KafkaTopicSelector topicSelector;
-    private Properties boltSpecfiedProperties = new Properties();
-    /**
-     * With the default settings (fireAndForget false, async true) the tuple is acked or failed from the producer
-     * callback once the send completes.
-     * By setting fireAndForget true, the tuple is acked right after the send is issued, without waiting for kafka.
-     * "acks" setting in 0.8.2 Producer API config doesn't matter if fireAndForget is set.
-     * By setting async false, synchronous sending is used: the bolt blocks on the send result before acking.
-     */
-    private boolean fireAndForget = false;
-    private boolean async = true;
-
-    public KafkaBolt() {}
-
-    /**
-     * Sets the mapper that extracts the record key and message from a tuple.
-     *
-     * @param mapper mapper to use; when left unset, {@code prepare} falls back to a
-     *     {@link FieldNameBasedTupleToKafkaMapper}
-     * @return this bolt, for chaining
-     */
-    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    /**
-     * Sets the selector that picks the destination topic for each tuple.
-     *
-     * @param selector selector to use; when left unset, {@code prepare} requires the 'topic' key in the storm config
-     * @return this bolt, for chaining
-     */
-    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    /**
-     * Sets the properties handed to the {@link KafkaProducer} constructor in {@code prepare}.
-     *
-     * @param producerProperties producer configuration
-     * @return this bolt, for chaining
-     */
-    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
-        this.boltSpecfiedProperties = producerProperties;
-        return this;
-    }
-
-    /**
-     * Fills in default mapper/topic selector where none was configured and creates the Kafka producer.
-     *
-     * @throws IllegalArgumentException if no topic selector was set and the storm config has no 'topic' entry
-     */
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-        //for backward compatibility.
-        if (mapper == null) {
-            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
-        }
-
-        //for backward compatibility.
-        if (topicSelector == null) {
-            if (topoConf.containsKey(TOPIC)) {
-                this.topicSelector = new DefaultTopicSelector((String) topoConf.get(TOPIC));
-            } else {
-                throw new IllegalArgumentException("topic should be specified in bolt's configuration");
-            }
-        }
-
-        producer = new KafkaProducer<>(boltSpecfiedProperties);
-        this.collector = collector;
-    }
-
-    /**
-     * Maps the tuple to a Kafka record and sends it, acking or failing the tuple according to the configured mode.
-     *
-     * <ul>
-     * <li>topic selector returns {@code null}: the tuple is skipped and acked;</li>
-     * <li>async and not fireAndForget: acked/failed from the producer callback;</li>
-     * <li>not async: blocks on the send result, then acks or fails;</li>
-     * <li>async and fireAndForget: acked immediately after the send is issued.</li>
-     * </ul>
-     * Any exception raised along the way is reported and the tuple is failed.
-     */
-    @Override
-    protected void process(final Tuple input) {
-        try {
-            K key = mapper.getKeyFromTuple(input);
-            V message = mapper.getMessageFromTuple(input);
-            String topic = topicSelector.getTopic(input);
-            if (topic == null) {
-                LOG.warn("skipping key = " + key + ", topic selector returned null.");
-                collector.ack(input);
-                return;
-            }
-
-            Callback callback = null;
-            if (!fireAndForget && async) {
-                callback = new Callback() {
-                    @Override
-                    public void onCompletion(RecordMetadata ignored, Exception e) {
-                        synchronized (collector) {
-                            if (e != null) {
-                                collector.reportError(e);
-                                collector.fail(input);
-                            } else {
-                                collector.ack(input);
-                            }
-                        }
-                    }
-                };
-            }
-
-            Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
-            if (!async) {
-                try {
-                    result.get();
-                    collector.ack(input);
-                } catch (InterruptedException err) {
-                    // Restore the interrupt flag so the caller can still observe the interruption.
-                    Thread.currentThread().interrupt();
-                    collector.reportError(err);
-                    collector.fail(input);
-                } catch (ExecutionException err) {
-                    collector.reportError(err);
-                    collector.fail(input);
-                }
-            } else if (fireAndForget) {
-                collector.ack(input);
-            }
-        } catch (Exception ex) {
-            collector.reportError(ex);
-            collector.fail(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        // Intentionally empty: this bolt declares no output fields and never emits tuples.
-    }
-
-    /**
-     * Closes the producer, if one was created. Safe to call even when {@code prepare} never completed.
-     */
-    @Override
-    public void cleanup() {
-        if (producer != null) {
-            producer.close();
-        }
-    }
-
-    /**
-     * Chooses whether tuples are acked right after the send is issued, without waiting for the send result.
-     *
-     * @param fireAndForget {@code true} to ack without waiting; only takes effect while async is {@code true}
-     */
-    public void setFireAndForget(boolean fireAndForget) {
-        this.fireAndForget = fireAndForget;
-    }
-
-    /**
-     * Chooses between asynchronous (default) and synchronous sending.
-     *
-     * @param async {@code false} to block on every send result before acking the tuple
-     */
-    public void setAsync(boolean async) {
-        this.async = async;
-    }
-}