You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/11/02 23:19:22 UTC

[15/17] storm git commit: upstream merge

upstream merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fe882009
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fe882009
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fe882009

Branch: refs/heads/master
Commit: fe882009aa1070b3abb216064f7b54e6a5ac56c1
Parents: 8cb379f 8ba776b
Author: Sumit Chawla <su...@gmail.com>
Authored: Sun Nov 1 19:18:24 2015 -0800
Committer: Sumit Chawla <su...@gmail.com>
Committed: Sun Nov 1 19:18:24 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   11 +
 README.markdown                                 |    4 +-
 STORM-UI-REST-API.md                            |   19 +-
 conf/defaults.yaml                              |    7 +-
 .../storm/starter/FastWordCountTopology.java    |  198 +++
 .../jvm/storm/starter/InOrderDeliveryTest.java  |  175 +++
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |    5 +-
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |  128 ++
 .../hdfs/bolt/AvroGenericRecordBoltTest.java    |  203 ++++
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |    2 +
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |    9 +
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |    5 +
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |    3 +
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |    5 +
 .../storm/jdbc/bolt/JdbcInsertBoltTest.java     |   71 ++
 .../storm/jdbc/bolt/JdbcLookupBoltTest.java     |   59 +
 external/storm-kafka/README.md                  |   44 +-
 .../src/jvm/storm/kafka/KafkaConfig.java        |    3 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |    8 +
 .../jvm/storm/kafka/MessageMetadataScheme.java  |   25 +
 .../MessageMetadataSchemeAsMultiScheme.java     |   40 +
 .../src/jvm/storm/kafka/PartitionManager.java   |   13 +-
 .../kafka/StringMessageAndMetadataScheme.java   |   42 +
 .../kafka/trident/TridentKafkaEmitter.java      |   14 +-
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   51 +-
 log4j2/worker.xml                               |   12 +-
 pom.xml                                         |    4 +-
 storm-core/pom.xml                              |    2 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |    8 +-
 storm-core/src/clj/backtype/storm/config.clj    |   19 +
 .../src/clj/backtype/storm/daemon/executor.clj  |  105 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |  514 +++++---
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   26 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   19 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   10 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   10 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |   53 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   45 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   26 +-
 storm-core/src/clj/backtype/storm/util.clj      |   41 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   47 +-
 .../backtype/storm/generated/TopologyInfo.java  |  604 ++++++++-
 .../storm/generated/TopologyPageInfo.java       |  606 ++++++++-
 .../storm/generated/TopologySummary.java        |  606 ++++++++-
 .../storm/metric/FileBasedEventLogger.java      |   18 +-
 .../jvm/backtype/storm/scheduler/Cluster.java   |   80 +-
 .../backtype/storm/scheduler/Topologies.java    |    6 +-
 .../storm/scheduler/TopologyDetails.java        |   51 +
 .../resource/ResourceAwareScheduler.java        |   38 +-
 .../authorizer/DRPCSimpleACLAuthorizer.java     |    8 +-
 .../authorizer/ImpersonationAuthorizer.java     |    7 +-
 .../serialization/SerializationFactory.java     |    6 +-
 .../storm/task/GeneralTopologyContext.java      |    5 +-
 ...uePartitionedTransactionalSpoutExecutor.java |    5 +-
 .../backtype/storm/utils/DisruptorQueue.java    |  449 ++++---
 .../jvm/backtype/storm/utils/MutableObject.java |    6 +-
 .../backtype/storm/utils/TransferDrainer.java   |   17 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   88 +-
 .../jvm/storm/trident/spout/ITridentSpout.java  |   51 +-
 .../OpaquePartitionedTridentSpoutExecutor.java  |   10 +-
 .../topology/TridentTopologyBuilder.java        |   23 +-
 storm-core/src/py/storm/ttypes.py               | 1145 +++++++++++++++---
 storm-core/src/storm.thrift                     |   18 +
 storm-core/src/ui/public/index.html             |    6 +-
 .../public/templates/index-page-template.html   |   18 +-
 .../templates/topology-page-template.html       |   60 +
 storm-core/src/ui/public/topology.html          |    8 +
 .../test/clj/backtype/storm/logviewer_test.clj  |  312 +++--
 .../scheduler/resource_aware_scheduler_test.clj |   38 +-
 .../test/clj/backtype/storm/supervisor_test.clj |  210 ++--
 .../utils/DisruptorQueueBackpressureTest.java   |   17 +-
 .../storm/utils/DisruptorQueueTest.java         |   66 +-
 72 files changed, 5487 insertions(+), 1180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/README.md
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0d286be,2e047b3..cd684df
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@@ -231,15 -207,19 +231,23 @@@ public class KafkaUtils 
          }
          return tups;
      }
