You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/19 03:34:59 UTC
[3/7] storm git commit: STORM-2953: Remove storm-kafka
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 7276ef6..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.mapper;
-
-import org.apache.storm.tuple.Tuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TupleToKafkaMapper<K, V> {
-
- public static final String BOLT_KEY = "key";
- public static final String BOLT_MESSAGE = "message";
- public String boltKeyField;
- public String boltMessageField;
-
- public FieldNameBasedTupleToKafkaMapper() {
- this(BOLT_KEY, BOLT_MESSAGE);
- }
-
- public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
- this.boltKeyField = boltKeyField;
- this.boltMessageField = boltMessageField;
- }
-
- @Override
- public K getKeyFromTuple(Tuple tuple) {
- //for backward compatibility, we return null when key is not present.
- return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null;
- }
-
- @Override
- public V getMessageFromTuple(Tuple tuple) {
- return (V) tuple.getValueByField(boltMessageField);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
deleted file mode 100644
index 7012e6b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.mapper;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-/**
- * as the really verbose name suggests this interface mapps a storm tuple to kafka key and message.
- * @param <K> type of key.
- * @param <V> type of value.
- */
-public interface TupleToKafkaMapper<K, V> extends Serializable {
- K getKeyFromTuple(Tuple tuple);
-
- V getMessageFromTuple(Tuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
deleted file mode 100644
index d1784b0..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
- private final String topicName;
-
- public DefaultTopicSelector(final String topicName) {
- this.topicName = topicName;
- }
-
- @Override
- public String getTopic(Tuple tuple) {
- return topicName;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
deleted file mode 100644
index 50c5c1f..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses field index to select topic name from tuple .
- */
-public class FieldIndexTopicSelector implements KafkaTopicSelector {
-
- private static final Logger LOG = LoggerFactory.getLogger(FieldIndexTopicSelector.class);
-
- private final int fieldIndex;
- private final String defaultTopicName;
-
- public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
- this.fieldIndex = fieldIndex;
- this.defaultTopicName = defaultTopicName;
- }
-
- @Override
- public String getTopic(Tuple tuple) {
- if (fieldIndex < tuple.size()) {
- return tuple.getString(fieldIndex);
- } else {
- LOG.warn("Field Index " + fieldIndex + " Out of bound . Using default topic " + defaultTopicName);
- return defaultTopicName;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
deleted file mode 100644
index d3c304a..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Uses field name to select topic name from tuple .
- */
-public class FieldNameTopicSelector implements KafkaTopicSelector {
-
- private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class);
-
- private final String fieldName;
- private final String defaultTopicName;
-
-
- public FieldNameTopicSelector(String fieldName, String defaultTopicName) {
- this.fieldName = fieldName;
- this.defaultTopicName = defaultTopicName;
- }
-
- @Override
- public String getTopic(Tuple tuple) {
- if (tuple.contains(fieldName)) {
- return tuple.getStringByField(fieldName);
- } else {
- LOG.warn("Field " + fieldName + " Not Found . Using default topic " + defaultTopicName);
- return defaultTopicName;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
deleted file mode 100644
index 4045df7..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt.selector;
-
-import java.io.Serializable;
-import org.apache.storm.tuple.Tuple;
-
-public interface KafkaTopicSelector extends Serializable {
- String getTopic(Tuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
deleted file mode 100644
index 0e8dba1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/Coordinator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.KafkaUtils;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-
-class Coordinator implements IPartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>>,
- IOpaquePartitionedTridentSpout.Coordinator<List<GlobalPartitionInformation>> {
-
- private IBrokerReader reader;
- private TridentKafkaConfig config;
-
- public Coordinator(Map<String, Object> conf, TridentKafkaConfig tridentKafkaConfig) {
- config = tridentKafkaConfig;
- reader = KafkaUtils.makeBrokerReader(conf, config);
- }
-
- @Override
- public void close() {
- config.coordinator.close();
- }
-
- @Override
- public boolean isReady(long txid) {
- return config.coordinator.isReady(txid);
- }
-
- @Override
- public List<GlobalPartitionInformation> getPartitionsForBatch() {
- return reader.getAllBrokers();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
deleted file mode 100644
index 575e235..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/DefaultCoordinator.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-public class DefaultCoordinator implements IBatchCoordinator {
-
- @Override
- public boolean isReady(long txid) {
- return true;
- }
-
- @Override
- public void close() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
deleted file mode 100644
index b26dc7f..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/GlobalPartitionInformation.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import com.google.common.base.Objects;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.apache.storm.kafka.Broker;
-import org.apache.storm.kafka.Partition;
-
-
-public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
-
- public String topic;
- private Map<Integer, Broker> partitionMap;
- //Flag to keep the Partition Path Id backward compatible with Old implementation of Partition.getId() == "partition_" + partition
- private Boolean bUseTopicNameForPartitionPathId;
-
- public GlobalPartitionInformation(String topic, Boolean bUseTopicNameForPartitionPathId) {
- this.topic = topic;
- this.partitionMap = new TreeMap<Integer, Broker>();
- this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
- }
-
- public GlobalPartitionInformation(String topic) {
- this.topic = topic;
- this.partitionMap = new TreeMap<Integer, Broker>();
- this.bUseTopicNameForPartitionPathId = false;
- }
-
- public void addPartition(int partitionId, Broker broker) {
- partitionMap.put(partitionId, broker);
- }
-
- public Boolean getbUseTopicNameForPartitionPathId() {
- return bUseTopicNameForPartitionPathId;
- }
-
- @Override
- public String toString() {
- return "GlobalPartitionInformation{" +
- "topic=" + topic +
- ", partitionMap=" + partitionMap +
- '}';
- }
-
- public Broker getBrokerFor(Integer partitionId) {
- return partitionMap.get(partitionId);
- }
-
- public List<Partition> getOrderedPartitions() {
- List<Partition> partitions = new LinkedList<Partition>();
- for (Map.Entry<Integer, Broker> partition : partitionMap.entrySet()) {
- partitions.add(new Partition(partition.getValue(), this.topic, partition.getKey(), this.bUseTopicNameForPartitionPathId));
- }
- return partitions;
- }
-
- @Override
- public Iterator<Partition> iterator() {
- final Iterator<Map.Entry<Integer, Broker>> iterator = partitionMap.entrySet().iterator();
- final String topic = this.topic;
- final Boolean bUseTopicNameForPartitionPathId = this.bUseTopicNameForPartitionPathId;
- return new Iterator<Partition>() {
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public Partition next() {
- Map.Entry<Integer, Broker> next = iterator.next();
- return new Partition(next.getValue(), topic, next.getKey(), bUseTopicNameForPartitionPathId);
- }
-
- @Override
- public void remove() {
- iterator.remove();
- }
- };
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(partitionMap);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final GlobalPartitionInformation other = (GlobalPartitionInformation) obj;
- return Objects.equal(this.partitionMap, other.partitionMap);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
deleted file mode 100644
index 4c6c404..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBatchCoordinator.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.io.Serializable;
-
-public interface IBatchCoordinator extends Serializable {
- boolean isReady(long txid);
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
deleted file mode 100644
index c5cf8b2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/IBrokerReader.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-
-public interface IBrokerReader {
-
- GlobalPartitionInformation getBrokerForTopic(String topic);
-
- List<GlobalPartitionInformation> getAllBrokers();
-
- void close();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
deleted file mode 100644
index 14324ed..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/MaxMetric.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-
-import org.apache.storm.metric.api.ICombiner;
-
-public class MaxMetric implements ICombiner<Long> {
- @Override
- public Long identity() {
- return null;
- }
-
- @Override
- public Long combine(Long l1, Long l2) {
- if (l1 == null) {
- return l2;
- }
- if (l2 == null) {
- return l1;
- }
- return Math.max(l1, l2);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
deleted file mode 100644
index c98be42..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.tuple.Fields;
-
-
-public class OpaqueTridentKafkaSpout
- implements IOpaquePartitionedTridentSpout<
- List<GlobalPartitionInformation>,
- Partition,
- Map<String, Object>> {
-
-
- TridentKafkaConfig _config;
-
- public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
- _config = config;
- }
-
- @Override
- public Emitter<List<GlobalPartitionInformation>,
- Partition,
- Map<String, Object>> getEmitter(Map<String, Object> conf,
- TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, context
- .getStormId()).asOpaqueEmitter();
- }
-
- @Override
- public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(
- Map<String, Object> conf,
- TopologyContext tc) {
- return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
- }
-
- @Override
- public Fields getOutputFields() {
- return _config.scheme.getOutputFields();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
deleted file mode 100644
index 3c5cc09..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/StaticBrokerReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-public class StaticBrokerReader implements IBrokerReader {
-
- private Map<String, GlobalPartitionInformation> brokers = new TreeMap<String, GlobalPartitionInformation>();
-
- public StaticBrokerReader(String topic, GlobalPartitionInformation partitionInformation) {
- this.brokers.put(topic, partitionInformation);
- }
-
- @Override
- public GlobalPartitionInformation getBrokerForTopic(String topic) {
- if (brokers.containsKey(topic)) return brokers.get(topic);
- return null;
- }
-
- @Override
- public List<GlobalPartitionInformation> getAllBrokers() {
- List<GlobalPartitionInformation> list = new ArrayList<GlobalPartitionInformation>();
- list.addAll(brokers.values());
- return list;
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
deleted file mode 100644
index 7b1d4dd..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.Map;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.tuple.Fields;
-
-
-public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
- TridentKafkaConfig _config;
-
- public TransactionalTridentKafkaSpout(TridentKafkaConfig config) {
- _config = config;
- }
-
-
- @Override
- public IPartitionedTridentSpout.Coordinator getCoordinator(Map<String, Object> conf, TopologyContext context) {
- return new org.apache.storm.kafka.trident.Coordinator(conf, _config);
- }
-
- @Override
- public IPartitionedTridentSpout.Emitter getEmitter(Map<String, Object> conf, TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, context.getStormId()).asTransactionalEmitter();
- }
-
- @Override
- public Fields getOutputFields() {
- return _config.scheme.getOutputFields();
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
deleted file mode 100644
index 3dac221..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import org.apache.storm.kafka.BrokerHosts;
-import org.apache.storm.kafka.KafkaConfig;
-
-
-public class TridentKafkaConfig extends KafkaConfig {
-
-
- public final IBatchCoordinator coordinator = new DefaultCoordinator();
-
- public TridentKafkaConfig(BrokerHosts hosts, String topic) {
- super(hosts, topic);
- }
-
- public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
- super(hosts, topic, clientId);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
deleted file mode 100644
index cb00579..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ /dev/null
@@ -1,306 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.DynamicPartitionConnections;
-import org.apache.storm.kafka.FailedFetchException;
-import org.apache.storm.kafka.KafkaUtils;
-import org.apache.storm.kafka.MessageMetadataSchemeAsMultiScheme;
-import org.apache.storm.kafka.Partition;
-import org.apache.storm.kafka.PartitionManager;
-import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
-import org.apache.storm.metric.api.CombinedMetric;
-import org.apache.storm.metric.api.MeanReducer;
-import org.apache.storm.metric.api.ReducedMetric;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.trident.topology.TransactionAttempt;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaEmitter {
-
- private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
- private DynamicPartitionConnections _connections;
- private String _topologyName;
- private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
- private ReducedMetric _kafkaMeanFetchLatencyMetric;
- private CombinedMetric _kafkaMaxFetchLatencyMetric;
- private TridentKafkaConfig _config;
- private String _topologyInstanceId;
-
- public TridentKafkaEmitter(Map<String, Object> conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
- _config = config;
- _topologyInstanceId = topologyInstanceId;
- _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
- _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
- _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
- context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
- _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
- _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
- }
-
-
- private Map<String, Object> failFastEmitNewPartitionBatch(
- final TransactionAttempt attempt,
- TridentCollector collector,
- Partition partition,
- Map<String, Object> lastMeta) {
- SimpleConsumer consumer = _connections.register(partition);
- Map<String, Object> ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta, attempt);
- Long offset = (Long) ret.get("offset");
- Long endOffset = (Long) ret.get("nextOffset");
- _kafkaOffsetMetric.setOffsetData(partition, new PartitionManager.OffsetData(endOffset, offset));
- return ret;
- }
-
- private Map<String, Object> emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition,
- Map<String, Object> lastMeta) {
- try {
- return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
- } catch (FailedFetchException e) {
- LOG.warn("Failed to fetch from partition " + partition);
- if (lastMeta == null) {
- return null;
- } else {
- Map<String, Object> ret = new HashMap<>();
- ret.put("offset", lastMeta.get("nextOffset"));
- ret.put("nextOffset", lastMeta.get("nextOffset"));
- ret.put("partition", partition.partition);
- ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- ret.put("topic", partition.topic);
- ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- return ret;
- }
- }
- }
-
- private Map<String, Object> doEmitNewPartitionBatch(SimpleConsumer consumer,
- Partition partition,
- TridentCollector collector,
- Map<String, Object> lastMeta,
- TransactionAttempt attempt) {
- LOG.debug("Emitting new partition batch - [transaction = {}], [lastMeta = {}]", attempt, lastMeta);
- long offset;
- if (lastMeta != null) {
- String lastInstanceId = null;
- Map<String, Object> lastTopoMeta = (Map<String, Object>)
- lastMeta.get("topology");
- if (lastTopoMeta != null) {
- lastInstanceId = (String) lastTopoMeta.get("id");
- }
- if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
- offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
- } else {
- offset = (Long) lastMeta.get("nextOffset");
- }
- } else {
- offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
- }
- LOG.debug("[transaction = {}], [OFFSET = {}]", attempt, offset);
-
- ByteBufferMessageSet msgs = null;
- try {
- msgs = fetchMessages(consumer, partition, offset);
- } catch (TopicOffsetOutOfRangeException e) {
- long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
- LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
- offset = newOffset;
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- }
-
- long endoffset = offset;
- for (MessageAndOffset msg : msgs) {
- emit(collector, msg.message(), partition, msg.offset(), attempt);
- endoffset = msg.nextOffset();
- }
- Map<String, Object> newMeta = new HashMap<>();
- newMeta.put("offset", offset);
- newMeta.put("nextOffset", endoffset);
- newMeta.put("instanceId", _topologyInstanceId);
- newMeta.put("partition", partition.partition);
- newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- newMeta.put("topic", partition.topic);
- newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- LOG.debug("[transaction = {}], [newMeta = {}]", attempt, newMeta);
- return newMeta;
- }
-
- private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
- long start = System.currentTimeMillis();
- ByteBufferMessageSet msgs = null;
- msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
- long millis = System.currentTimeMillis() - start;
- _kafkaMeanFetchLatencyMetric.update(millis);
- _kafkaMaxFetchLatencyMetric.update(millis);
- return msgs;
- }
-
- /**
- * re-emit the batch described by the meta data provided
- */
- private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition,
- Map<String, Object> meta) {
- LOG.info("re-emitting batch, attempt " + attempt);
- String instanceId = (String) meta.get("instanceId");
- if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
- SimpleConsumer consumer = _connections.register(partition);
- long offset = (Long) meta.get("offset");
- long nextOffset = (Long) meta.get("nextOffset");
- ByteBufferMessageSet msgs = null;
- msgs = fetchMessages(consumer, partition, offset);
-
- if (msgs != null) {
- for (MessageAndOffset msg : msgs) {
- if (offset == nextOffset) {
- break;
- }
- if (offset > nextOffset) {
- throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
- }
- emit(collector, msg.message(), partition, msg.offset(), attempt);
- offset = msg.nextOffset();
- }
- }
- }
- }
-
- private void emit(TridentCollector collector, Message msg, Partition partition, long offset, TransactionAttempt attempt) {
- Iterable<List<Object>> values;
- if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
- values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
- } else {
- values = KafkaUtils.generateTuples(_config, msg, partition.topic);
- }
-
- if (values != null) {
- for (List<Object> value : values) {
- LOG.debug("Emitting: [Transaction: {}], [Data: {}]", attempt, value);
- collector.emit(value);
- }
- } else {
- LOG.debug("NOT Emitting NULL data. [Transaction: {}]", attempt);
- }
- }
-
- private void clear() {
- _connections.clear();
- }
-
- private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
- List<Partition> part = new ArrayList<Partition>();
- for (GlobalPartitionInformation globalPartitionInformation : partitions)
- part.addAll(globalPartitionInformation.getOrderedPartitions());
- return part;
- }
-
- private void refresh(List<Partition> list) {
- _connections.clear();
- _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
- }
-
-
- public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>> asOpaqueEmitter() {
-
- return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction.
- *
- * Return the metadata describing this batch that will be used as lastPartitionMeta
- * for defining the parameters of the next batch.
- */
- @Override
- public Map<String, Object> emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector,
- Partition partition, Map<String, Object> map) {
- return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
- }
-
- public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
- return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map<String, Object>>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction that's never been emitted before.
- * Return the metadata that can be used to reconstruct this partition/batch in the future.
- */
- @Override
- public Map<String, Object> emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector,
- Partition partition, Map<String, Object> map) {
- return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * Emit a batch of tuples for a partition/transaction that has been emitted before, using
- * the metadata created when it was first emitted.
- */
- @Override
- public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition,
- Map<String, Object> map) {
- reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * This method is called when this task is responsible for a new set of partitions. Should be used
- * to manage things like connections to brokers.
- */
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
deleted file mode 100644
index 71b2cb1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.commons.lang.Validate;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.topology.FailedException;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.tuple.TridentTuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaState implements State {
- private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
-
- private KafkaProducer producer;
- private OutputCollector collector;
-
- private TridentTupleToKafkaMapper mapper;
- private KafkaTopicSelector topicSelector;
-
- public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
- this.topicSelector = selector;
- return this;
- }
-
- @Override
- public void beginCommit(Long txid) {
- LOG.debug("beginCommit is Noop.");
- }
-
- @Override
- public void commit(Long txid) {
- LOG.debug("commit is Noop.");
- }
-
- public void prepare(Properties options) {
- Validate.notNull(mapper, "mapper can not be null");
- Validate.notNull(topicSelector, "topicSelector can not be null");
- producer = new KafkaProducer(options);
- }
-
- public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- String topic = null;
- try {
- List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
- for (TridentTuple tuple : tuples) {
- topic = topicSelector.getTopic(tuple);
-
- if (topic != null) {
- Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
- mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple)));
- futures.add(result);
- } else {
- LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
- }
- }
-
- List<ExecutionException> exceptions = new ArrayList<>(futures.size());
- for (Future<RecordMetadata> future : futures) {
- try {
- future.get();
- } catch (ExecutionException e) {
- exceptions.add(e);
- }
- }
-
- if (exceptions.size() > 0) {
- String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic
- + " because of the following exceptions: \n";
- for (ExecutionException exception : exceptions) {
- errorMsg = errorMsg + exception.getMessage() + "\n";
- }
- LOG.error(errorMsg);
- throw new FailedException(errorMsg);
- }
- } catch (Exception ex) {
- String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
deleted file mode 100644
index 5b66fd8..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.Map;
-import java.util.Properties;
-import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
-import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
-import org.apache.storm.task.IMetricsContext;
-import org.apache.storm.trident.state.State;
-import org.apache.storm.trident.state.StateFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TridentKafkaStateFactory implements StateFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
-
- private TridentTupleToKafkaMapper mapper;
- private KafkaTopicSelector topicSelector;
- private Properties producerProperties = new Properties();
-
- public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
- this.mapper = mapper;
- return this;
- }
-
- public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
- this.topicSelector = selector;
- return this;
- }
-
- public TridentKafkaStateFactory withProducerProperties(Properties props) {
- this.producerProperties = props;
- return this;
- }
-
- @Override
- public State makeState(Map<String, Object> conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
- TridentKafkaState state = new TridentKafkaState()
- .withKafkaTopicSelector(this.topicSelector)
- .withTridentTupleToKafkaMapper(this.mapper);
- state.prepare(producerProperties);
- return state;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
deleted file mode 100644
index 1100b66..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.List;
-import org.apache.storm.trident.operation.TridentCollector;
-import org.apache.storm.trident.state.BaseStateUpdater;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
- @Override
- public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
- state.updateState(tuples, collector);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
deleted file mode 100644
index d40256e..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.DynamicBrokersReader;
-import org.apache.storm.kafka.ZkHosts;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ZkBrokerReader implements IBrokerReader {
-
- private static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
-
- List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
- DynamicBrokersReader reader;
- long lastRefreshTimeMs;
-
-
- long refreshMillis;
-
- public ZkBrokerReader(Map<String, Object> conf, String topic, ZkHosts hosts) {
- try {
- reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = System.currentTimeMillis();
- refreshMillis = hosts.refreshFreqSecs * 1000L;
- } catch (java.net.SocketTimeoutException e) {
- LOG.warn("Failed to update brokers", e);
- }
-
- }
-
- private void refresh() {
- long currTime = System.currentTimeMillis();
- if (currTime > lastRefreshTimeMs + refreshMillis) {
- try {
- LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
- cachedBrokers = reader.getBrokerInfo();
- lastRefreshTimeMs = currTime;
- } catch (java.net.SocketTimeoutException e) {
- LOG.warn("Failed to update brokers", e);
- }
- }
- }
-
- @Override
- public GlobalPartitionInformation getBrokerForTopic(String topic) {
- refresh();
- for (GlobalPartitionInformation partitionInformation : cachedBrokers) {
- if (partitionInformation.topic.equals(topic)) return partitionInformation;
- }
- return null;
- }
-
- @Override
- public List<GlobalPartitionInformation> getAllBrokers() {
- refresh();
- return cachedBrokers;
- }
-
- @Override
- public void close() {
- reader.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
deleted file mode 100644
index 01e3eca..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.mapper;
-
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper {
-
- public final String keyFieldName;
- public final String msgFieldName;
-
- public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
- this.keyFieldName = keyFieldName;
- this.msgFieldName = msgFieldName;
- }
-
- @Override
- public K getKeyFromTuple(TridentTuple tuple) {
- return (K) tuple.getValueByField(keyFieldName);
- }
-
- @Override
- public V getMessageFromTuple(TridentTuple tuple) {
- return (V) tuple.getValueByField(msgFieldName);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
deleted file mode 100644
index 4a522d6..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.mapper;
-
-import java.io.Serializable;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
- K getKeyFromTuple(TridentTuple tuple);
-
- V getMessageFromTuple(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
deleted file mode 100644
index 93b5566..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.selector;
-
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public class DefaultTopicSelector implements KafkaTopicSelector {
-
- private final String topicName;
-
- public DefaultTopicSelector(final String topicName) {
- this.topicName = topicName;
- }
-
- @Override
- public String getTopic(TridentTuple tuple) {
- return topicName;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
deleted file mode 100644
index 6de3921..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.trident.selector;
-
-import java.io.Serializable;
-import org.apache.storm.trident.tuple.TridentTuple;
-
-public interface KafkaTopicSelector extends Serializable {
- String getTopic(TridentTuple tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
deleted file mode 100644
index a6bb61c..0000000
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/DynamicBrokersReaderTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.ExponentialBackoffRetry;
-import org.apache.curator.test.TestingServer;
-import org.apache.curator.utils.ZKPaths;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Date: 16/05/2013
- * Time: 20:35
- */
-public class DynamicBrokersReaderTest {
- private DynamicBrokersReader dynamicBrokersReader, wildCardBrokerReader;
- private String masterPath = "/brokers";
- private String topic = "testing1";
- private String secondTopic = "testing2";
- private String thirdTopic = "testing3";
-
- private CuratorFramework zookeeper;
- private TestingServer server;
-
- @Before
- public void setUp() throws Exception {
- server = new TestingServer();
- String connectionString = server.getConnectString();
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
- conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
- ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
- zookeeper = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
- dynamicBrokersReader = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-
- Map<String, Object> conf2 = new HashMap<>();
- conf2.putAll(conf);
- conf2.put("kafka.topic.wildcard.match", true);
-
- wildCardBrokerReader = new DynamicBrokersReader(conf2, connectionString, masterPath, "^test.*$");
- zookeeper.start();
- }
-
- @After
- public void tearDown() throws Exception {
- dynamicBrokersReader.close();
- zookeeper.close();
- server.close();
- }
-
- private void addPartition(int id, String host, int port, String topic) throws Exception {
- writePartitionId(id, topic);
- writeLeader(id, 0, topic);
- writeLeaderDetails(0, host, port);
- }
-
- private void addPartition(int id, int leader, String host, int port, String topic) throws Exception {
- writePartitionId(id, topic);
- writeLeader(id, leader, topic);
- writeLeaderDetails(leader, host, port);
- }
-
- private void writePartitionId(int id, String topic) throws Exception {
- String path = dynamicBrokersReader.partitionPath(topic);
- writeDataToPath(path, ("" + id));
- }
-
- private void writeDataToPath(String path, String data) throws Exception {
- ZKPaths.mkdirs(zookeeper.getZookeeperClient().getZooKeeper(), path);
- zookeeper.setData().forPath(path, data.getBytes());
- }
-
- private void writeLeader(int id, int leaderId, String topic) throws Exception {
- String path = dynamicBrokersReader.partitionPath(topic) + "/" + id + "/state";
- String value = " { \"controller_epoch\":4, \"isr\":[ 1, 0 ], \"leader\":" + leaderId + ", \"leader_epoch\":1, \"version\":1 }";
- writeDataToPath(path, value);
- }
-
- private void writeLeaderDetails(int leaderId, String host, int port) throws Exception {
- String path = dynamicBrokersReader.brokerPath() + "/" + leaderId;
- String value = "{ \"host\":\"" + host + "\", \"jmx_port\":9999, \"port\":" + port + ", \"version\":1 }";
- writeDataToPath(path, value);
- }
-
-
- private GlobalPartitionInformation getByTopic(List<GlobalPartitionInformation> partitions, String topic) {
- for (GlobalPartitionInformation partitionInformation : partitions) {
- if (partitionInformation.topic.equals(topic)) return partitionInformation;
- }
- return null;
- }
-
- @Test
- public void testGetBrokerInfo() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- addPartition(partition, host, port, topic);
- List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
- GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(1, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
- }
-
- @Test
- public void testGetBrokerInfoWildcardMatch() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- addPartition(partition, host, port, topic);
- addPartition(partition, host, port, secondTopic);
-
- List<GlobalPartitionInformation> partitions = wildCardBrokerReader.getBrokerInfo();
-
- GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(1, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
- brokerInfo = getByTopic(partitions, secondTopic);
- assertNotNull(brokerInfo);
- assertEquals(1, brokerInfo.getOrderedPartitions().size());
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
- addPartition(partition, host, port, thirdTopic);
- //Discover newly added topic
- partitions = wildCardBrokerReader.getBrokerInfo();
- assertNotNull(getByTopic(partitions, topic));
- assertNotNull(getByTopic(partitions, secondTopic));
- assertNotNull(getByTopic(partitions, secondTopic));
- }
-
-
- @Test
- public void testMultiplePartitionsOnDifferentHosts() throws Exception {
- String host = "localhost";
- int port = 9092;
- int secondPort = 9093;
- int partition = 0;
- int secondPartition = partition + 1;
- addPartition(partition, 0, host, port, topic);
- addPartition(secondPartition, 1, host, secondPort, topic);
-
- List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
- GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
- assertEquals(secondPort, brokerInfo.getBrokerFor(secondPartition).port);
- assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
- }
-
-
- @Test
- public void testMultiplePartitionsOnSameHost() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- int secondPartition = partition + 1;
- addPartition(partition, 0, host, port, topic);
- addPartition(secondPartition, 0, host, port, topic);
-
- List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
- GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(2, brokerInfo.getOrderedPartitions().size());
-
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
- assertEquals(port, brokerInfo.getBrokerFor(secondPartition).port);
- assertEquals(host, brokerInfo.getBrokerFor(secondPartition).host);
- }
-
- @Test
- public void testSwitchHostForPartition() throws Exception {
- String host = "localhost";
- int port = 9092;
- int partition = 0;
- addPartition(partition, host, port, topic);
- List<GlobalPartitionInformation> partitions = dynamicBrokersReader.getBrokerInfo();
-
- GlobalPartitionInformation brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(port, brokerInfo.getBrokerFor(partition).port);
- assertEquals(host, brokerInfo.getBrokerFor(partition).host);
-
- String newHost = host + "switch";
- int newPort = port + 1;
- addPartition(partition, newHost, newPort, topic);
- partitions = dynamicBrokersReader.getBrokerInfo();
-
- brokerInfo = getByTopic(partitions, topic);
- assertNotNull(brokerInfo);
- assertEquals(newPort, brokerInfo.getBrokerFor(partition).port);
- assertEquals(newHost, brokerInfo.getBrokerFor(partition).host);
- }
-
- @Test(expected = NullPointerException.class)
- public void testErrorLogsWhenConfigIsMissing() throws Exception {
- String connectionString = server.getConnectString();
- Map<String, Object> conf = new HashMap<>();
- conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 1000);
- // conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 1000);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 4);
- conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
-
- DynamicBrokersReader dynamicBrokersReader1 = new DynamicBrokersReader(conf, connectionString, masterPath, topic);
-
- }
-}