You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/31 02:23:19 UTC

[4/8] git commit: merge

merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b47932d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b47932d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b47932d8

Branch: refs/heads/master
Commit: b47932d899917a8a2d1a5d9a671a448b147a7432
Parents: 7d5b348 8da9572
Author: Boris Aksenov <ak...@corp.finam.ru>
Authored: Sat Jul 5 16:02:32 2014 +0400
Committer: Boris Aksenov <ak...@corp.finam.ru>
Committed: Sat Jul 5 16:02:32 2014 +0400

----------------------------------------------------------------------
 CHANGELOG.md                                    |  20 +
 DEVELOPER.md                                    |  30 +
 LICENSE                                         |  39 +-
 README.markdown                                 |  36 +-
 STORM-UI-REST-API.md                            | 546 +++++++++++++
 bin/storm                                       |  65 +-
 conf/defaults.yaml                              |   2 +
 conf/storm_env.ini                              |   2 +-
 examples/storm-starter/README.markdown          |  27 +-
 .../src/jvm/storm/starter/RollingTopWords.java  |  62 +-
 .../src/jvm/storm/starter/util/StormRunner.java |   8 +
 external/storm-kafka/README.md                  |  33 +-
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |   9 +-
 .../src/jvm/storm/kafka/Partition.java          |   9 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   2 +-
 .../storm/kafka/DynamicBrokersReaderTest.java   |  17 +
 .../src/test/storm/kafka/KafkaErrorTest.java    |  17 +
 .../src/test/storm/kafka/KafkaTestBroker.java   |  17 +
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  17 +
 .../storm/kafka/StringKeyValueSchemeTest.java   |  17 +
 .../src/test/storm/kafka/TestUtils.java         |  17 +
 .../src/test/storm/kafka/ZkCoordinatorTest.java |  17 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    |  17 +
 pom.xml                                         |  17 +-
 storm-core/pom.xml                              |   7 +
 .../src/clj/backtype/storm/LocalCluster.clj     |  76 +-
 storm-core/src/clj/backtype/storm/LocalDRPC.clj |   5 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |   9 +-
 storm-core/src/clj/backtype/storm/clojure.clj   |  10 +-
 storm-core/src/clj/backtype/storm/cluster.clj   | 431 ++++++-----
 storm-core/src/clj/backtype/storm/config.clj    | 145 ++--
 .../src/clj/backtype/storm/daemon/common.clj    |   2 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      | 134 ++--
 .../src/clj/backtype/storm/daemon/executor.clj  |   3 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj | 181 +++--
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  10 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  25 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  11 +-
 storm-core/src/clj/backtype/storm/disruptor.clj |  77 +-
 storm-core/src/clj/backtype/storm/event.clj     |  49 +-
 storm-core/src/clj/backtype/storm/log.clj       |  22 +-
 .../clj/backtype/storm/process_simulator.clj    |  27 +-
 storm-core/src/clj/backtype/storm/stats.clj     | 289 +++----
 storm-core/src/clj/backtype/storm/testing.clj   | 546 ++++++-------
 storm-core/src/clj/backtype/storm/testing4j.clj |  58 +-
 storm-core/src/clj/backtype/storm/thrift.clj    | 255 +++---
 storm-core/src/clj/backtype/storm/timer.clj     | 106 +--
 storm-core/src/clj/backtype/storm/tuple.clj     |   7 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   | 605 ++++++++-------
 .../src/clj/backtype/storm/ui/helpers.clj       |   5 +
 storm-core/src/clj/backtype/storm/util.clj      | 767 ++++++++++---------
 storm-core/src/clj/backtype/storm/zookeeper.clj | 158 ++--
 .../src/dev/resources/tester_bolt_metrics.py    |  35 +
 .../src/dev/resources/tester_spout_metrics.py   |  51 ++
 storm-core/src/jvm/backtype/storm/Config.java   |  29 +
 .../jvm/backtype/storm/generated/ErrorInfo.java | 188 ++++-
 .../netty/NettyRenameThreadFactory.java         |  19 +-
 .../metric/api/rpc/AssignableShellMetric.java   |  30 +
 .../metric/api/rpc/CombinedShellMetric.java     |  31 +
 .../storm/metric/api/rpc/CountShellMetric.java  |  38 +
 .../storm/metric/api/rpc/IShellMetric.java      |  31 +
 .../metric/api/rpc/ReducedShellMetric.java      |  32 +
 .../jvm/backtype/storm/multilang/BoltMsg.java   |  17 +
 .../backtype/storm/multilang/ISerializer.java   |  17 +
 .../storm/multilang/JsonSerializer.java         |  27 +
 .../storm/multilang/NoOutputException.java      |  17 +
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  37 +
 .../jvm/backtype/storm/multilang/SpoutMsg.java  |  17 +
 .../jvm/backtype/storm/spout/ShellSpout.java    |  44 +-
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  47 +-
 .../backtype/storm/task/TopologyContext.java    |  35 +
 .../storm/testing/PythonShellMetricsBolt.java   |  32 +
 .../storm/testing/PythonShellMetricsSpout.java  |  35 +
 .../storm/ui/InvalidRequestException.java       |  20 +
 .../backtype/storm/utils/DisruptorQueue.java    | 100 ++-
 .../jvm/backtype/storm/utils/ShellProcess.java  |  46 +-
 .../trident/state/map/RemovableMapState.java    |  17 +
 .../topology/MasterBatchCoordinator.java        |   4 +
 storm-core/src/multilang/py/storm.py            |   3 +
 storm-core/src/py/storm/ttypes.py               |  28 +-
 storm-core/src/storm.thrift                     |   2 +
 storm-core/src/ui/public/component.html         |  38 +-
 storm-core/src/ui/public/css/style.css          |   1 +
 storm-core/src/ui/public/index.html             |  16 +
 storm-core/src/ui/public/js/visualization.js    |  20 +-
 .../templates/component-page-template.html      |  27 +-
 .../public/templates/index-page-template.html   |  16 +
 .../public/templates/json-error-template.html   |  16 +
 .../templates/topology-page-template.html       |  24 +-
 storm-core/src/ui/public/topology.html          |  21 +-
 .../test/clj/backtype/storm/cluster_test.clj    |   8 +-
 .../test/clj/backtype/storm/metrics_test.clj    | 206 +++--
 .../test/clj/backtype/storm/supervisor_test.clj |  45 +-
 .../storm/utils/DisruptorQueueTest.java         | 153 ++++
 storm-dist/binary/LICENSE                       |  39 +-
 95 files changed, 4731 insertions(+), 1991 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/disruptor.clj
