Posted to commits@camel.apache.org by ac...@apache.org on 2016/02/23 11:33:56 UTC

camel git commit: Added camel-distruptor docs to gitbook

Repository: camel
Updated Branches:
  refs/heads/master 5ab310241 -> 622e515d4


Added camel-distruptor docs to gitbook


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/622e515d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/622e515d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/622e515d

Branch: refs/heads/master
Commit: 622e515d44d86d699dc0eaebc0dd937015b4d944
Parents: 5ab3102
Author: Andrea Cosentino <an...@gmail.com>
Authored: Tue Feb 23 11:33:09 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Tue Feb 23 11:33:09 2016 +0100

----------------------------------------------------------------------
 .../src/main/docs/disruptor.adoc                | 350 +++++++++++++++++++
 docs/user-manual/en/SUMMARY.md                  |   1 +
 2 files changed, 351 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/622e515d/components/camel-disruptor/src/main/docs/disruptor.adoc
----------------------------------------------------------------------
diff --git a/components/camel-disruptor/src/main/docs/disruptor.adoc b/components/camel-disruptor/src/main/docs/disruptor.adoc
new file mode 100644
index 0000000..fcda340
--- /dev/null
+++ b/components/camel-disruptor/src/main/docs/disruptor.adoc
@@ -0,0 +1,350 @@
+[[Disruptor-DisruptorComponent]]
+Disruptor Component
+~~~~~~~~~~~~~~~~~~~
+
+*Available as of Camel 2.12*
+
+The *disruptor:* component provides asynchronous
+http://www.eecs.harvard.edu/~mdw/proj/seda/[SEDA] behavior much as the
+standard SEDA Component, but utilizes a
+https://github.com/LMAX-Exchange/disruptor[Disruptor] instead of a
+http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue]
+utilized by the standard link:seda.html[SEDA].
+
+Alternatively, a *disruptor-vm:* endpoint is supported by this component,
+providing an alternative to the standard link:vm.html[VM]. As with the SEDA
+component, buffers of the *disruptor:* endpoints are only visible within
+a *single* link:camelcontext.html[CamelContext] and no support is
+provided for persistence or recovery. The buffers of the
+**disruptor-vm:** endpoints also provide support for communication
+across CamelContext instances, so you can use this mechanism to
+communicate across web applications (provided that *camel-disruptor.jar*
+is on the *system/boot* classpath).
+
+The main advantage of choosing to use the Disruptor Component over the
+SEDA or the VM Component is performance in use cases where there is high
+contention between producer(s) and/or multicasted or concurrent
+Consumers. In those cases, significant increases of throughput and
+reduction of latency have been observed. Performance in scenarios without
+contention is comparable to the SEDA and VM Components.
+
+The Disruptor is implemented with the intention of mimicking the
+behaviour and options of the SEDA and VM Components as much as possible.
+The main differences from them are the following:
+
+* The buffer used is always bounded in size (default 1024 exchanges).
+* As the buffer is always bounded, the default behaviour for the
+Disruptor is to block while the buffer is full instead of throwing an
+exception. This default behaviour may be configured on the component
+(see options).
+* The Disruptor endpoints don't implement the BrowsableEndpoint
+interface. As such, the exchanges currently in the Disruptor can't be
+retrieved, only the number of exchanges.
+* The Disruptor requires its consumers (multicasted or otherwise) to be
+statically configured. Adding or removing consumers on the fly requires
+complete flushing of all pending exchanges in the Disruptor.
+* As a result of the reconfiguration, data sent over a Disruptor is
+directly processed and 'gone' if there is at least one consumer; late
+joiners only get new exchanges published after they've joined.
+* The *pollTimeout* option is not supported by the Disruptor Component.
+* When a producer blocks on a full Disruptor, it does not respond to
+thread interrupts.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component:
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-disruptor</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+------------------------------------------------------------
+
+[[Disruptor-URIformat]]
+URI format
+^^^^^^^^^^
+
+[source,java]
+-----------------------------
+ disruptor:someName[?options]
+-----------------------------
+
+or
+
+[source,java]
+--------------------------------
+ disruptor-vm:someName[?options]
+--------------------------------
+
+Where **someName** can be any string that uniquely identifies the
+endpoint within the current link:camelcontext.html[CamelContext] (or
+across contexts in the case of
+**disruptor-vm:**). +
+You can append query options to the URI in the following format:
+
+[source,java]
+------------------------------
+  ?option=value&option=value&…
+------------------------------
+
+[[Disruptor-Options]]
+Options
+^^^^^^^
+
+All the following options are valid for both the **disruptor:** and
+**disruptor-vm:** components.
+
+[width="100%",cols="10%,10%,80%",options="header",]
+|=======================================================================
+|Name | Default | Description
+
+|size |1024 |The maximum capacity of the Disruptor's ringbuffer. Will be effectively
+increased to the nearest power of two. *Notice:* If you use this option,
+the first endpoint created with a given name determines the size for
+all endpoints sharing that name. To make sure all endpoints use the same
+size, configure the size option on all of them, or on the first endpoint
+being created.
+
+|bufferSize |  | *Component only:* The maximum default size (capacity of the number of
+messages it can hold) of the Disruptor's ringbuffer. This option is used
+if size is not in use.
+
+|queueSize |  | *Component only:* Additional option to specify the _bufferSize_
+to maintain maximum compatibility with the link:seda.html[SEDA]
+Component.
+
+|concurrentConsumers |1 |Number of concurrent threads processing exchanges.
+
+|waitForTaskToComplete |IfReplyExpected |Option to specify whether the caller should wait for the async task to
+complete or not before continuing. The following three options are
+supported: _Always_, _Never_ or _IfReplyExpected_. The first two values
+are self-explanatory. The last value, _IfReplyExpected_, will only wait
+if the message is link:request-reply.html[Request Reply] based. See more
+information about link:async.html[Async] messaging.
+
+|timeout |30000 |Timeout (in milliseconds) before a producer will stop waiting for an
+asynchronous task to complete. See _waitForTaskToComplete_ and
+link:async.html[Async] for more details. You can disable timeout by
+using 0 or a negative value.
+
+|defaultMultipleConsumers |  | *Component only:* Sets whether multiple consumers are allowed by
+default for endpoints created by this component. Used when
+_multipleConsumers_ is not provided.
+
+|multipleConsumers |false |Specifies whether multiple consumers are allowed. If enabled, you can
+use Disruptor for http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern[Publish-Subscribe] messaging.
+That is, you can send a message to the Disruptor and have each consumer
+receive a copy of the message. When enabled, this option should be
+specified on every consumer endpoint.
+
+|limitConcurrentConsumers |true |Whether to limit the number of concurrentConsumers to the maximum of
+500. By default, an exception will be thrown if a Disruptor endpoint is
+configured with a greater number. You can disable that check by turning
+this option off.
+
+|blockWhenFull |true |Whether a thread that sends messages to a full Disruptor will block
+until the ringbuffer's capacity is no longer exhausted. By default, the
+calling thread will block and wait until the message can be accepted. By
+disabling this option, an exception will be thrown stating that the
+queue is full.
+
+|defaultBlockWhenFull | | *Component only:* Sets the default producer behaviour when the
+ringbuffer is full for endpoints created by this component. Used when
+_blockWhenFull_ is not provided.
+
+|waitStrategy |Blocking |Defines the strategy used by consumer threads to wait on new exchanges
+to be published. The options allowed are: _Blocking_, _Sleeping_,
+_BusySpin_ and _Yielding_. Refer to the section below for more
+information on this subject.
+
+|defaultWaitStrategy |   | *Component only:* Sets the default wait strategy for endpoints
+created by this component. Used when _waitStrategy_ is not provided.
+
+|producerType |Multi | Defines the producers allowed on the Disruptor. The options allowed are:
+_Multi_ to allow multiple producers and _Single_ to enable certain
+optimizations only allowed when one concurrent producer (on one thread
+or otherwise synchronized) is active.
+
+|defaultProducerType |  | *Component only:* Sets the default producer type for endpoints
+created by this component. Used when _producerType_ is not provided.
+|=======================================================================
+
+[[Disruptor-Waitstrategies]]
+Wait strategies
+^^^^^^^^^^^^^^^
+
+The wait strategy affects the type of waiting performed by the consumer
+threads that are currently waiting for the next exchange to be
+published. The following strategies can be chosen:
+
+[width="100%",cols="10%,45%,45%",options="header",]
+|=======================================================================
+|Name |Description |Advice
+
+|Blocking | Blocking strategy that uses a lock and condition variable for Consumers
+waiting on a barrier. | This strategy can be used when throughput and low-latency are not as
+important as CPU resource.
+
+|Sleeping |Sleeping strategy that initially spins, then uses a Thread.yield(), and
+eventually sleeps for the minimum number of nanos the OS and JVM will allow
+while the Consumers are waiting on a barrier. |This strategy is a good compromise between performance and CPU resource.
+Latency spikes can occur after quiet periods.
+
+|BusySpin |Busy Spin strategy that uses a busy spin loop for Consumers waiting on a
+barrier. |This strategy will use CPU resource to avoid syscalls which can
+introduce latency jitter. It is best used when threads can be bound to
+specific CPU cores.
+
+|Yielding |Yielding strategy that uses a Thread.yield() for Consumers waiting on a
+barrier after initially spinning. |This strategy is a good compromise between performance and CPU resource
+without incurring significant latency spikes.
+|=======================================================================
+
+[[Disruptor-UseofRequestReply]]
+Use of Request Reply
+^^^^^^^^^^^^^^^^^^^^
+
+The Disruptor component supports using link:request-reply.html[Request
+Reply], where the caller will wait for the Async route to complete. For
+instance:
+
+[source,java]
+------------------------------------------------------------------------------
+from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input");
+from("disruptor:input").to("bean:processInput").to("bean:createResponse");
+------------------------------------------------------------------------------
+
+In the route above, we have a TCP listener on port 9876 that accepts
+incoming requests. The request is routed to the _disruptor:input_
+buffer. As it is a link:request-reply.html[Request Reply] message, we
+wait for the response. When the consumer on the _disruptor:input_ buffer
+is complete, it copies the response to the original message response.
+
+[[Disruptor-Concurrentconsumers]]
+Concurrent consumers
+^^^^^^^^^^^^^^^^^^^^
+
+By default, the Disruptor endpoint uses a single consumer thread, but
+you can configure it to use concurrent consumer threads. So instead of
+thread pools you can use:
+
+[source,java]
+--------------------------------------------------------------
+from("disruptor:stageName?concurrentConsumers=5").process(...)
+--------------------------------------------------------------
+
+As for the difference between the two, note that a thread pool can
+increase/shrink dynamically at runtime depending on load, whereas the
+number of concurrent consumers is always fixed and supported by the
+Disruptor internally so performance will be higher.
+
+[[Disruptor-Threadpools]]
+Thread pools
+^^^^^^^^^^^^
+
+Be aware that adding a thread pool to a Disruptor endpoint by doing
+something like:
+
+[source,java]
+---------------------------------------------------
+from("disruptor:stageName").threads(5).process(...)
+---------------------------------------------------
+
+can wind up adding a normal
+http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue]
+to be used in conjunction with the Disruptor, effectively negating part
+of the performance gains achieved by using the Disruptor. Instead, it is
+advised to directly configure the number of threads that process messages on
+a Disruptor endpoint using the concurrentConsumers option.
+
+[[Disruptor-Sample]]
+Sample
+^^^^^^
+
+In the route below we use the Disruptor to send the request to this
+async queue to be able to send a fire-and-forget message for further
+processing in another thread, and return a constant reply in this thread
+to the original caller.
+
+[source,java]
+-------------------------------------------------
+public void configure() throws Exception {
+    from("direct:start")
+        // send it to the disruptor that is async
+        .to("disruptor:next")
+        // return a constant response
+        .transform(constant("OK"));
+
+    from("disruptor:next").to("mock:result");
+}
+-------------------------------------------------
+
+Here we send a Hello World message and expect the reply to be OK.
+
+[source,java]
+-----------------------------------------------------------------
+Object out = template.requestBody("direct:start", "Hello World");
+assertEquals("OK", out);
+-----------------------------------------------------------------
+
+The "Hello World" message will be consumed from the Disruptor from
+another thread for further processing. Since this is from a unit test,
+it will be sent to a mock endpoint where we can do assertions in the
+unit test.
+
+[[Disruptor-UsingmultipleConsumers]]
+Using multipleConsumers
+^^^^^^^^^^^^^^^^^^^^^^^
+
+In this example we have defined two consumers and registered them as
+Spring beans.
+
+[source,xml]
+-------------------------------------------------------------------------------------------
+<!-- define the consumers as spring beans -->
+<bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/>
+
+<bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/>
+
+<camelContext xmlns="http://camel.apache.org/schema/spring">
+    <!-- define a shared endpoint which the consumers can refer to instead of using url -->
+    <endpoint id="foo" uri="disruptor:foo?multipleConsumers=true"/>
+</camelContext>
+-------------------------------------------------------------------------------------------
+
+Since we have specified multipleConsumers=true on the Disruptor foo
+endpoint we can have those two or more consumers receive their own copy
+of the message as a kind of pub-sub style messaging. As the beans are
+part of a unit test they simply send the message to a mock endpoint,
+but notice how we can use @Consume to consume from the Disruptor.
+
+[source,java]
+-------------------------------------------
+public class FooEventConsumer {
+
+    @EndpointInject(uri = "mock:result")
+    private ProducerTemplate destination;
+
+    @Consume(ref = "foo")
+    public void doSomething(String body) {
+        destination.sendBody("foo" + body);
+    }
+
+}
+-------------------------------------------
+
+[[Disruptor-Extractingdisruptorinformation]]
+Extracting disruptor information
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+If needed, information such as buffer size, etc. can be obtained without
+using JMX in this fashion:
+
+[source,java]
+---------------------------------------------------------------------------------------------
+DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx", DisruptorEndpoint.class);
+int size = disruptor.getBufferSize();
+---------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/camel/blob/622e515d/docs/user-manual/en/SUMMARY.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index ca286cd..8590d75 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -111,6 +111,7 @@
 	* [Crypto](crypto.adoc)
 		* [Crypto Digital Signatures](crypto-digital-signatures.adoc)
 	* [CSV](csv.adoc)
+	* [Disruptor](disruptor.adoc)
 	* [JMS](jms.adoc)
 	* [Metrics](metrics.adoc)
 	* [Properties](properties.adoc)