You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:12 UTC

[06/38] storm git commit: Merge branch '1.x-branch' into metrics_v2

Merge branch '1.x-branch' into metrics_v2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/57a50f36
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/57a50f36
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/57a50f36

Branch: refs/heads/1.x-branch
Commit: 57a50f36df058937587cc08d81c235efc8dc720a
Parents: b8de0f3 c4a09d3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Aug 30 17:07:02 2017 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Aug 30 17:07:02 2017 -0400

----------------------------------------------------------------------
 .travis.yml                                     |    5 +-
 CHANGELOG.md                                    | 1480 ------------------
 DEVELOPER.md                                    |   11 +-
 bin/storm.cmd                                   |    7 +
 bin/storm.ps1                                   |   68 +
 bin/storm.py                                    |    9 +-
 conf/storm-env.ps1                              |   23 +
 dev-tools/jira-github-join.py                   |    2 +-
 dev-tools/jira/__init__.py                      |  285 ----
 dev-tools/jira_github/__init__.py               |  285 ++++
 dev-tools/release_notes.py                      |  118 ++
 dev-tools/report/report.py                      |    2 +-
 docs/Metrics.md                                 |  201 ++-
 docs/SECURITY.md                                |   40 +-
 docs/cgroups_in_storm.md                        |   71 -
 docs/distcache-blobstore.md                     |    8 +-
 docs/index.md                                   |    1 -
 docs/storm-kafka-client.md                      |  147 +-
 examples/storm-elasticsearch-examples/pom.xml   |   10 +
 .../elasticsearch/bolt/EsIndexTopology.java     |    6 +-
 .../storm/elasticsearch/common/EsTestUtil.java  |   14 +-
 .../trident/TridentEsTopology.java              |    3 +-
 .../TridentKafkaClientWordCountNamedTopics.java |   27 +-
 ...identKafkaClientWordCountWildcardTopics.java |    5 +-
 .../java/org/apache/storm/flux/TCKTest.java     |    9 +
 .../resources/configs/substitution-test.yaml    |    3 +-
 .../src/test/resources/configs/test.properties  |    1 +
 .../backends/trident/TestPlanCompiler.java      |    2 +-
 .../apache/storm/common/AbstractAutoCreds.java  |   27 +-
 .../apache/storm/hbase/security/AutoHBase.java  |   39 +-
 .../apache/storm/hdfs/security/AutoHDFS.java    |   41 +-
 .../apache/storm/hive/security/AutoHive.java    |   39 +-
 external/storm-elasticsearch/pom.xml            |   44 +-
 .../DefaultEsLookupResultOutput.java            |   62 +
 .../elasticsearch/ElasticsearchGetRequest.java  |   36 -
 .../elasticsearch/EsLookupResultOutput.java     |    5 +-
 .../elasticsearch/bolt/AbstractEsBolt.java      |   53 +-
 .../storm/elasticsearch/bolt/EsIndexBolt.java   |   31 +-
 .../storm/elasticsearch/bolt/EsLookupBolt.java  |   49 +-
 .../elasticsearch/bolt/EsPercolateBolt.java     |   41 +-
 .../common/DefaultEsTupleMapper.java            |   19 +
 .../storm/elasticsearch/common/EsConfig.java    |  130 +-
 .../elasticsearch/common/EsTupleMapper.java     |    9 +
 .../common/StormElasticSearchClient.java        |   37 +-
 .../common/TransportAddresses.java              |   72 -
 .../apache/storm/elasticsearch/doc/Index.java   |   69 +
 .../storm/elasticsearch/doc/IndexDoc.java       |   43 +
 .../storm/elasticsearch/doc/IndexItem.java      |   91 ++
 .../storm/elasticsearch/doc/IndexItemDoc.java   |   42 +
 .../apache/storm/elasticsearch/doc/Shards.java  |   63 +
 .../storm/elasticsearch/doc/SourceDoc.java      |   43 +
 .../response/BulkIndexResponse.java             |   80 +
 .../elasticsearch/response/LookupResponse.java  |   63 +
 .../response/PercolateResponse.java             |   85 +
 .../storm/elasticsearch/trident/EsState.java    |   72 +-
 .../elasticsearch/trident/EsStateFactory.java   |   15 +-
 .../bolt/AbstractEsBoltIntegrationTest.java     |   68 +-
 .../elasticsearch/bolt/AbstractEsBoltTest.java  |   15 +-
 .../elasticsearch/bolt/EsIndexBoltTest.java     |   44 +-
 .../bolt/EsLookupBoltIntegrationTest.java       |   75 +-
 .../elasticsearch/bolt/EsLookupBoltTest.java    |   67 +-
 .../elasticsearch/bolt/EsPercolateBoltTest.java |   62 +-
 .../elasticsearch/common/EsConfigTest.java      |   60 +-
 .../storm/elasticsearch/common/EsTestUtil.java  |  101 +-
 .../common/TransportAddressesTest.java          |   81 -
 .../trident/EsStateFactoryTest.java             |    2 +-
 .../elasticsearch/trident/EsStateTest.java      |   98 ++
 .../src/test/resources/log4j2.xml               |   33 +
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |   23 +-
 .../java/org/apache/storm/hdfs/bolt/Writer.java |   35 +
 .../storm/hdfs/common/AbstractHDFSWriter.java   |   16 +-
 .../org/apache/storm/jms/spout/JmsSpout.java    |  291 +++-
 .../apache/storm/jms/spout/JmsSpoutTest.java    |   81 +-
 .../kafka/spout/EmptyKafkaTupleListener.java    |   53 +
 .../apache/storm/kafka/spout/KafkaSpout.java    |   15 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  574 ++++---
 .../storm/kafka/spout/KafkaTupleListener.java   |   83 +
 .../spout/ManualPartitionNamedSubscription.java |   78 -
 .../ManualPartitionPatternSubscription.java     |   76 -
 .../spout/ManualPartitionSubscription.java      |   72 +
 .../storm/kafka/spout/ManualPartitioner.java    |    4 +-
 .../storm/kafka/spout/NamedSubscription.java    |    4 +-
 .../storm/kafka/spout/NamedTopicFilter.java     |   68 +
 .../storm/kafka/spout/PatternSubscription.java  |    4 +-
 .../storm/kafka/spout/PatternTopicFilter.java   |   70 +
 .../kafka/spout/SerializableDeserializer.java   |    6 +-
 .../apache/storm/kafka/spout/Subscription.java  |    2 +-
 .../apache/storm/kafka/spout/TopicFilter.java   |   38 +
 .../internal/KafkaConsumerFactoryDefault.java   |    3 +-
 .../kafka/spout/internal/OffsetManager.java     |    9 +-
 .../spout/trident/KafkaTridentSpoutManager.java |    3 +-
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  131 ++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |   17 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   53 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   37 +-
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  116 ++
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   10 +-
 .../storm/kafka/spout/NamedTopicFilterTest.java |   70 +
 .../kafka/spout/PatternTopicFilterTest.java     |   75 +
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   10 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |   87 +
 .../SingleTopicKafkaSpoutConfiguration.java     |   48 +-
 .../test/KafkaSpoutTopologyMainNamedTopics.java |    4 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |    4 +-
 integration-test/config/install-zookeeper.sh    |    2 +-
 integration-test/run-it.sh                      |    3 +-
 .../org/apache/storm/command/config_value.clj   |    6 +-
 .../src/clj/org/apache/storm/converter.clj      |   11 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |    4 +-
 .../clj/org/apache/storm/daemon/executor.clj    |   14 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   71 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |   15 +-
 storm-core/src/jvm/org/apache/storm/Config.java |   16 +-
 .../storm/cluster/StormClusterStateImpl.java    |   16 +-
 .../storm/daemon/supervisor/AdvancedFSOps.java  |   16 +-
 .../storm/daemon/supervisor/Container.java      |    6 +-
 .../daemon/supervisor/ReadClusterState.java     |    3 +
 .../apache/storm/daemon/supervisor/Slot.java    |   11 +
 .../storm/daemon/supervisor/Supervisor.java     |   16 +-
 .../daemon/supervisor/SupervisorUtils.java      |   11 +-
 .../daemon/supervisor/timer/UpdateBlobs.java    |   26 +-
 .../jvm/org/apache/storm/drpc/DRPCSpout.java    |    6 -
 .../org/apache/storm/generated/Assignment.java  |  114 +-
 .../apache/storm/generated/LocalAssignment.java |  114 +-
 .../org/apache/storm/generated/StormBase.java   |  114 +-
 .../apache/storm/localizer/AsyncLocalizer.java  |   37 +-
 .../jvm/org/apache/storm/scheduler/Cluster.java |   85 +-
 .../apache/storm/scheduler/TopologyDetails.java |   35 +-
 .../multitenant/MultitenantScheduler.java       |    2 +-
 .../storm/security/INimbusCredentialPlugin.java |   24 +-
 .../security/auth/ICredentialsRenewer.java      |   18 +-
 .../storm/security/auth/kerberos/AutoTGT.java   |    6 +-
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   31 +-
 .../org/apache/storm/utils/DisruptorQueue.java  |   70 +-
 .../org/apache/storm/utils/ObjectReader.java    |   58 +
 .../src/jvm/org/apache/storm/utils/Utils.java   |   52 +-
 storm-core/src/py/storm/ttypes.py               |   47 +-
 storm-core/src/storm.thrift                     |    5 +
 .../test/clj/org/apache/storm/cluster_test.clj  |    8 +-
 .../scheduler/multitenant_scheduler_test.clj    |   77 +-
 .../scheduler/resource_aware_scheduler_test.clj |   49 +-
 .../clj/org/apache/storm/scheduler_test.clj     |   10 +-
 .../test/jvm/org/apache/storm/ConfigTest.java   |   92 ++
 .../storm/daemon/supervisor/ContainerTest.java  |    6 +-
 .../storm/localizer/AsyncLocalizerTest.java     |    8 +-
 .../org/apache/storm/scheduler/ClusterTest.java |  111 ++
 .../resource/TestResourceAwareScheduler.java    |  183 ++-
 .../storm/scheduler/resource/TestUser.java      |    7 +-
 .../TestUtilsForResourceAwareScheduler.java     |   34 +-
 .../eviction/TestDefaultEvictionStrategy.java   |  118 +-
 .../TestDefaultResourceAwareStrategy.java       |    8 +-
 .../org/apache/storm/utils/ConfigUtilsTest.java |   98 ++
 .../jvm/org/apache/storm/utils/UtilsTest.java   |   84 +
 storm-dist/binary/src/main/assembly/binary.xml  |   10 +-
 .../src/main/resources/resources/storm.py       |    2 +-
 155 files changed, 5523 insertions(+), 3666 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 95e43f6,52063fc..250ace1
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -233,10 -231,7 +233,10 @@@
                                    (str "executor"  executor-id "-send-queue")
                                    (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                    (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
 +                                  (.getStormId worker-context)
 +                                  component-id
 +                                  (.getThisWorkerPort worker-context)
-                                   :producer-type :single-threaded
+                                   :producer-type :multi-threaded
                                    :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                    :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
          ]
@@@ -820,9 -811,8 +822,9 @@@
                           (let [delta (tuple-time-delta! tuple)]
                             (when debug? 
                               (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
 +                           (.mark  ^Meter (:acked-meter (:executor-data task-data)))
                             (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
-                            (when delta
+                            (when (<= 0 delta)
                               (stats/bolt-acked-tuple! executor-stats
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)
@@@ -836,9 -826,8 +838,9 @@@
                                 debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                             (when debug? 
                               (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
 +                           (.mark  ^Meter (:failed-meter (:executor-data task-data)))
                             (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
-                            (when delta
+                            (when (<= 0 delta)
                               (stats/bolt-failed-tuple! executor-stats
                                                         (.getSourceComponent tuple)
                                                         (.getSourceStreamId tuple)

http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/57a50f36/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index 5c0a2fb,5fd4b84..ca8568c
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@@ -34,9 -34,6 +34,8 @@@ import com.lmax.disruptor.dsl.ProducerT
  import org.apache.storm.Config;
  import org.apache.storm.metric.api.IStatefulObject;
  import org.apache.storm.metric.internal.RateTracker;
 +import org.apache.storm.metrics2.DisruptorMetrics;
 +import org.apache.storm.metrics2.StormMetricRegistry;
- import org.apache.storm.task.WorkerTopologyContext;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -62,19 -59,18 +61,19 @@@ import java.util.concurrent.locks.Reent
   * the ability to catch up to the producer by processing tuples in batches.
   */
  public class DisruptorQueue implements IStatefulObject {
--    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);    
++    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
      private static final Object INTERRUPT = new Object();
      private static final String PREFIX = "disruptor-";
      private static final FlusherPool FLUSHER = new FlusherPool();
--    
 +    private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true);
++
      private static int getNumFlusherPoolThreads() {
          int numThreads = 100;
          try {
--        	Map<String, Object> conf = Utils.readStormConfig();
--        	numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
++            Map<String, Object> conf = Utils.readStormConfig();
++            numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads);
          } catch (Exception e) {
--        	LOG.warn("Error while trying to read system config", e);
++            LOG.warn("Error while trying to read system config", e);
          }
          try {
              String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads));
@@@ -86,8 -82,8 +85,8 @@@
          return numThreads;
      }
  
--    private static class FlusherPool { 
--    	private static final String THREAD_PREFIX = "disruptor-flush";
++    private static class FlusherPool {
++        private static final String THREAD_PREFIX = "disruptor-flush";
          private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true);
          private ThreadPoolExecutor _exec;
          private HashMap<Long, ArrayList<Flusher>> _pendingFlush = new HashMap<>();
@@@ -201,8 -197,8 +200,8 @@@
              if (block) {
                  _flushLock.lock();
              } else if (!_flushLock.tryLock()) {
--               //Someone else if flushing so don't do anything
--               return;
++                //Someone else is flushing so don't do anything
++                return;
              }
              try {
                  while (!_overflow.isEmpty()) {
@@@ -256,7 -252,7 +255,7 @@@
                      }
                  }
  
--                if (!flushed) {        
++                if (!flushed) {
                      _overflow.add(_currentBatch);
                      _currentBatch = new ArrayList<Object>(_inputBatchSize);
                  }
@@@ -276,8 -272,8 +275,8 @@@
              if (block) {
                  _flushLock.lock();
              } else if (!_flushLock.tryLock()) {
--               //Someone else if flushing so don't do anything
--               return;
++                //Someone else is flushing so don't do anything
++                return;
              }
              try {
                  while (!_overflow.isEmpty()) {
@@@ -349,11 -345,9 +348,17 @@@
              return (1.0F * population() / capacity());
          }
  
 +        public double arrivalRate(){
 +            return _rateTracker.reportRate();
 +        }
 +
 +        public double sojournTime(){
++            return tuplePopulation.get() / Math.max(arrivalRate(), 0.00001) * 1000.0;
++        }
++
+         public Object getState() {
+             Map state = new HashMap<String, Object>();
+ 
              // get readPos then writePos so it's never an under-estimate
              long rp = readPos();
              long wp = writePos();
@@@ -393,8 -393,7 +404,8 @@@
      private final int _inputBatchSize;
      private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>();
      private final Flusher _flusher;
-     private final QueueMetrics _metrics; // old metrics API
+     private final QueueMetrics _metrics;
 +    private final DisruptorMetrics _disruptorMetrics;
  
      private String _queueName = "";
      private DisruptorBackpressureCallback _cb = null;
@@@ -402,9 -401,10 +413,10 @@@
      private int _lowWaterMark = 0;
      private boolean _enableBackpressure = false;
      private final AtomicLong _overflowCount = new AtomicLong(0);
+     private final AtomicLong tuplePopulation = new AtomicLong(0);
      private volatile boolean _throttleOn = false;
  
 -    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) {
 +    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) {
          this._queueName = PREFIX + queueName;
          WaitStrategy wait;
          if (readTimeout <= 0) {
@@@ -425,13 -424,6 +437,12 @@@
  
          _flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
          _flusher.start();
- 
 +        METRICS_TIMER.schedule(new TimerTask(){
 +            @Override
 +            public void run() {
 +                _disruptorMetrics.set(_metrics);
 +            }
-         }, 15000, 15000); // TODO: Configurable interval
++        }, 15000, 15000);
      }
  
      public String getName() {
@@@ -619,4 -612,4 +631,4 @@@
      public QueueMetrics getMetrics() {
          return _metrics;
      }
--}
++}