+     
+     public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
+         ByteBuffer payload = msg.payload();
+         if (payload == null) {
+             return null;
+         }
+         return scheme.deserializeMessageWithMetadata(Utils.toByteArray(payload), partition, offset);
+     }
  
  
 -    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
 +    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
          Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
 -        List<Partition> partitions = partitionInformation.getOrderedPartitions();
 +        List<Partition> taskPartitions = new ArrayList<Partition>();
 +        List<Partition> partitions = new ArrayList<Partition>();
 +        for(GlobalPartitionInformation partitionInformation : partitons) {
 +            partitions.addAll(partitionInformation.getOrderedPartitions());
 +        }
          int numPartitions = partitions.size();
          if (numPartitions < totalTasks) {
              LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");

http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index f5ea696,39e42ed..faf34d9
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@@ -136,7 -139,14 +140,14 @@@ public class PartitionManager 
              if (toEmit == null) {
                  return EmitState.NO_EMITTED;
              }
-             Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg,_partition.topic);
+ 
+             Iterable<List<Object>> tups;
+             if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+                 tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.msg, _partition, toEmit.offset);
+             } else {
 -                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
++                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, partition.topic);
+             }
+             
              if ((tups != null) && tups.iterator().hasNext()) {
                  if(_spoutConfig.topicAsStreamId) {
                      for (List<Object> tup : tups) {

http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index a72a48d,39aac1a..a97d2cb
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@@ -176,8 -179,14 +176,14 @@@ public class TridentKafkaEmitter 
          }
      }
  
-     private void emit(TridentCollector collector, Message msg, String topic) {
-         Iterable<List<Object>> values = KafkaUtils.generateTuples(_config, msg, topic);
+     private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
+         Iterable<List<Object>> values;
+         if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+             values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
+         } else {
 -            values = KafkaUtils.generateTuples(_config, msg);
++            values = KafkaUtils.generateTuples(_config, msg, partition.topic);
+         }
+ 
          if (values != null) {
              for (List<Object> value : values) {
                  collector.emit(value);

http://git-wip-us.apache.org/repos/asf/storm/blob/fe882009/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 9c08b1d,65e8d2b..2b5c8f7
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@@ -24,25 -29,24 +29,24 @@@ import kafka.api.OffsetRequest
  import kafka.javaapi.consumer.SimpleConsumer;
  import kafka.javaapi.message.ByteBufferMessageSet;
  import kafka.message.MessageAndOffset;
+ 
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
  import org.junit.After;
+ import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
- import org.junit.Assert;
- import storm.kafka.trident.GlobalPartitionInformation;
- 
- import java.util.*;
- 
- import static org.hamcrest.CoreMatchers.equalTo;
- import static org.hamcrest.CoreMatchers.is;
- import static org.junit.Assert.assertEquals;
- import static org.junit.Assert.assertThat;
+ import org.mockito.Mockito;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
+ 
+ import storm.kafka.trident.GlobalPartitionInformation;
+ import backtype.storm.spout.SchemeAsMultiScheme;
+ import backtype.storm.utils.Utils;
+ 
+ import com.google.common.collect.ImmutableMap;
  public class KafkaUtilsTest {
 -
 +    private String TEST_TOPIC = "testTopic";
      private static final Logger LOG = LoggerFactory.getLogger(KafkaUtilsTest.class);
      private KafkaTestBroker broker;
      private SimpleConsumer simpleConsumer;