You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:11 UTC
[10/14] storm git commit: STORM-2937: Overwrite storm-kafka-client
1.x-branch into 1.0.x-branch: backport trident interface changes
STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: backport trident interface changes
Cherry-picked from commit SHA b8885411, except that we had to drop all changes to storm-kafka-client. b8885411 was made earlier than 74ca795, the commit from which we copied storm-kafka-client. As a result, its storm-kafka-client changes conflicted with the copied code: they would have introduced regressions and broken the storm-kafka-client code.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4492b3e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4492b3e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4492b3e2
Branch: refs/heads/1.0.x-branch
Commit: 4492b3e282574c18074baa374bfbab042835d27d
Parents: 1312d69
Author: Hugo Louro <hm...@gmail.com>
Authored: Fri Mar 10 15:13:31 2017 -0600
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Feb 7 18:43:39 2018 -0800
----------------------------------------------------------------------
.../trident/OpaqueTridentEventHubEmitter.java | 20 ++++++++++++++++----
.../kafka/trident/TridentKafkaEmitter.java | 19 +++++++++++++------
.../spout/IOpaquePartitionedTridentSpout.java | 19 ++++++++++++++++---
.../OpaquePartitionedTridentSpoutExecutor.java | 16 +++++++---------
.../topology/state/TransactionalState.java | 4 ++++
5 files changed, 56 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index ae21ab3..20375a2 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -17,16 +17,16 @@
*******************************************************************************/
package org.apache.storm.eventhubs.trident;
-import java.util.List;
-import java.util.Map;
-
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import org.apache.storm.eventhubs.spout.IEventHubReceiverFactory;
-
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
/**
* A thin wrapper of TransactionalTridentEventHubEmitter for OpaqueTridentEventHubSpout
*/
@@ -63,6 +63,18 @@ public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSp
}
@Override
+ public List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo) {
+ final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
+ final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
+ if (orderedPartitions != null) {
+ for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
+ taskPartitions.add(orderedPartitions.get(i));
+ }
+ }
+ return taskPartitions;
+ }
+
+ @Override
public void refreshPartitions(List<Partition> partitionList) {
transactionalEmitter.refreshPartitions(partitionList);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 809ed73..50cae21 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -148,11 +148,6 @@ public class TridentKafkaEmitter {
/**
* re-emit the batch described by the meta data provided
- *
- * @param attempt
- * @param collector
- * @param partition
- * @param meta
*/
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
LOG.info("re-emitting batch, attempt " + attempt);
@@ -164,7 +159,7 @@ public class TridentKafkaEmitter {
ByteBufferMessageSet msgs = null;
msgs = fetchMessages(consumer, partition, offset);
- if(msgs != null) {
+ if (msgs != null) {
for (MessageAndOffset msg : msgs) {
if (offset == nextOffset) {
break;
@@ -237,6 +232,18 @@ public class TridentKafkaEmitter {
}
@Override
+ public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<GlobalPartitionInformation> allPartitionInfo) {
+ final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
+ final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
+ if (orderedPartitions != null) {
+ for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
+ taskPartitions.add(orderedPartitions.get(i));
+ }
+ }
+ return taskPartitions;
+ }
+
+ @Override
public void close() {
clear();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index bf4c093..50afa69 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -51,13 +51,26 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
* This method is called when this task is responsible for a new set of partitions. Should be used
* to manage things like connections to brokers.
*/
- void refreshPartitions(List<Partition> partitionResponsibilities);
+ void refreshPartitions(List<Partition> partitionResponsibilities);
+
+ /**
+ * @return The ordered list of partitions being processed by all the tasks
+ */
List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
+
+ /**
+ * @return The list of partitions that are to be processed by the task with id {@code taskId}
+ */
+ List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo);
+
void close();
}
- Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
- Coordinator getCoordinator(Map conf, TopologyContext context);
+ Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
+
+ Coordinator getCoordinator(Map conf, TopologyContext context);
+
Map<String, Object> getComponentConfiguration();
+
Fields getOutputFields();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index 7c43961..ea04659 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -20,7 +20,7 @@ package org.apache.storm.trident.spout;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
-import java.util.ArrayList;
+
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -95,16 +95,14 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
- List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
- List<ISpoutPartition> myPartitions = new ArrayList<>();
- for(int i=_index; i < partitions.size(); i+=_numTasks) {
- ISpoutPartition p = partitions.get(i);
- String id = p.getId();
- myPartitions.add(p);
- _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
+ final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+ for (ISpoutPartition partition : taskPartitions) {
+ _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
}
- _emitter.refreshPartitions(myPartitions);
+
+ // refresh all partitions for backwards compatibility with old spout
+ _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/4492b3e2/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
index 5061590..b5546c9 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/topology/state/TransactionalState.java
@@ -40,6 +40,10 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Class that contains the logic to extract the transactional state info from zookeeper. All transactional state
+ * is kept in zookeeper. This class only contains references to Curator, which is used to get all info from zookeeper.
+ */
public class TransactionalState {
private static final Logger LOG = LoggerFactory.getLogger(TransactionalState.class);
CuratorFramework _curator;