You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:22 UTC
[15/17] storm git commit: upstream merge
upstream merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fe882009
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fe882009
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fe882009
Branch: refs/heads/master
Commit: fe882009aa1070b3abb216064f7b54e6a5ac56c1
Parents: 8cb379f 8ba776b
Author: Sumit Chawla <su...@gmail.com>
Authored: Sun Nov 1 19:18:24 2015 -0800
Committer: Sumit Chawla <su...@gmail.com>
Committed: Sun Nov 1 19:18:24 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 11 +
README.markdown | 4 +-
STORM-UI-REST-API.md | 19 +-
conf/defaults.yaml | 7 +-
.../storm/starter/FastWordCountTopology.java | 198 +++
.../jvm/storm/starter/InOrderDeliveryTest.java | 175 +++
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 5 +-
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 128 ++
.../hdfs/bolt/AvroGenericRecordBoltTest.java | 203 ++++
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 2 +
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 9 +
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +
.../jdbc/mapper/SimpleJdbcLookupMapper.java | 3 +
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 5 +
.../storm/jdbc/bolt/JdbcInsertBoltTest.java | 71 ++
.../storm/jdbc/bolt/JdbcLookupBoltTest.java | 59 +
external/storm-kafka/README.md | 44 +-
.../src/jvm/storm/kafka/KafkaConfig.java | 3 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 8 +
.../jvm/storm/kafka/MessageMetadataScheme.java | 25 +
.../MessageMetadataSchemeAsMultiScheme.java | 40 +
.../src/jvm/storm/kafka/PartitionManager.java | 13 +-
.../kafka/StringMessageAndMetadataScheme.java | 42 +
.../kafka/trident/TridentKafkaEmitter.java | 14 +-
.../src/test/storm/kafka/KafkaUtilsTest.java | 51 +-
log4j2/worker.xml | 12 +-
pom.xml | 4 +-
storm-core/pom.xml | 2 +-
storm-core/src/clj/backtype/storm/cluster.clj | 8 +-
storm-core/src/clj/backtype/storm/config.clj | 19 +
.../src/clj/backtype/storm/daemon/executor.clj | 105 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 514 +++++---
.../src/clj/backtype/storm/daemon/nimbus.clj | 26 +-
.../clj/backtype/storm/daemon/supervisor.clj | 19 +-
.../src/clj/backtype/storm/daemon/task.clj | 10 +-
.../src/clj/backtype/storm/daemon/worker.clj | 10 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 53 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 45 +-
.../src/clj/backtype/storm/ui/helpers.clj | 26 +-
storm-core/src/clj/backtype/storm/util.clj | 41 +-
storm-core/src/jvm/backtype/storm/Config.java | 47 +-
.../backtype/storm/generated/TopologyInfo.java | 604 ++++++++-
.../storm/generated/TopologyPageInfo.java | 606 ++++++++-
.../storm/generated/TopologySummary.java | 606 ++++++++-
.../storm/metric/FileBasedEventLogger.java | 18 +-
.../jvm/backtype/storm/scheduler/Cluster.java | 80 +-
.../backtype/storm/scheduler/Topologies.java | 6 +-
.../storm/scheduler/TopologyDetails.java | 51 +
.../resource/ResourceAwareScheduler.java | 38 +-
.../authorizer/DRPCSimpleACLAuthorizer.java | 8 +-
.../authorizer/ImpersonationAuthorizer.java | 7 +-
.../serialization/SerializationFactory.java | 6 +-
.../storm/task/GeneralTopologyContext.java | 5 +-
...uePartitionedTransactionalSpoutExecutor.java | 5 +-
.../backtype/storm/utils/DisruptorQueue.java | 449 ++++---
.../jvm/backtype/storm/utils/MutableObject.java | 6 +-
.../backtype/storm/utils/TransferDrainer.java | 17 +-
.../src/jvm/backtype/storm/utils/Utils.java | 88 +-
.../jvm/storm/trident/spout/ITridentSpout.java | 51 +-
.../OpaquePartitionedTridentSpoutExecutor.java | 10 +-
.../topology/TridentTopologyBuilder.java | 23 +-
storm-core/src/py/storm/ttypes.py | 1145 +++++++++++++++---
storm-core/src/storm.thrift | 18 +
storm-core/src/ui/public/index.html | 6 +-
.../public/templates/index-page-template.html | 18 +-
.../templates/topology-page-template.html | 60 +
storm-core/src/ui/public/topology.html | 8 +
.../test/clj/backtype/storm/logviewer_test.clj | 312 +++--
.../scheduler/resource_aware_scheduler_test.clj | 38 +-
.../test/clj/backtype/storm/supervisor_test.clj | 210 ++--
.../utils/DisruptorQueueBackpressureTest.java | 17 +-
.../storm/utils/DisruptorQueueTest.java | 66 +-
72 files changed, 5487 insertions(+), 1180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/README.md
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0d286be,2e047b3..cd684df
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@@ -231,15 -207,19 +231,23 @@@ public class KafkaUtils
}
return tups;
}
+
+ public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
+ ByteBuffer payload = msg.payload();
+ if (payload == null) {
+ return null;
+ }
+ return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+ }
- public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
+ public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
- List<Partition> partitions = partitionInformation.getOrderedPartitions();
+ List<Partition> taskPartitions = new ArrayList<Partition>();
+ List<Partition> partitions = new ArrayList<Partition>();
+ for(GlobalPartitionInformation partitionInformation : partitons) {
+ partitions.addAll(partitionInformation.getOrderedPartitions());
+ }
int numPartitions = partitions.size();
if (numPartitions < totalTasks) {
LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index f5ea696,39e42ed..faf34d9
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@@ -136,7 -139,14 +140,14 @@@ public class PartitionManager
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
- Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg,_partition.topic);
+
+ Iterable<List<Object>> tups;
+ if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+ tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
+ } else {
- tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
++ tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, partition.topic);
+ }
+
if ((tups != null) && tups.iterator().hasNext()) {
if(_spoutConfig.topicAsStreamId) {
for (List<Object> tup : tups) {
http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index a72a48d,39aac1a..a97d2cb
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@@ -176,8 -179,14 +176,14 @@@ public class TridentKafkaEmitter
}
}
- private void emit(TridentCollector collector, Message msg, String topic) {
- Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, msg, topic);
+ private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
+ Iterable<List<Object>> values;
+ if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+ values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
+ } else {
- values = KafkaUtils.generateTuples(_config, msg);
++ values = KafkaUtils.generateTuples(_config, msg, partition.topic);
+ }
+
if (values != null) {
for (List<Object> value : values) {
collector.emit(value);
http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 9c08b1d,65e8d2b..2b5c8f7
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@@ -24,25 -29,24 +29,24 @@@ import kafka.api.OffsetRequest
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
+
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
+ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
- import org.junit.Assert;
- import storm.kafka.trident.GlobalPartitionInformation;
-
- import java.util.*;
-
- import static org.hamcrest.CoreMatchers.equalTo;
- import static org.hamcrest.CoreMatchers.is;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertThat;
+ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+ import storm.kafka.trident.GlobalPartitionInformation;
+ import backtype.storm.spout.SchemeAsMultiScheme;
+ import backtype.storm.utils.Utils;
+
+ import com.google.common.collect.ImmutableMap;
public class KafkaUtilsTest {
-
+ private String TEST_TOPIC = "testTopic";
private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);
private KafkaTestBroker broker;
private SimpleConsumer simpleConsumer;