index 72bf0a7,a723601..2bbeebd
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@@ -13,20 -13,20 +13,20 @@@
  ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
+ 
  (ns backtype.storm.disruptor
    (:import [backtype.storm.utils DisruptorQueue])
-   (:import [com.lmax.disruptor BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy BusySpinWaitStrategy])
 -  (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
 -            BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
++  (:import [com.lmax.disruptor BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy 
+             BusySpinWaitStrategy])
 +  (:import [com.lmax.disruptor.dsl ProducerType])
    (:require [clojure [string :as str]])
    (:require [clojure [set :as set]])
    (:use [clojure walk])
-   (:use [backtype.storm util log])
-   )
+   (:use [backtype.storm util log]))
  
  (def CLAIM-STRATEGY
 -  {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
 -   :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))})
 +  {:multi-threaded (ProducerType/MULTI)
-    :single-threaded (ProducerType/SINGLE)
-    })
++   :single-threaded (ProducerType/SINGLE)})
  
  (def WAIT-STRATEGY
    {:block (fn [] (BlockingWaitStrategy.))
@@@ -44,22 -42,23 +42,24 @@@
  
  ;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.
  ;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
- ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, 
+ ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
  ;; unblocking the consumer
- (defnk disruptor-queue [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+ (defnk disruptor-queue
+   [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
    (DisruptorQueue. queue-name
 -                   ((CLAIM-STRATEGY claim-strategy) buffer-size)
 +                   (CLAIM-STRATEGY claim-strategy)
 +                   buffer-size
-                    (mk-wait-strategy wait-strategy)
-                    ))
+                    (mk-wait-strategy wait-strategy)))
  
- (defn clojure-handler [afn]
+ (defn clojure-handler
+   [afn]
    (reify com.lmax.disruptor.EventHandler
-     (onEvent [this o seq-id batchEnd?]
-       (afn o seq-id batchEnd?)
-       )))
+     (onEvent
+       [this o seq-id batchEnd?]
+       (afn o seq-id batchEnd?))))
  
- (defmacro handler [& args]
+ (defmacro handler
+   [& args]
    `(clojure-handler (fn ~@args)))
  
  (defn publish

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index df2545f,932af16..25bfcf8
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@@ -24,48 -25,64 +24,66 @@@ import com.lmax.disruptor.InsufficientC
  import com.lmax.disruptor.RingBuffer;
  import com.lmax.disruptor.Sequence;
  import com.lmax.disruptor.SequenceBarrier;
 -import com.lmax.disruptor.SingleThreadedClaimStrategy;
 +import com.lmax.disruptor.TimeoutException;
  import com.lmax.disruptor.WaitStrategy;
 +import com.lmax.disruptor.dsl.ProducerType;
+ 
  import java.util.concurrent.ConcurrentLinkedQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.HashMap;
 -import java.util.Map;
  import backtype.storm.metric.api.IStatefulObject;
  
 -
  /**
   *
   * A single consumer queue that uses the LMAX Disruptor. The key to the performance is
   * the ability to catch up to the producer by processing tuples in batches.
   */
  public class DisruptorQueue implements IStatefulObject {
 -    static final Object FLUSH_CACHE = new Object();
 -    static final Object INTERRUPT = new Object();
 -    
 -    RingBuffer<MutableObject> _buffer;
 -    Sequence _consumer;
 -    SequenceBarrier _barrier;
 +    private static final Object FLUSH_CACHE = new Object();
 +    private static final Object INTERRUPT = new Object();
 +    private static final String PREFIX = "disruptor-";
 +
-     private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
-     private final HashMap<String, Object> state = new HashMap<String, Object>(4);
+     
 +
 +    private final String _queueName;
 +    private final RingBuffer<MutableObject> _buffer;
 +    private final Sequence _consumer;
 +    private final SequenceBarrier _barrier;
 +
      // TODO: consider having a threadlocal cache of this variable to speed up reads?
      volatile boolean consumerStartedFlag = false;
- 
 -    ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
++    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
++    
++    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
++    
+     
+     private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+     private final Lock readLock  = cacheLock.readLock();
+     private final Lock writeLock = cacheLock.writeLock();
 -    
 -    private static String PREFIX = "disruptor-";
 -    private String _queueName = "";
 -    
 -    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
 -         this._queueName = PREFIX + queueName;
 -        _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
 +    public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
++        this._queueName = PREFIX + queueName;
 +        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
-         _queueName = PREFIX + queueName;
          _consumer = new Sequence();
          _barrier = _buffer.newBarrier();
 -        _buffer.setGatingSequences(_consumer);
 -        if(claim instanceof SingleThreadedClaimStrategy) {
 +        _buffer.addGatingSequences(_consumer);
-         consumerStartedFlag = producerType == ProducerType.SINGLE;
++        if(producerType == ProducerType.SINGLE) {
+             consumerStartedFlag = true;
+         } else {
+             // make sure we flush the pending messages in cache first
+             try {
+                 publishDirect(FLUSH_CACHE, true);
+             } catch (InsufficientCapacityException e) {
+                 throw new RuntimeException("This code should be unreachable!", e);
+             }
+         }
      }
- 
+     
      public String getName() {
-         return _queueName;
+       return _queueName;
      }
- 
- 
+     
      public void consumeBatch(EventHandler<Object> handler) {
          consumeBatchToCursor(_barrier.getCursor(), handler);
      }
@@@ -85,12 -102,10 +103,12 @@@
              throw new RuntimeException(e);
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
 +        } catch (TimeoutException e) {
 +            throw new RuntimeException(e);
          }
      }
- 
- 
+     
+     
      private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
          for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
              try {
@@@ -131,35 -146,49 +149,49 @@@
      public void tryPublish(Object obj) throws InsufficientCapacityException {
          publish(obj, false);
      }
 -    
 +
      public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-         if(consumerStartedFlag) {
-             final long id;
-             if(block) {
-                 id = _buffer.next();
-             } else {
-                 id = _buffer.tryNext(1);
+ 
+         boolean publishNow = consumerStartedFlag;
+ 
+         if (!publishNow) {
+             readLock.lock(); 
+             try {
+                 publishNow = consumerStartedFlag;
+                 if (!publishNow) {
+                     _cache.add(obj);
+                 }
+             } finally {
+                 readLock.unlock();
              }
-             final MutableObject m = _buffer.get(id);
-             m.setObject(obj);
-             _buffer.publish(id);
-         } else {
-             _cache.add(obj);
-             if(consumerStartedFlag) flushCache();
+         }
+         
+         if (publishNow) {
+             publishDirect(obj, block);
          }
      }
- 
-     public void consumerStarted() {
-         if(!consumerStartedFlag) {
-             consumerStartedFlag = true;
-             flushCache();
+     
+     private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+         final long id;
+         if(block) {
+             id = _buffer.next();
+         } else {
+             id = _buffer.tryNext(1);
          }
+         final MutableObject m = _buffer.get(id);
+         m.setObject(obj);
+         _buffer.publish(id);
      }
+     
+     public void consumerStarted() {
  
-     private void flushCache() {
-         publish(FLUSH_CACHE);
+         consumerStartedFlag = true;
+         
+         // Use writeLock to make sure all pending cache add operations have completed
+         writeLock.lock();
+         writeLock.unlock();
      }
- 
+     
      public long  population() { return (writePos() - readPos()); }
      public long  capacity()   { return _buffer.getBufferSize(); }
      public long  writePos()   { return _buffer.getCursor(); }