Posted to dev@apex.apache.org by vrozov <gi...@git.apache.org> on 2015/11/18 02:56:49 UTC

[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

GitHub user vrozov opened a pull request:

    https://github.com/apache/incubator-apex-core/pull/173

    APEX-254 & APEX-269

    Introduce Abstract and Forwarding Reservoir classes. Provide a concrete implementation of AbstractReservoir based on JCTools SpscArrayQueue.
    
    SpscArrayQueueReservoir can process 25 million records per second for ports with CONTAINER_LOCAL locality.
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vrozov/incubator-apex-core APEX-269

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-apex-core/pull/173.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #173
    
----
commit 8134affdd4bf509b49971a16112169b807fdbc95
Author: Vlad Rozov <v....@datatorrent.com>
Date:   2015-11-11T01:42:11Z

    APEX-254 - Introduce Abstract and Forwarding Reservoir classes.

commit c48a97892de8fc55012ea4fa138f70d7dc135b68
Author: Vlad Rozov <v....@datatorrent.com>
Date:   2015-11-11T01:43:06Z

    APEX-254 - Introduce Abstract and Forwarding Reservoir classes.

commit bcaec96019e48612fb0acce9b1523d89d5e6147e
Author: Vlad Rozov <v....@datatorrent.com>
Date:   2015-11-17T23:53:43Z

    APEX-269 #resolve Provide concrete implementation of AbstractReservoir based on SpscArrayQueue

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-160690609
  
    The larger queue size for the Spsc and CircularBuffer Reservoirs is driven by the 10 millisecond wait time: waits for the not-empty and not-full conditions become more frequent with smaller queue sizes. Lower CPU utilization at smaller queue sizes does not mean a lower cost of producing or consuming events; it reflects more frequent, overlapping sleeps in both the producer and the consumer threads.
    
    Currently, 10 milliseconds is the default sleep time for the Reservoir consumer and can be configured using the SPIN_MILLIS attribute. The sleep on the producer side is hardcoded to 10 milliseconds. After I recompiled the Reservoirs with 0 sleep time in the producer and set SPIN_MILLIS to 0, Spsc averages 13 million events/s and CircularBuffer averages 5 million events/s, and both have >98% CPU utilization. This benchmark suggests that an application needs to be able to specify the sleep time not only for the consumer but also for the producer.
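
The sleep-on-full and sleep-on-empty behaviour described above can be illustrated with a minimal sketch. This is not the Apex Reservoir code: the class name, the method and both sleep parameters are hypothetical, and a plain java.util.concurrent.ArrayBlockingQueue stands in for the reservoir. It only shows where a producer-side and a consumer-side sleep time would enter the hand-off loop.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical illustration only; not the Apex Reservoir implementation.
final class SleepingHandoffSketch
{
  /**
   * Moves the integers 1..count from a producer thread to the calling (consumer)
   * thread through a bounded queue and returns the sum seen by the consumer.
   * Each side sleeps for its own configurable time when it cannot make progress.
   */
  static long transfer(final int count, final int capacity,
      final long producerSleepMillis, final long consumerSleepMillis)
  {
    final ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);

    Thread producer = new Thread(() -> {
      try {
        for (int i = 1; i <= count; i++) {
          // offer() returns false when the queue is full: back off, then retry
          while (!queue.offer(i)) {
            Thread.sleep(producerSleepMillis);
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    long sum = 0;
    try {
      for (int received = 0; received < count; ) {
        // poll() returns null when the queue is empty: back off, then retry
        Integer tuple = queue.poll();
        if (tuple == null) {
          Thread.sleep(consumerSleepMillis);
        } else {
          sum += tuple;
          received++;
        }
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while consuming", e);
    }
    return sum;
  }

  public static void main(String[] args)
  {
    // Same work, once spinning (0 ms sleeps) and once with 1 ms sleeps on both sides
    System.out.println(transfer(2_000, 16, 0, 0));
    System.out.println(transfer(2_000, 16, 1, 1));
  }
}
```

With a small capacity and non-zero sleeps, both threads spend most of their time sleeping, which is the effect the comment attributes to small queue sizes.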



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157638918
  
    In this pull request, the code in AbstractReservoir is copied from DefaultReservoir, but the history and attribution don't reflect it. DefaultReservoir is then renamed to ForwardingReservoir, which has mostly new changes (rather than the old code, which is moved to AbstractReservoir). In the past this is how we dealt with changes like this:
    1. Renamed (with git) the existing class to the abstract one. This preserves history.
    2. Added a new extension, in this case ForwardingReservoir.
    
    This was done to preserve history. It may be a little tedious, but most of us have done it. This should be handled here as well. We cannot be selective about when to enforce such rules and when to ignore them. Also, IMO preserving the provenance of code is important.



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159321063
  
    reviewing it



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159474783
  
    For small queue sizes, ArrayBlockingQueueReservoir is the best option.



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#discussion_r45760480
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java ---
    @@ -0,0 +1,786 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.lang.reflect.Constructor;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.jctools.queues.MessagePassingQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Sink;
    +import com.datatorrent.netlet.util.CircularBuffer;
    +import com.datatorrent.netlet.util.UnsafeBlockingQueue;
    +import com.datatorrent.stram.tuple.Tuple;
    +
    +import static java.lang.Thread.sleep;
    +
    +/**
    + * Abstract Sweepable Reservoir implementation. Implements all methods of {@link SweepableReservoir} except
    + * {@link SweepableReservoir#sweep}. Classes that extend {@link AbstractReservoir} must implement
    + * {@link BlockingQueue} interface.
    + */
    +public abstract class AbstractReservoir implements SweepableReservoir, BlockingQueue<Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class);
    +  static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir";
    +  private static final String reservoirDefaultClassName = SpscArrayQueueReservoir.class.getName();
    +
    +  /**
    +   * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir} based on
    +   * {@link AbstractReservoir#reservoirClassNameProperty} property.
    +   * @param id reservoir identifier
    +   * @param capacity reservoir capacity
    +   * @return concrete implementation of {@link AbstractReservoir}
    +   */
    +  public static AbstractReservoir newReservoir(final String id, final int capacity)
    +  {
    +    String reservoirClassName = System.getProperty(reservoirClassNameProperty, reservoirDefaultClassName);
    +    if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
    --- End diff --
    
    It looks like what you have in the else branch would cover all these cases, wouldn't it?
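
The review question above concerns a factory that picks a class by name from a system property. A minimal sketch of a purely reflective factory is shown below, under stated assumptions: the property name and class name are hypothetical, JDK queues stand in for the reservoir classes, and the implementation class is assumed to have a public constructor taking an int capacity. In such a design a single reflective branch handles every configured class name, including the default one.

```java
import java.lang.reflect.Constructor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration only; names do not come from the pull request.
final class QueueFactorySketch
{
  // Hypothetical system property naming the implementation class
  static final String QUEUE_CLASS_PROPERTY = "sketch.queue.class";
  static final String DEFAULT_QUEUE_CLASS = ArrayBlockingQueue.class.getName();

  /**
   * Creates a queue of the class named by the system property, falling back
   * to the default class. Assumes a public constructor taking an int capacity.
   */
  @SuppressWarnings("unchecked")
  static BlockingQueue<Object> newQueue(final int capacity)
  {
    final String className = System.getProperty(QUEUE_CLASS_PROPERTY, DEFAULT_QUEUE_CLASS);
    try {
      Class<?> clazz = Class.forName(className);
      Constructor<?> constructor = clazz.getConstructor(int.class);
      return (BlockingQueue<Object>)constructor.newInstance(capacity);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(newQueue(1024).getClass().getName());
    System.setProperty(QUEUE_CLASS_PROPERTY, "java.util.concurrent.LinkedBlockingQueue");
    System.out.println(newQueue(1024).getClass().getName());
  }
}
```

A dedicated branch that calls the default class's constructor directly would skip the reflective lookup for the common case, but it should produce the same object as the generic branch.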



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157748307
  
    @chandnisingh Please double check. There are 3 commits. The first renames DefaultReservoir.java to AbstractReservoir.java; the second introduces the AbstractReservoir and ForwardingReservoir classes along with a concrete implementation that extends AbstractReservoir and delegates the BlockingQueue implementation to CircularBuffer; and the last introduces SpscArrayQueueReservoir and ArrayBlockingQueueReservoir along with test cases.
    
    Note that git still does not preserve history and behaves as if it were internally squashing all 3 commits into a single one. If there is a better way to preserve history, please let me know.



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159398782
  
    @243826 Please bring the discussion here. I did not see anything in the e-mail that would justify a -1 to this pull request. The only thing in question is changing the InlineStream default queue size, and that is not part of this pull request.
    
    SpscArrayQueueReservoir outperforms CircularBufferReservoir by 4x with the queue size set to 512000 and is on par or better with the default queue size. 



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159467113
  
    Started benchmark applications on the dev cluster with 1k, 4k, 32k, 64k, 128k, 256k, 512k and 1024k queue sizes (for benchmark application default is 32k):
    
    queue size | spsc events/s | spsc cpu | circular events/s | circular cpu
    -----------|---------------|----------|-------------------|-------------
    1k         | 94k           | 0.46%    | 101k              | 0.5%
    4k         | 404k          | 1%       | 431k              | 1%
    32k        | 3.1m          | 8%       | 3.1m              | 14%
    64k        | 6.1m          | 16%      | 4.2m              | 16%
    128k       | 10.8m         | 55%      | 5.5m              | 85%
    256k       | 12m           | 65%      | 4.8m              | 90%
    512k       | 19.6m         | 73%      | 10.7m             | 99%
    1024k      | 22m           | 99%      | 6.1m              | 99%
    
    Smaller queue sizes cause more blocking waits on both the producer and consumer sides.



Re: [GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.

On 11/18/15, 9:21 PM, "Vlad Rozov" <v....@datatorrent.com> wrote:

>Connecting a fast producing operator to a slow consuming operator using
>a CONTAINER_LOCAL port would be a bad application design decision anyway,
>as it will slow down the producer.

Hmmm. I do not agree with the bad design part. Some systems will
inherently be designed to work as fast as the slowest running operator
with the resource constraints in place. Besides, it may not always be by
design; this could be a temporary surge. Thirdly - the work we did just a
few days ago to make sure that buffer server stalls the publisher would
not be needed if we decided regardless that this is a bad design decision.

>In this case it would be better to partition the slow downstream operator.
>Do we currently support partitioning an operator and deploying all
>partitions into the same container (CONTAINER_LOCAL)? What stream would
>be used for multiplexing? InlineStream only supports a single,
>non-partitioned stream.

MuxStream. Even though Apex supports this - I had added this functionality
for a different reason - to simplify the logic via partitioning. In
reality - most distributed system developers will give more resources to
the same operator instead of adding even more overhead of partitioning
and unifying on the same machine.

The above are just to answer the questions you asked - the overarching
point I want to make is that the queue has a specific size because it’s
possible that at some point all the slots in the queue will be filled. In
an asynchronous system such as Apex, even a very big queue will get filled
very often. So we have to be conservative about how big we allow it to grow.

At the same time - the benchmark that I had devised is for near ideal
conditions for operators. Both the operators are taking almost zero time
to create and process the events. In almost all useful applications the
operators are going to take non-zero time, and the optimizations using
SPSC are going to result in insignificant, if not zero, improvement in
most applications' performance. On the contrary - the additional memory
required to get that performance will cause instability due to OOM
errors, as the slowest operator in the application will put back pressure
on faster upstream operators by filling up the queues.

TLDR; increasing queue size to increase the speed is not an option as it
will come at the cost of a disproportionately large amount of RAM and
the problems associated with needing a lot of RAM.

>
>Thank you,
>
>Vlad
>
>On 11/18/15 16:01, Chetan Narsude (cnarsude) wrote:
>> On 11/18/15, 2:43 PM, "Vlad Rozov" <v....@datatorrent.com> wrote:
>>
>>> Based on the current performance testing I plan to change default value
>>> of PortContext.QUEUE_CAPACITY from 1024 to 1<<19 (still need to do my
>>> homework and see where PortContext.QUEUE_CAPACITY is used).
>> There are 2 different aspects that need to be considered here. One is
>> speed of the messaging bus (that you are focusing on) and the second one
>> is the speed of the operator. What operator cannot process fast enough
>> sits in the queue and stresses the RAM. That's the reason the
>> queue_capacity default is kept low. I also have a suspicion that this
>> may cause regression failure and hence is not binary compatible.
>>
>> --
>> Chetan
>>
>>
>>
>>> Thank you,
>>>
>>> Vlad
>>>
>>> On 11/18/15 11:23, Chetan Narsude (cnarsude) wrote:
>>>> What are we doing for size() being inaccurate with spsc?
>>>>
>>>> On 11/18/15, 8:55 AM, "vrozov" <gi...@git.apache.org> wrote:
>>>>
>>>>> Github user vrozov commented on the pull request:
>>>>>
>>>>>      
>>>>>
>>>>> 
>>>>> https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
>>>>>    
>>>>>      There are 3 applications running on the dev cluster:
>>>>>      
>>>>>      SpscArrayQueueReservoir around 25 million tuples/s
>>>>>      CircularBufferReservoir - around 10 million tuples/s
>>>>>      ArrayBlockingQueueReservoir - around 1.5 million tuples/s
>>>>>
>>>>>
>>>>>
>


Re: [GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by Vlad Rozov <v....@datatorrent.com>.
Connecting a fast producing operator to a slow consuming operator using 
a CONTAINER_LOCAL port would be a bad application design decision anyway, 
as it will slow down the producer. In this case it would be better to 
partition the slow downstream operator. Do we currently support 
partitioning an operator and deploying all partitions into the same 
container (CONTAINER_LOCAL)? What stream would be used for multiplexing? 
InlineStream only supports a single, non-partitioned stream.

Thank you,

Vlad

On 11/18/15 16:01, Chetan Narsude (cnarsude) wrote:
> On 11/18/15, 2:43 PM, "Vlad Rozov" <v....@datatorrent.com> wrote:
>
>> Based on the current performance testing I plan to change default value
>> of PortContext.QUEUE_CAPACITY from 1024 to 1<<19 (still need to do my
>> homework and see where PortContext.QUEUE_CAPACITY is used).
> There are 2 different aspects that need to be considered here. One is
> speed of the messaging bus (that you are focusing on) and the second one
> is the speed of the operator. What operator cannot process fast enough
> sits in the queue and stresses the RAM. That's the reason the
> queue_capacity default is kept low. I also have a suspicion that this may
> cause regression failure and hence is not binary compatible.
>
> --
> Chetan
>
>
>
>> Thank you,
>>
>> Vlad
>>
>> On 11/18/15 11:23, Chetan Narsude (cnarsude) wrote:
>>> What are we doing for size() being inaccurate with spsc?
>>>
>>> On 11/18/15, 8:55 AM, "vrozov" <gi...@git.apache.org> wrote:
>>>
>>>> Github user vrozov commented on the pull request:
>>>>
>>>>      
>>>>
>>>> https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
>>>>    
>>>>      There are 3 applications running on the dev cluster:
>>>>      
>>>>      SpscArrayQueueReservoir around 25 million tuples/s
>>>>      CircularBufferReservoir - around 10 million tuples/s
>>>>      ArrayBlockingQueueReservoir - around 1.5 million tuples/s
>>>>
>>>>
>>>>


Re: [GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
On 11/18/15, 2:43 PM, "Vlad Rozov" <v....@datatorrent.com> wrote:

>Based on the current performance testing I plan to change default value
>of PortContext.QUEUE_CAPACITY from 1024 to 1<<19 (still need to do my
>homework and see where PortContext.QUEUE_CAPACITY is used).

There are 2 different aspects that need to be considered here. One is
the speed of the messaging bus (that you are focusing on) and the second
one is the speed of the operator. What the operator cannot process fast
enough sits in the queue and stresses the RAM. That's the reason the
queue_capacity default is kept low. I also have a suspicion that this may
cause regression failure and hence is not binary compatible.

--
Chetan



>
>Thank you,
>
>Vlad
>
>On 11/18/15 11:23, Chetan Narsude (cnarsude) wrote:
>> What are we doing for size() being inaccurate with spsc?
>>
>> On 11/18/15, 8:55 AM, "vrozov" <gi...@git.apache.org> wrote:
>>
>>> Github user vrozov commented on the pull request:
>>>
>>>     
>>> 
>>> https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
>>>   
>>>     There are 3 applications running on the dev cluster:
>>>     
>>>     SpscArrayQueueReservoir around 25 million tuples/s
>>>     CircularBufferReservoir - around 10 million tuples/s
>>>     ArrayBlockingQueueReservoir - around 1.5 million tuples/s
>>>
>>>
>>>
>


Re: [GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by Vlad Rozov <v....@datatorrent.com>.
Queue size should only be used for monitoring/operational purposes and 
to estimate an optimal PortContext.QUEUE_CAPACITY value. It is not used in 
the Reservoir implementation to pull tuples from the Reservoir.

Based on the current performance testing I plan to change default value 
of PortContext.QUEUE_CAPACITY from 1024 to 1<<19 (still need to do my 
homework and see where PortContext.QUEUE_CAPACITY is used).

Thank you,

Vlad

On 11/18/15 11:23, Chetan Narsude (cnarsude) wrote:
> What are we doing for size() being inaccurate with spsc?
>
> On 11/18/15, 8:55 AM, "vrozov" <gi...@git.apache.org> wrote:
>
>> Github user vrozov commented on the pull request:
>>
>>     
>> https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
>>   
>>     There are 3 applications running on the dev cluster:
>>     
>>     SpscArrayQueueReservoir around 25 million tuples/s
>>     CircularBufferReservoir - around 10 million tuples/s
>>     ArrayBlockingQueueReservoir - around 1.5 million tuples/s
>>
>>
>>


Re: [GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by "Chetan Narsude (cnarsude)" <cn...@cisco.com>.
What are we doing for size() being inaccurate with spsc?

On 11/18/15, 8:55 AM, "vrozov" <gi...@git.apache.org> wrote:

>Github user vrozov commented on the pull request:
>
>    
>https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
>  
>    There are 3 applications running on the dev cluster:
>    
>    SpscArrayQueueReservoir around 25 million tuples/s
>    CircularBufferReservoir - around 10 million tuples/s
>    ArrayBlockingQueueReservoir - around 1.5 million tuples/s
>
>
>


[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-157777188
  
    There are 3 applications running on the dev cluster:
    
    SpscArrayQueueReservoir around 25 million tuples/s
    CircularBufferReservoir - around 10 million tuples/s
    ArrayBlockingQueueReservoir - around 1.5 million tuples/s




[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-apex-core/pull/173



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by tweise <gi...@git.apache.org>.
Github user tweise commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159729354
  
    Vlad, this is great data. How about selecting the default reservoir implementation based on the queue size?  ArrayBlockingQueueReservoir for small queue size, spsc for larger size? And leave the final say about the implementation to the user?
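
The proposal above (a default chosen by queue size, with the user keeping the final say) could look roughly like the following sketch. The class, method and threshold are hypothetical; the thread only says that ArrayBlockingQueueReservoir suits small queue sizes and spsc suits larger ones, without naming a cutoff, so the 4k value here is a placeholder.

```java
// Hypothetical illustration only; the cutoff is a placeholder, not a measured value.
final class ReservoirChoiceSketch
{
  // Assumed boundary between "small" and "large" queues
  static final int SMALL_QUEUE_THRESHOLD = 4 * 1024;

  /**
   * Returns the simple name of the reservoir implementation to use.
   * An explicit user choice always wins; otherwise the queue capacity decides.
   */
  static String chooseImplementation(final int queueCapacity, final String userOverride)
  {
    if (userOverride != null && !userOverride.isEmpty()) {
      return userOverride;
    }
    return queueCapacity <= SMALL_QUEUE_THRESHOLD
        ? "ArrayBlockingQueueReservoir"
        : "SpscArrayQueueReservoir";
  }

  public static void main(String[] args)
  {
    System.out.println(chooseImplementation(1024, null));
    System.out.println(chooseImplementation(1 << 19, null));
    System.out.println(chooseImplementation(1024, "CircularBufferReservoir"));
  }
}
```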



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159959244
  
    Vlad, thanks for the benchmarks for the different sizes. There is a consistent increase in throughput with increasing queue capacity. This warrants consideration even if it comes at the cost of higher CPU usage, as we are always running in a multi-core environment. There may be other mitigating factors that we have not yet looked at to improve CPU usage.
    
    For a user, the higher capacity would translate to a bigger peak memory size. Why not account for this memory when we ask for the container? A 512k capacity, even with a generous 1000 bytes per tuple, would at full capacity be about 512 MB, which is not a lot (we already ask for the same amount by default for the buffers that are served). We would add this to the amount of memory we request for a container. The user would of course have the choice to trim the queue size down, and we could also give them an option to specify this memory size, just as we do for the buffers.
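
The memory estimate above is simple arithmetic, sketched here with a hypothetical helper. It assumes that "512k" means 1<<19 slots (the value mentioned elsewhere in the thread for PortContext.QUEUE_CAPACITY) and uses the 1000 bytes per tuple figure from the comment; it ignores the queue's own array and object header overhead.

```java
// Hypothetical helper for the back-of-the-envelope estimate; not part of Apex.
final class QueueMemoryEstimate
{
  /** Worst-case bytes held by tuples when every slot of the queue is occupied. */
  static long worstCaseBytes(final int queueCapacity, final int bytesPerTuple)
  {
    // Widen to long before multiplying so large capacities cannot overflow int
    return (long)queueCapacity * bytesPerTuple;
  }

  public static void main(String[] args)
  {
    int capacity = 1 << 19; // 524288 slots, assumed meaning of "512k"
    long bytes = worstCaseBytes(capacity, 1000);
    // prints "524288000 bytes, about 500 MiB"
    System.out.println(bytes + " bytes, about " + bytes / (1024 * 1024) + " MiB");
  }
}
```

Under these assumptions the full queue holds roughly half a gigabyte of tuples, in line with the figure quoted in the comment.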
    
    If different implementations perform better at different queue capacities, we can have a default heuristic strategy that automatically picks the implementation best suited to that range based on our experience; as you mentioned, ArrayBlockingQueue performs better for capacities on the order of 1k. This will free the user from knowing what to specify and having to specify it. For power users we could always provide an option to override this strategy and specify a specific implementation.
    




[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159438298
  
    I don't agree that it is not possible to use a 512K queue size. Depending on tuple size, memory availability and processing requirements, a queue size of 512K or larger may be used by applications. Whether or not the default queue size should be changed to a larger value is debatable. Based on my experience and what I've heard, there is no good default value for the queue size, and applications need to weigh slightly larger memory consumption against a performance boost. Note that the 4x performance improvement of SpscArrayQueueReservoir over CircularBufferReservoir is for CircularBufferReservoir also configured with a 512K queue size. With the default queue size, CircularBufferReservoir scales only to 3 million events per second.
    
    It is not clear why it is necessary to have an exact size() and where we pay a price for losing the accuracy of size(). The performance benchmark does not show any degradation for the CircularBufferReservoir, which does not use size() and uses the BlockingQueue interface.
    
    While the benchmark is done under ideal conditions, it measures the scalability of the Apex framework. SpscArrayQueueReservoir provides infrastructure that is more scalable than CircularBufferReservoir and ArrayBlockingQueueReservoir. IMO, this should be sufficient to justify pulling the changes in.




[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by 243826 <gi...@git.apache.org>.
Github user 243826 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159369549
  
    -1 to this pull request due to the direction in which this is going (discussed in the emails).



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by 243826 <gi...@git.apache.org>.
Github user 243826 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159751791
  
    It's not a good idea. It's a micro-optimization (and not of a critical path) at the cost of a more complicated implementation for developers and more complicated use for users.
    
    In the benchmark that Vlad shared, starting at a 128k queue size we see that the cost of receiving an event outweighs the cost of processing it. In practice this is unsustainable unless we do next to no processing of the event. I even suspect that the same event is being pushed through the SPSC queue every time rather than a new one (Vlad, do you have reuse_tuple or an equivalent flag turned on?). In practice, long before we reach that throughput with a larger queue size, we cross the barrier between ContainerLocal and ThreadLocal. The timing is driven by the processing complexity of the subscriber operator. At that point you may as well switch to ThreadLocal and enjoy much higher throughput (the benchmark for ThreadLocal is also very academic).
    
    -
    Chetan
    
    
    
    From: Thomas Weise <no...@github.com>
    Reply-To: apache/incubator-apex-core <re...@reply.github.com>
    Date: Wednesday, November 25, 2015 at 12:54 PM
    To: apache/incubator-apex-core <in...@noreply.github.com>
    Cc: Chetan Narsude <ch...@apache.org>
    Subject: Re: [incubator-apex-core] APEX-254 & APEX-269 (#173)
    
    
    Vlad, this is great data. How about selecting the default reservoir implementation based on the queue size? ArrayBlockingQueueReservoir for small queue size, spsc for larger size? And leave the final say about the implementation to the user?
    
    -
    Reply to this email directly or view it on GitHub<https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159729354>.
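
The suggestion quoted above (pick the default reservoir by queue size and let the user override it) could be sketched roughly as follows. This is a hypothetical illustration, not code from the pull request: the class ReservoirChooser, the method choose and the 64K cut-over are assumptions, and the thread does not propose a specific threshold. Only the two reservoir names come from the discussion.

```java
/** Hypothetical helper; the name, method and threshold are assumptions for illustration. */
final class ReservoirChooser
{
  // Assumed cut-over point; the discussion does not settle on a value.
  static final int SPSC_MIN_CAPACITY = 64 * 1024;

  /**
   * Returns the reservoir implementation name to use.
   * An explicit user choice always wins; otherwise the queue capacity decides.
   */
  static String choose(int capacity, String userOverride)
  {
    if (userOverride != null && !userOverride.isEmpty()) {
      return userOverride;
    }
    return capacity >= SPSC_MIN_CAPACITY
        ? "SpscArrayQueueReservoir"
        : "ArrayBlockingQueueReservoir";
  }
}
```

The override could, for example, be read with System.getProperty("com.datatorrent.stram.engine.Reservoir"), which is the property name used by the factory in the diff under review.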




[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by 243826 <gi...@git.apache.org>.
Github user 243826 commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159417794
  
    Vlad,
    
    512k is undesirable and will cause other problems when exercised (I discussed those in the email). The kind of code surgery this pull request requires is unnecessary when we cannot use 512k. The slight performance boost you claim comes at the cost of losing the accuracy of the size() call, plus the constructs needed to fix that. I also said that even with 512k, the benchmark itself is run under ideal (unrealistic) conditions and will translate into little, if any, real performance gain.
    
    -
    Chetan
    
    From: Vlad Rozov <no...@github.com>
    Reply-To: apache/incubator-apex-core <re...@reply.github.com>
    Date: Tuesday, November 24, 2015 at 12:40 PM
    To: apache/incubator-apex-core <in...@noreply.github.com>
    Cc: Chetan Narsude <ch...@apache.org>
    Subject: Re: [incubator-apex-core] APEX-254 & APEX-269 (#173)
    
    
    @243826<https://github.com/243826> Please bring the discussion here. I did not see anything in the e-mail that would justify a -1 to this pull request. The only thing in question is changing the InlineStream default queue size, and that is not part of this pull request.
    
    SpscArrayQueueReservoir outperforms CircularBufferReservoir by 4x with the queue size set to 512000, and is on par with it or better at the default queue size.
    
    -
    Reply to this email directly or view it on GitHub<https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159398782>.




[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by PramodSSImmaneni <gi...@git.apache.org>.
Github user PramodSSImmaneni commented on the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#issuecomment-159442049
  
    What are the performance benchmarks for SpscArrayQueueReservoir at smaller queue sizes, say 1k, 4k, 16k and 32k? In the benchmark, since the number of events/sec is the sustained load in steady state, why would a large queue size significantly change the result? There are no big environmental factors such as bursty network traffic, since it is container local. Is it because of the vagaries of getting CPU time?
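
One way to explore the question above is a small sweep over the queue sizes mentioned. The sketch below is a rough, hypothetical harness: it uses java.util.concurrent.ArrayBlockingQueue as a stand-in rather than any Apex reservoir, does no JIT warm-up, and its numbers should not be compared with the figures quoted in this thread. The class name QueueSizeSweep and the event counts are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical harness; uses ArrayBlockingQueue as a stand-in, not an Apex reservoir. */
final class QueueSizeSweep
{
  /** Moves {@code events} integers from a producer thread to the caller and returns events per second. */
  static double eventsPerSecond(int capacity, int events)
  {
    final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
    Thread producer = new Thread(() -> {
      try {
        for (int i = 0; i < events; i++) {
          queue.put(i); // blocks while the queue is full
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    long start = System.nanoTime();
    producer.start();
    long checksum = 0;
    try {
      for (int i = 0; i < events; i++) {
        checksum += queue.take(); // blocks while the queue is empty
      }
      producer.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted during the run", e);
    }
    long elapsedNanos = System.nanoTime() - start;
    // Sanity check: every value 0..events-1 must have arrived exactly once.
    if (checksum != (long)events * (events - 1) / 2) {
      throw new IllegalStateException("lost or duplicated events");
    }
    return events / (elapsedNanos / 1e9);
  }

  public static void main(String[] args)
  {
    for (int capacity : new int[] {1 << 10, 1 << 12, 1 << 14, 1 << 15}) {
      System.out.printf("capacity=%d events/s=%.0f%n", capacity, eventsPerSecond(capacity, 1_000_000));
    }
  }
}
```

For numbers worth acting on, a benchmark harness with warm-up and repeated iterations would be a better fit than a single timed loop.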



[GitHub] incubator-apex-core pull request: APEX-254 & APEX-269

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/incubator-apex-core/pull/173#discussion_r45792451
  
    --- Diff: engine/src/main/java/com/datatorrent/stram/engine/AbstractReservoir.java ---
    @@ -0,0 +1,786 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package com.datatorrent.stram.engine;
    +
    +import java.lang.reflect.Constructor;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.jctools.queues.MessagePassingQueue;
    +import org.jctools.queues.SpscArrayQueue;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.datatorrent.api.Sink;
    +import com.datatorrent.netlet.util.CircularBuffer;
    +import com.datatorrent.netlet.util.UnsafeBlockingQueue;
    +import com.datatorrent.stram.tuple.Tuple;
    +
    +import static java.lang.Thread.sleep;
    +
    +/**
    + * Abstract Sweepable Reservoir implementation. Implements all methods of {@link SweepableReservoir} except
    + * {@link SweepableReservoir#sweep}. Classes that extend {@link AbstractReservoir} must implement
    + * {@link BlockingQueue} interface.
    + */
    +public abstract class AbstractReservoir implements SweepableReservoir, BlockingQueue<Object>
    +{
    +  private static final Logger logger = LoggerFactory.getLogger(AbstractReservoir.class);
    +  static final String reservoirClassNameProperty = "com.datatorrent.stram.engine.Reservoir";
    +  private static final String reservoirDefaultClassName = SpscArrayQueueReservoir.class.getName();
    +
    +  /**
    +   * Reservoir factory. Constructs concrete implementation of {@link AbstractReservoir} based on
    +   * {@link AbstractReservoir#reservoirClassNameProperty} property.
    +   * @param id reservoir identifier
    +   * @param capacity reservoir capacity
    +   * @return concrete implementation of {@link AbstractReservoir}
    +   */
    +  public static AbstractReservoir newReservoir(final String id, final int capacity)
    +  {
    +    String reservoirClassName = System.getProperty(reservoirClassNameProperty, reservoirDefaultClassName);
    +    if (reservoirClassName.equals(SpscArrayQueueReservoir.class.getName())) {
    --- End diff --
    
    No, they can't be constructed in the else clause because of the "private static" declaration.

