Posted to dev@storm.apache.org by arunmahadevan <gi...@git.apache.org> on 2015/11/04 12:42:56 UTC

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

GitHub user arunmahadevan opened a pull request:

    https://github.com/apache/storm/pull/855

    STORM-1167: Add windowing support for storm core

    Currently, topologies that need windowing support require writing custom logic inside bolts, which makes handling the windowing and acking logic tedious.
    
    The PR proposes adding framework-level support to core Storm bolts for processing tuples in time-based or count-based windows. Sliding and tumbling windows based on tuple count or time duration are supported.
    
    A new bolt interface is added for bolts that need windowing support.
    
    ```
    public interface IWindowedBolt extends IComponent {
        void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
        /**
         * Process tuples falling within the window and optionally emit 
         * new tuples based on the tuples in the input window.
         */
        void execute(TupleWindow inputWindow);
        void cleanup();
    }
    ```
    
    `TupleWindow` gives access to the tuples currently in the window, the tuples that expired, and the new tuples added since the last window was computed, which is useful for efficient (incremental) windowing computations.
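
To illustrate why the new and expired lists matter, here is a minimal sketch of an incremental sum over a window. `SimpleWindow`, `ListWindow`, and `IncrementalSum` are hypothetical names: `SimpleWindow` only mirrors the three accessors described above (assumed to be `get()`, `getNew()`, and `getExpired()`, as used in `SlidingWindowTopology`) and holds plain integers rather than Storm `Tuple`s.

```java
import java.util.List;

/** Hypothetical stand-in for TupleWindow; holds plain values instead of Storm Tuples. */
interface SimpleWindow<T> {
    List<T> get();        // all events currently in the window
    List<T> getNew();     // events added since the last activation
    List<T> getExpired(); // events that left the window since the last activation
}

/** Trivial list-backed implementation, used only to drive the example. */
class ListWindow<T> implements SimpleWindow<T> {
    private final List<T> all;
    private final List<T> added;
    private final List<T> expired;

    ListWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all;
        this.added = added;
        this.expired = expired;
    }

    @Override public List<T> get() { return all; }
    @Override public List<T> getNew() { return added; }
    @Override public List<T> getExpired() { return expired; }
}

/** Maintains a window sum by applying only the deltas instead of re-scanning get(). */
class IncrementalSum {
    private long sum = 0;

    long onWindow(SimpleWindow<Integer> window) {
        for (int v : window.getNew()) {
            sum += v;   // events that just entered the window
        }
        for (int v : window.getExpired()) {
            sum -= v;   // events that just left the window
        }
        return sum;
    }
}
```

Each activation costs time proportional to the number of new plus expired events rather than the full window size; the `SlidingWindowSumBolt` example in this PR relies on the same idea.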
    
    The `TopologyBuilder` wraps the `IWindowedBolt` implementation in an internal bolt, `WindowedBoltExecutor` (similar to `BasicBoltExecutor`), which receives the tuples and hands them off to the `IWindowedBolt` when the window conditions are met. Tuples are automatically acked when they fall out of the window, and every tuple emitted from an `IWindowedBolt` is automatically anchored to the tuples in the current window, to provide an at-least-once guarantee. An example topology, `SlidingWindowTopology`, shows how to use the APIs to compute a sliding window sum.
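
The lifecycle described above (ack tuples on expiry, then hand the current window to the bolt) can be sketched without Storm as a count-based sliding window manager. This is an illustrative assumption, not the PR's `WindowManager`: `Listener` and `CountWindowManager` are hypothetical names, events are plain values instead of tuples, and evictions are only evaluated when the window slides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical listener modelled on the WindowLifecycleListener callbacks in this PR. */
interface Listener<T> {
    void onExpiry(List<T> expired); // an executor would ack these tuples
    void onActivation(List<T> window, List<T> added, List<T> expired);
}

/** Count-based sliding window: holds the last `length` events, fires every `slide` events. */
class CountWindowManager<T> {
    private final int length;
    private final int slide;
    private final Listener<T> listener;
    private final Deque<T> window = new ArrayDeque<>();
    private final List<T> added = new ArrayList<>();
    private final List<T> expired = new ArrayList<>();
    private int sinceLastActivation = 0;

    CountWindowManager(int length, int slide, Listener<T> listener) {
        if (length <= 0 || slide <= 0) {
            throw new IllegalArgumentException("length and slide must be positive");
        }
        this.length = length;
        this.slide = slide;
        this.listener = listener;
    }

    void add(T event) {
        window.addLast(event);
        added.add(event);
        if (++sinceLastActivation == slide) {
            sinceLastActivation = 0;
            activate();
        }
    }

    private void activate() {
        // Evict the oldest events until the window is back to `length`.
        while (window.size() > length) {
            expired.add(window.removeFirst());
        }
        if (!expired.isEmpty()) {
            listener.onExpiry(new ArrayList<>(expired)); // ack on expiry
        }
        listener.onActivation(new ArrayList<>(window), new ArrayList<>(added), new ArrayList<>(expired));
        // The manager (the caller), not the listener, resets the per-activation lists.
        added.clear();
        expired.clear();
    }
}
```

With a length of 3 and a sliding interval of 2, adding 1 through 6 produces activations over `[1, 2]`, `[2, 3, 4]`, and `[4, 5, 6]`, with `[1]` and then `[2, 3]` reported as expired in between.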
    
    Tuples are tracked with the system timestamp at which the bolt receives them. Later we can add support for tuple-based timestamps (e.g. the timestamp when the tuple was generated). There is also scope for minimizing duplicates by tracking the evaluated tuples (based on some ID) in a persistent store.
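
Tracking by receive time, as described above, means a time-based window only needs each event's arrival timestamp. A minimal sketch under that assumption: `TimeWindow` is a hypothetical class, the clock is injected as a `LongSupplier` so that `System::currentTimeMillis` can be swapped for a fake clock in tests, and an event is treated as expired once it is at least the window length old.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

/** Tracks events by arrival time and evicts those older than the window length. */
class TimeWindow<T> {
    /** An event paired with the time (ms) at which it was received. */
    private static final class Stamped<E> {
        final E event;
        final long receivedAtMs;

        Stamped(E event, long receivedAtMs) {
            this.event = event;
            this.receivedAtMs = receivedAtMs;
        }
    }

    private final long lengthMs;
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private final Deque<Stamped<T>> events = new ArrayDeque<>();

    TimeWindow(long lengthMs, LongSupplier clock) {
        this.lengthMs = lengthMs;
        this.clock = clock;
    }

    /** Stamps the event with the current receive time, not a time carried in the event. */
    void add(T event) {
        events.addLast(new Stamped<>(event, clock.getAsLong()));
    }

    /** Removes and returns events received lengthMs or more before now, oldest first. */
    List<T> evictExpired() {
        long now = clock.getAsLong();
        List<T> expired = new ArrayList<>();
        while (!events.isEmpty() && now - events.peekFirst().receivedAtMs >= lengthMs) {
            expired.add(events.removeFirst().event);
        }
        return expired;
    }

    /** Events currently inside the window, oldest first. */
    List<T> current() {
        List<T> out = new ArrayList<>();
        for (Stamped<T> s : events) {
            out.add(s.event);
        }
        return out;
    }
}
```

Because events are appended in arrival order, the oldest ones sit at the head of the deque, so eviction can stop at the first event still inside the window. This assumes a non-decreasing clock (`System.currentTimeMillis()` can jump if the wall clock is adjusted). With tuple-generated timestamps, events could arrive out of order, so this head-of-deque shortcut would no longer be sufficient.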
    
    Similar windowing constructs need to be added to the Storm Trident APIs as well. That can be addressed once basic support is added to Storm core, so that the common logic can be reused.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/arunmahadevan/storm windowing

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #855
    
----
commit 77429d07d7c080962b8c414cf02ee93839b25ca2
Author: Arun Mahadevan <ai...@hortonworks.com>
Date:   2015-11-04T11:29:19Z

    STORM-1167: Add windowing support for storm core
    
    1. Added new interface IWindowedBolt and wrapper classes for bolts that need windowing support
    2. New constants for specifying the window length and sliding interval
    3. WindowManager and related classes that handle the windowing logic

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43896077
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +    }
    --- End diff --
    
    Can you put a comment in all the empty methods, something like `// NOOP`, just so it is obvious that they are intentionally left blank?
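
The builder methods in the diff above reduce to recording a window length and a sliding interval, each either a count or a duration, and a tumbling window is simply one whose sliding interval equals its length. A minimal sketch of that idea follows; `WindowSpec` and its string keys are hypothetical stand-ins for `BaseWindowedBolt` and the `Config.TOPOLOGY_BOLTS_*` constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical builder mirroring BaseWindowedBolt's withWindow/withTumblingWindow. */
class WindowSpec {
    // Hypothetical keys; the PR uses Config.TOPOLOGY_BOLTS_* constants instead.
    static final String LENGTH_COUNT = "window.length.count";
    static final String LENGTH_MS = "window.length.ms";
    static final String SLIDE_COUNT = "window.slide.count";
    static final String SLIDE_MS = "window.slide.ms";

    private final Map<String, Object> conf = new HashMap<>();

    WindowSpec lengthCount(int count) {
        conf.put(LENGTH_COUNT, count);
        return this;
    }

    WindowSpec lengthDuration(long value, TimeUnit unit) {
        conf.put(LENGTH_MS, unit.toMillis(value)); // stored as long, no (int) cast
        return this;
    }

    WindowSpec slideCount(int count) {
        conf.put(SLIDE_COUNT, count);
        return this;
    }

    WindowSpec slideDuration(long value, TimeUnit unit) {
        conf.put(SLIDE_MS, unit.toMillis(value));
        return this;
    }

    /** A tumbling window is a sliding window whose interval equals its length. */
    WindowSpec tumblingCount(int count) {
        return lengthCount(count).slideCount(count);
    }

    WindowSpec tumblingDuration(long value, TimeUnit unit) {
        return lengthDuration(value, unit).slideDuration(value, unit);
    }

    Map<String, Object> toConf() {
        return new HashMap<>(conf); // defensive copy
    }
}
```

One design difference: this sketch stores milliseconds as a `long`, whereas `Duration` in the diff casts `timeUnit.toMillis(value)` to `int`, which overflows for durations longer than about 24.8 days (`Integer.MAX_VALUE` milliseconds).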



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976803
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.windowedOutputCollector = new WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.info("Initialized window manager {} ", this.windowManager);
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        windowManager.add(input);
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        windowManager.shutdown();
    +        bolt.cleanup();
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        bolt.declareOutputFields(declarer);
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        return bolt.getComponentConfiguration();
    +    }
    +
    +    private WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
    +        return new WindowLifecycleListener<Tuple>() {
    +            @Override
    +            public void onExpiry(List<Tuple> tuples) {
    +                for (Tuple tuple : tuples) {
    +                    windowedOutputCollector.ack(tuple);
    +                }
    +            }
    +
    +            @Override
    +            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
    +                windowedOutputCollector.setContext(tuples);
    +                bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples));
    +                newTuples.clear();
    --- End diff --
    
    Yes, the caller (the `WindowManager`) should clear the list, so the listener should not. I think I forgot to remove this after refactoring the code a bit. Will remove it.
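
For context on the `validate` method in this diff: a tuple can sit in a time-based window for up to one window length plus one sliding interval before it is evicted and acked, so if that total exceeds the topology message timeout, the tuple tree would time out and be failed back to the spout (and typically replayed) while it is still being processed. A standalone sketch of the same check, assuming Storm's `topology.enable.message.timeouts` and `topology.message.timeout.secs` key names and a 30-second default when the timeout is missing (the diff falls back to 0 instead); `WindowValidation` is a hypothetical name.

```java
import java.util.Map;

/** Hypothetical standalone version of the timeout check in WindowedBoltExecutor.validate. */
final class WindowValidation {
    // Storm config key names, written out here as an assumption instead of using Config.
    static final String TIMEOUTS_ENABLED = "topology.enable.message.timeouts";
    static final String TIMEOUT_SECS = "topology.message.timeout.secs";

    private WindowValidation() {
    }

    /** Message timeout in ms; Long.MAX_VALUE when timeouts are disabled. */
    static long timeoutMillis(Map<String, Object> conf) {
        if (Boolean.FALSE.equals(conf.get(TIMEOUTS_ENABLED))) {
            return Long.MAX_VALUE;
        }
        Object secs = conf.get(TIMEOUT_SECS);
        // Assumption: fall back to 30 s (Storm's shipped default); the diff falls back to 0.
        long s = (secs instanceof Number) ? ((Number) secs).longValue() : 30L;
        return s * 1000L;
    }

    /**
     * Rejects time-based windows whose length plus sliding interval exceeds the timeout.
     * A null argument means the corresponding setting is count-based.
     */
    static void validate(Long lengthMs, Long slideMs, Map<String, Object> conf) {
        if (lengthMs == null) {
            return; // count-based length: no time bound to check (as in the diff)
        }
        long worstCaseMs = lengthMs + (slideMs == null ? 0L : slideMs);
        long timeoutMs = timeoutMillis(conf);
        if (worstCaseMs > timeoutMs) {
            throw new IllegalArgumentException("Window length + sliding interval (" + worstCaseMs
                    + " ms) exceeds the message timeout (" + timeoutMs + " ms)");
        }
    }
}
```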



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43893861
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    +        }
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("sum"));
    +        }
    +
    +    }
    +
    +
    +    public static void main(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +        builder.setSpout("integer", new RandomIntegerSpout(), 1);
    +        builder.setBolt("window", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)),
    +                        //new SlidingWindowSumBolt().withTumblingWindow(new Duration(20, TimeUnit.SECONDS))
    --- End diff --
    
    If we want a separate example with a tumbling window, we should probably add one instead of leaving a commented-out line in the code.
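
One way to read the suggestion: a tumbling window is the special case of a sliding window whose sliding interval equals its length, which is exactly how `withTumblingWindow` is defined in `BaseWindowedBolt`. The hypothetical helper below (plain Java over a list, not a Storm topology) makes the difference visible by recomputing each window from scratch.

```java
import java.util.ArrayList;
import java.util.List;

/** Count-based window sums over a finite list, recomputed per window for clarity. */
final class WindowSums {
    private WindowSums() {
    }

    /**
     * Emits one sum every `slide` events, covering the most recent `length` events
     * (fewer while the stream is still shorter than `length`).
     */
    static List<Long> windowSums(List<Integer> stream, int length, int slide) {
        if (length <= 0 || slide <= 0) {
            throw new IllegalArgumentException("length and slide must be positive");
        }
        List<Long> sums = new ArrayList<>();
        for (int end = slide; end <= stream.size(); end += slide) {
            long sum = 0;
            for (int i = Math.max(0, end - length); i < end; i++) {
                sum += stream.get(i);
            }
            sums.add(sum);
        }
        return sums;
    }

    /** Tumbling: sliding interval equal to the length, so batches never overlap. */
    static List<Long> tumblingSums(List<Integer> stream, int size) {
        return windowSums(stream, size, size);
    }
}
```

In this sketch a trailing partial batch (the final `7` in `1..7` with a size of 3) is not emitted. Under the sketch's semantics, `windowSums(values, 30, 10)` mirrors the `withWindow(new Count(30), new Count(10))` configuration used in the example topology.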



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43918519
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    --- End diff --
    
    Same as above?



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44016958
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/EvictionPolicy.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +/**
    + * Eviction policy tracks events and decides whether
    + * an event should be evicted from the window or not.
    + *
    + * @param <T> the type of event that is tracked.
    + */
    +public interface EvictionPolicy<T> {
    +    /**
    +     * returns true if the event should be evicted from the window.
    --- End diff --
    
    Could we make this a proper javadoc with `@param` and `@return` tags?
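Below is a sketch of what the requested javadoc could look like. The diff only shows the class comment and the sentence "returns true if the event should be evicted from the window". The method name `evict`, its `Event<T>` parameter, and the `track` method are assumptions made for illustration. A minimal stand-in `Event` interface is included so the snippet compiles on its own.

```java
/**
 * Minimal stand-in for the event wrapper so this sketch compiles on its own.
 *
 * @param <T> the type of the wrapped object
 */
interface Event<T> {
    T get();

    long getTimestamp();
}

/**
 * Eviction policy tracks events and decides whether
 * an event should be evicted from the window or not.
 *
 * @param <T> the type of event that is tracked.
 */
interface EvictionPolicy<T> {
    /**
     * Decides whether the given event should be evicted from the window.
     *
     * @param event the event to check
     * @return true if the event should be evicted from the window, false otherwise
     */
    boolean evict(Event<T> event);

    /**
     * Records an event that was added to the window.
     *
     * @param event the newly added event
     */
    void track(Event<T> event);
}
```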



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976021
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +    }
    --- End diff --
    
    Sure.



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44016430
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private final transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    --- End diff --
    
    I don't see a lot of value in having a Count or a Duration class, Count especially. They either wrap an int that is quickly unwrapped, or convert a TimeUnit value to millis, which is also quickly unwrapped.
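One reason typed wrappers can still be useful is Java overload resolution. A count and a duration would both be an `int`, so a `withWindow(int)` method could not distinguish a 10-tuple window from a 10-ms window. Below is a minimal sketch of that point. It uses a hypothetical `WindowConfigSketch` class, not the actual `BaseWindowedBolt`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the wrapper types exist only to select the right overload.
final class WindowConfigSketch {
    static final class Count {
        final int value;

        Count(int value) { this.value = value; }
    }

    static final class Duration {
        final long millis;

        Duration(long value, TimeUnit unit) { this.millis = unit.toMillis(value); }
    }

    String kind;  // "count" or "duration", recorded for illustration
    long length;  // tuple count or milliseconds, depending on kind

    // With raw ints these two methods would have the same signature and would not compile.
    WindowConfigSketch withWindow(Count windowLength) {
        kind = "count";
        length = windowLength.value;
        return this;
    }

    WindowConfigSketch withWindow(Duration windowLength) {
        kind = "duration";
        length = windowLength.millis;
        return this;
    }
}
```

The alternative that drops the wrappers is to use distinct method names, such as `withCountWindow(int)` and `withTimeWindow(long, TimeUnit)`. Which option reads better is the judgment call under discussion here.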



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-156171771
  
    @harshach any update on your review?



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153972196
  
    @harshach Storing tuples in a persistent store would mean we no longer rely completely on Storm's acking mechanism and instead build independent logic, which could complicate things. To start with, we can cap the maximum window size to bound memory usage. We can track a tuple store for windowing in a separate JIRA.
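Below is a minimal sketch of the "cap the maximum window size" idea. It assumes a hypothetical `maxWindowSize` limit that is enforced when events are added, and the `BoundedWindow` class is illustrative only. It is not how the PR implements windowing. In a real bolt, any tuple dropped this way would still need to be acked or failed so that it does not time out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded window buffer: caps in-memory events to bound heap usage.
final class BoundedWindow<T> {
    private final int maxWindowSize;
    private final Deque<T> events = new ArrayDeque<>();

    BoundedWindow(int maxWindowSize) {
        if (maxWindowSize <= 0) {
            throw new IllegalArgumentException("maxWindowSize must be positive");
        }
        this.maxWindowSize = maxWindowSize;
    }

    /**
     * Adds an event and evicts the oldest one if the cap is exceeded.
     *
     * @param event the event to add
     * @return the evicted event, or null if nothing was evicted
     */
    T add(T event) {
        events.addLast(event);
        return events.size() > maxWindowSize ? events.pollFirst() : null;
    }

    int size() {
        return events.size();
    }
}
```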



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-158301323
  
    Thanks @arunmahadevan, merged into trunk.



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153963681
  
    @arunmahadevan my comment was regarding state storage, as @haohui pointed out. Yes, we can do this in a follow-up JIRA.



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43895419
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    --- End diff --
    
    This is marked as transient, so do the with... methods even work? It will not be serialized, and it looks like other configs override it anyway.
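You can check this concern with plain Java serialization. Storm uses Java serialization for bolt instances when it builds a topology. Below is a minimal sketch with a hypothetical `ConfiguredBolt` class. After a round trip, the `transient` field comes back as `null`, and a regular field keeps its value. Field initializers do not run again during deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a bolt that keeps its window settings in a transient map.
class ConfiguredBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    transient Map<String, Object> windowConfiguration = new HashMap<>();
    int plainField; // not transient, so it survives serialization

    // Serializes and deserializes the object, as happens when shipping it to a worker.
    static ConfiguredBolt roundTrip(ConfiguredBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfiguredBolt) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Whether this loss matters depends on when the map is read. `TopologyBuilder` calls `getComponentConfiguration()` when the component is added. If the map is only read there, before serialization, the settings have already been captured and the `transient` modifier is harmless.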



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154008535
  
    @harshach created subtasks to track items discussed
    https://issues.apache.org/jira/browse/STORM-1175 
    https://issues.apache.org/jira/browse/STORM-1176 



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44009081
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/Event.java ---
    @@ -0,0 +1,41 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +/**
    + * An event is a wrapper object that gets stored in the window.
    + *
    + * @param <T> the type of the object thats wrapped. E.g Tuple
    + */
    +interface Event<T> {
    +    /**
    +     * The event timestamp in millis. This could be the time
    +     * when the source generated the tuple or if not the time
    +     * when the tuple was received by a bolt.
    +     *
    +     * @return
    +     */
    +    long getTs();
    --- End diff --
    
    Can we rename getTs as getTimestamp?
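Below is a sketch of the interface with the suggested name and a filled-in `@return` tag. It also includes a hypothetical `EventImpl` wrapper, which is not part of the diff. `EventImpl` falls back to the arrival time when the source provides no timestamp, as the javadoc describes.

```java
/**
 * An event is a wrapper object that gets stored in the window.
 *
 * @param <T> the type of the object that's wrapped, e.g. Tuple
 */
interface Event<T> {
    /**
     * The event timestamp in millis. This could be the time when the source
     * generated the tuple or, if that is not available, the time when the
     * tuple was received by a bolt.
     *
     * @return the event timestamp in milliseconds
     */
    long getTimestamp();

    /**
     * @return the wrapped object
     */
    T get();
}

// Hypothetical implementation: uses the source timestamp if given, else the arrival time.
final class EventImpl<T> implements Event<T> {
    private final T value;
    private final long timestamp;

    EventImpl(T value, Long sourceTimestamp) {
        this.value = value;
        this.timestamp = sourceTimestamp != null ? sourceTimestamp : System.currentTimeMillis();
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public T get() {
        return value;
    }
}
```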



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43895851
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java ---
    @@ -0,0 +1,68 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * An eviction policy that tracks event counts and can
    + * evict based on a threshold count.
    + *
    + * @param <T> the type of event tracked by this policy.
    + */
    +public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
    +    private final int threshold;
    +    private AtomicInteger currentCount;
    --- End diff --
    
    This should probably be a final too.
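Below is a sketch of the class with both fields declared `final`. Only the two field declarations and the class comment come from the diff. The method names and the eviction rule are assumptions. The assumed rule is to evict while more than `threshold` events are tracked. `EvictionPolicy` and `Event` are reduced to minimal stand-ins so the snippet compiles on its own.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Minimal stand-ins for the windowing types in the diff.
interface Event<T> {
    T get();
}

interface EvictionPolicy<T> {
    boolean evict(Event<T> event);

    void track(Event<T> event);
}

/**
 * An eviction policy that tracks event counts and can
 * evict based on a threshold count.
 *
 * @param <T> the type of event tracked by this policy.
 */
class CountEvictionPolicy<T> implements EvictionPolicy<T> {
    private final int threshold;
    // final: the reference never changes, only the counter value inside it does
    private final AtomicInteger currentCount = new AtomicInteger();

    CountEvictionPolicy(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void track(Event<T> event) {
        currentCount.incrementAndGet();
    }

    /**
     * Evicts while more than {@code threshold} events are being tracked.
     *
     * @param event the candidate event (unused by a pure count policy)
     * @return true if an event should be evicted
     */
    @Override
    public boolean evict(Event<T> event) {
        // Atomically decrement, but only while the count is above the threshold.
        while (true) {
            int count = currentCount.get();
            if (count <= threshold) {
                return false;
            }
            if (currentCount.compareAndSet(count, count - 1)) {
                return true;
            }
        }
    }
}
```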



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/855



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975101
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    --- End diff --
    
    I will add more comments in the example topology explaining the details.
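The bolt above keeps a running sum instead of re-summing the whole window on every activation. Each new tuple is added once, and each expired tuple is subtracted once. Below is a minimal sketch of that bookkeeping without Storm. It uses a hypothetical `IncrementalSum` class, and plain integer lists stand in for `inputWindow.getNew()` and `inputWindow.getExpired()`.

```java
import java.util.List;

// Hypothetical, Storm-free version of the incremental window sum.
final class IncrementalSum {
    private int sum = 0;

    /**
     * Updates the running sum for one window activation.
     *
     * @param added   values that entered the window since the last activation
     * @param expired values that left the window since the last activation
     * @return the sum of the values currently in the window
     */
    int onWindow(List<Integer> added, List<Integer> expired) {
        for (int v : added) {
            sum += v;
        }
        for (int v : expired) {
            sum -= v;
        }
        return sum;
    }
}
```

With this approach, the cost of each activation depends on how many tuples changed, not on the window length. That is what the separate new and expired views in `TupleWindow` make possible.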



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43932253
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    --- End diff --
    
    I am not sure why we can't just have instance variables for all the configs. Why does it need to go into a transient map?
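Below is a minimal sketch of the suggested alternative. The builder methods set plain (non-transient) instance fields, and the config map is built on demand. `FieldConfiguredBolt`, its key strings, and this `getComponentConfiguration()` body are hypothetical illustrations, not the PR's code.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bolt-like class that stores window settings as ordinary fields.
class FieldConfiguredBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer windowLengthCount;    // null when not configured
    private Integer slidingIntervalCount; // null when not configured

    FieldConfiguredBolt withWindow(int windowLength, int slidingInterval) {
        this.windowLengthCount = windowLength;
        this.slidingIntervalCount = slidingInterval;
        return this;
    }

    // Builds the configuration map from the fields whenever it is requested.
    Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        if (windowLengthCount != null) {
            conf.put("window.length.count", windowLengthCount);
        }
        if (slidingIntervalCount != null) {
            conf.put("sliding.interval.count", slidingIntervalCount);
        }
        return conf;
    }
}
```

The fields are not transient, so they survive Java serialization along with the rest of the bolt.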



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154085743
  
    I went through the code in more detail. I had a few minor nits, but also a few issues that I am concerned about.
    
    Primarily, error handling when using a separate thread for timed expiry. More fundamentally, I am concerned that the windowing functionality assumes that arrival time at the bolt is the only time people will want to window over, which is just not true.
    
    The more I think about it, the more this potentially opens up a whole can of worms, and I am not sure this is the best way to do windowing at scale if we want to support late data, out-of-order arrival, and event creation time as part of the window.
    
    Additionally I would like to see more documentation about how to use this functionality, not just a simple example.
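The arrival-time concern raised above can be made concrete. A window keyed on event time assigns each tuple by its own timestamp, so out-of-order tuples still land in the correct window. Below is a minimal sketch of tumbling-window assignment by event timestamp, using a hypothetical `EventTimeTumbling` class. It deliberately leaves out late-data handling, such as deciding when a window may be closed and emitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical event-time tumbling window assignment (no lateness handling).
final class EventTimeTumbling {
    private final long windowLengthMs;
    // window start (ms) -> event timestamps assigned to that window, ordered by start
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();

    EventTimeTumbling(long windowLengthMs) {
        this.windowLengthMs = windowLengthMs;
    }

    /** Start of the tumbling window that contains the given event timestamp. */
    long windowStart(long eventTs) {
        return eventTs - Math.floorMod(eventTs, windowLengthMs);
    }

    /** Assigns an event by its own timestamp, regardless of arrival order. */
    void add(long eventTs) {
        windows.computeIfAbsent(windowStart(eventTs), k -> new ArrayList<>()).add(eventTs);
    }

    Map<Long, List<Long>> windows() {
        return windows;
    }
}
```

In the test sequence 1500, 200, 1999, 2000 with 1000 ms windows, the 200 ms event arrives after the 1500 ms event but is still assigned to the window starting at 0. Arrival-time windowing would have placed it in whichever window was open when it arrived.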



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975030
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    --- End diff --
    
    Yes, I overlooked this when I copied the code from another bolt. Will fix it.
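Assuming the review comment here was about `nextTuple()` allocating a new `Random` on every call, the usual fix is to create the generator once in `open()` and reuse it. The sketch mirrors that shape in plain Java so it runs without Storm; `RandomIntegerSource` and `nextValue()` are hypothetical stand-ins for the spout and its emit, and the seed parameter exists only to make the sketch testable (a real spout would likely just use `new Random()`).

```java
import java.util.Random;

// Plain-Java stand-in for RandomIntegerSpout: the generator is created once, not per tuple.
final class RandomIntegerSource {
    private Random rand;

    // Mirrors BaseRichSpout.open(): one-time initialization per task.
    void open(long seed) {
        this.rand = new Random(seed);
    }

    // Mirrors nextTuple(): reuses the same generator for every value.
    int nextValue() {
        return rand.nextInt(1000); // uniform in [0, 1000)
    }
}
```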


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r45579748
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/EventImpl.java ---
    @@ -0,0 +1,38 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +class EventImpl<T> implements Event<T> {
    --- End diff --
    
    Agree with your comments on value types and final classes. When I wrote it, I was not sure whether we would need another implementation, and it turns out that we do need one (watermark events) for tracking tuple timestamps.
    When unsure, I feel it is better to err on the side of having interfaces. They make it easier to write mock implementations for testing and keep the APIs readable without cluttering them with implementation details (see the link below for more details).
    
    http://programmers.stackexchange.com/questions/159813/do-i-need-to-use-an-interface-when-only-one-class-will-ever-implement-it/159909#159909
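A minimal sketch of the "interface plus small final implementations" shape argued for above. The method names on `Event<T>` (`get`, `getTimestamp`, `isWatermark`) and the classes `WatermarkEvent` and `EventStats` are assumptions for illustration; the PR's actual `Event` interface may differ.

```java
import java.util.List;

// Hypothetical shape of the Event abstraction; method names are illustrative, not the PR's API.
interface Event<T> {
    T get();
    long getTimestamp();
    boolean isWatermark();
}

// Regular event wrapping a payload; final and immutable, as suggested in review.
final class EventImpl<T> implements Event<T> {
    private final T event;
    private final long ts;

    EventImpl(T event, long ts) {
        this.event = event;
        this.ts = ts;
    }

    @Override public T get() { return event; }
    @Override public long getTimestamp() { return ts; }
    @Override public boolean isWatermark() { return false; }
}

// Second implementation: a watermark carries only a timestamp, no payload.
final class WatermarkEvent<T> implements Event<T> {
    private final long ts;

    WatermarkEvent(long ts) { this.ts = ts; }

    @Override public T get() { return null; }
    @Override public long getTimestamp() { return ts; }
    @Override public boolean isWatermark() { return true; }
}

final class EventStats {
    // Written against the interface, so it handles both kinds (and test mocks) unchanged.
    static <T> int countDataEventsUpTo(List<Event<T>> events, long ts) {
        int n = 0;
        for (Event<T> e : events) {
            if (!e.isWatermark() && e.getTimestamp() <= ts) {
                n++;
            }
        }
        return n;
    }
}
```

A test can also pass an anonymous `Event<String>` to `countDataEventsUpTo` without touching either implementation, which is the testability argument made above.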


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894940
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    --- End diff --
    
    I think this is trying to show an optimization where we don't calculate the entire sum each time; instead, we just add the new values and subtract the expired ones.
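The optimization relies on the identity: sum of current window = previous sum + sum of new values - sum of expired values. This plain-Java sketch checks that identity by sliding a count window one value at a time and comparing the incremental sum against a full recomputation. The `ArrayDeque` bookkeeping is a simplified stand-in for `TupleWindow`, not the PR's `WindowManager`, and `IncrementalSum` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class IncrementalSum {
    // Slides a count window of the given length one value at a time and
    // returns the sum after each slide, computed incrementally.
    static List<Integer> slidingSums(int[] values, int windowLength) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> sums = new ArrayList<>();
        int sum = 0;
        for (int v : values) {
            window.addLast(v);          // the "new" value for this slide
            sum += v;
            if (window.size() > windowLength) {
                sum -= window.removeFirst(); // the "expired" value
            }
            sums.add(sum);
        }
        return sums;
    }

    // Reference implementation: recompute each window sum from scratch.
    static List<Integer> recomputedSums(int[] values, int windowLength) {
        List<Integer> sums = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            int sum = 0;
            for (int j = Math.max(0, i - windowLength + 1); j <= i; j++) {
                sum += values[j];
            }
            sums.add(sum);
        }
        return sums;
    }
}
```

Per slide, the incremental version touches only the new and expired values, while recomputation touches the whole window; for `{5, 1, 7, 3, 9}` with length 3 both produce `[5, 6, 13, 11, 19]`.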


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by hmcl <gi...@git.apache.org>.
Github user hmcl commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r45482446
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java ---
    @@ -0,0 +1,26 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +/**
    + * A {@link Window} that contains {@link Tuple} objects.
    + */
    +public interface TupleWindow extends Window<Tuple> {
    --- End diff --
    
    I would rather have execute(Window<Tuple> input) than execute(TupleWindow window). That is the whole purpose of generics.
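A minimal sketch of the generic alternative, assuming a `Window<T>` that exposes the three accessors the example bolt already calls (`get()`, `getNew()`, `getExpired()`). `ListWindow` and `WindowedSum` are hypothetical, and `Integer` stands in for `Tuple` so the sketch compiles without Storm.

```java
import java.util.List;

// Generic window view; accessor names match those used by SlidingWindowSumBolt.
interface Window<T> {
    List<T> get();        // all events currently in the window
    List<T> getNew();     // events added since the last evaluation
    List<T> getExpired(); // events that fell out since the last evaluation
}

// Simple immutable implementation backed by lists.
final class ListWindow<T> implements Window<T> {
    private final List<T> all, added, expired;

    ListWindow(List<T> all, List<T> added, List<T> expired) {
        this.all = all;
        this.added = added;
        this.expired = expired;
    }

    @Override public List<T> get() { return all; }
    @Override public List<T> getNew() { return added; }
    @Override public List<T> getExpired() { return expired; }
}

// A consumer declared as execute(Window<...>) rather than a type-specific subinterface.
final class WindowedSum {
    private int sum = 0;

    int execute(Window<Integer> inputWindow) {
        for (int v : inputWindow.getNew()) sum += v;
        for (int v : inputWindow.getExpired()) sum -= v;
        return sum;
    }
}
```

With this shape, `IWindowedBolt` would declare `void execute(Window<Tuple> inputWindow)` and a separate `TupleWindow` type would not be needed; one argument for keeping `TupleWindow` is that it leaves room for tuple-specific methods later.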


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43895967
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +    }
    +
    +    @Override
    +    public void execute(TupleWindow inputWindow) {
    +
    --- End diff --
    
    Wouldn't it be better to have this abstract?
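Making `execute` abstract turns a forgotten override into a compile-time error instead of a bolt that silently does nothing. A minimal sketch with hypothetical names (`AbstractWindowedBoltSketch`, `CountingBolt`), using `List<Integer>` in place of `TupleWindow` so it compiles without Storm; the real base class would presumably keep the `withWindow`/`withTumblingWindow` builders from the diff, which still work on an abstract class since they return `this`.

```java
import java.util.List;

// Hypothetical base class: lifecycle hooks get no-op defaults,
// but execute() has no sensible default, so subclasses must provide it.
abstract class AbstractWindowedBoltSketch {
    public void prepare() {
        // optional override, like BaseWindowedBolt.prepare()
    }

    // Abstract: a subclass that forgets to implement this fails to compile.
    public abstract void execute(List<Integer> inputWindow);

    public void cleanup() {
        // optional override
    }
}

final class CountingBolt extends AbstractWindowedBoltSketch {
    int windowsSeen = 0;
    int lastSize = 0;

    @Override
    public void execute(List<Integer> inputWindow) {
        windowsSeen++;
        lastSize = inputWindow.size();
    }
}
```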


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976144
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    --- End diff --
    
    It's overridden by the bolt-level component configuration, so users can have two bolts with different intervals.
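The precedence described above can be sketched as a plain map merge in which component-level entries are applied over topology-level ones. `effectiveConfig` is a hypothetical helper, and the key string is assumed to match `Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT` rather than quoted from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class WindowConfigPrecedence {
    // Assumed key string; in Storm this would come from Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT.
    static final String WINDOW_LENGTH_COUNT = "topology.bolts.window.length.count";

    // Component-level entries win over topology-level entries with the same key.
    // The inputs are not modified.
    static Map<String, Object> effectiveConfig(Map<String, Object> topologyConf,
                                               Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        merged.putAll(componentConf);
        return merged;
    }
}
```

For example, with a topology-wide window length of 10, a bolt whose component configuration sets 30 sees 30, while a bolt with no override falls back to 10.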


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43904817
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    --- End diff --
    
    Oh, I see you have `getNew()` and `getExpired()`. OK, it would be good to add a comment on that, because it was very confusing to me when I just glanced through the code.
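One way to document `get()`, `getNew()` and `getExpired()` is to show what each returns as a count-based window slides. This sketch assumes a window of length 3 that is evaluated after every tuple; `SlideSnapshot` and `CountWindowSimulator` are hypothetical names and the simulation is not the PR's `WindowManager`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// What a bolt would see on one evaluation of the window.
final class SlideSnapshot {
    final List<Integer> all;     // get(): everything currently in the window
    final List<Integer> added;   // getNew(): arrived since the previous evaluation
    final List<Integer> expired; // getExpired(): evicted since the previous evaluation

    SlideSnapshot(List<Integer> all, List<Integer> added, List<Integer> expired) {
        this.all = all;
        this.added = added;
        this.expired = expired;
    }

    @Override
    public String toString() {
        return "get=" + all + " new=" + added + " expired=" + expired;
    }
}

final class CountWindowSimulator {
    // Evaluates a count window of the given length after every incoming value.
    static List<SlideSnapshot> run(int[] values, int windowLength) {
        Deque<Integer> window = new ArrayDeque<>();
        List<SlideSnapshot> snapshots = new ArrayList<>();
        for (int v : values) {
            List<Integer> expired = new ArrayList<>();
            window.addLast(v);
            while (window.size() > windowLength) {
                expired.add(window.removeFirst());
            }
            List<Integer> added = new ArrayList<>();
            added.add(v);
            snapshots.add(new SlideSnapshot(new ArrayList<>(window), added, expired));
        }
        return snapshots;
    }

    public static void main(String[] args) {
        for (SlideSnapshot s : run(new int[] {10, 20, 30, 40}, 3)) {
            System.out.println(s);
        }
    }
}
```

Running `main` prints:

```
get=[10] new=[10] expired=[]
get=[10, 20] new=[20] expired=[]
get=[10, 20, 30] new=[30] expired=[]
get=[20, 30, 40] new=[40] expired=[10]
```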


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153962977
  
    @haohui Thanks for the feedback; yes, your points are valid. We will incrementally add a state store for partial results and the window state, and reduce window re-evaluation during replay. However, I am not sure the tuples in the window should spill over to a persistent store. I think we should rely on Storm's acking mechanism to replay the events and expect that the spouts can reliably replay them from a persistent store (e.g. Kafka). This is consistent with Storm's idea of not persisting in-transit tuples anywhere.
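Because the PR acks a tuple only when it falls out of the window, relying on replay means that, in this simplified model, the tuples a reliable spout would re-send after a failure are the ones still inside the window. A minimal sketch of that bookkeeping, with hypothetical names (`AckOnExpiryWindow`, long message ids) and no real Storm acker involved.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified model: tuples stay un-acked while in the window and are acked on expiry.
final class AckOnExpiryWindow {
    private final int windowLength;
    private final Deque<Long> inWindow = new ArrayDeque<>();
    private final List<Long> acked = new ArrayList<>();

    AckOnExpiryWindow(int windowLength) {
        this.windowLength = windowLength;
    }

    // Receives a tuple (identified by its message id) and acks whatever expires.
    void receive(long msgId) {
        inWindow.addLast(msgId);
        while (inWindow.size() > windowLength) {
            acked.add(inWindow.removeFirst());
        }
    }

    List<Long> acked() {
        return new ArrayList<>(acked);
    }

    // If the worker dies now, these ids were never acked, so a reliable spout
    // (e.g. one reading from Kafka) would replay them after the message timeout.
    List<Long> pendingReplay() {
        return new ArrayList<>(inWindow);
    }
}
```

The same reasoning explains the check in `WindowedBoltExecutor` that the window duration does not exceed `Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS`: a tuple held in the window longer than the timeout would be replayed while it is still being windowed.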


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894843
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.windowedOutputCollector = new WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.info("Initialized window manager {} ", this.windowManager);
    --- End diff --
    
    This looks more like a debug statement to me.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153961998
  
    @harshach Right now, in case of failures, the events get replayed and the results are recomputed, so there can potentially be many duplicates. I assume you mean providing APIs to save the partial results of a computation in persistent storage and to retrieve that state. Along with this, we also need to checkpoint the evaluated state of the window in the background and use it to avoid re-evaluating the window when events get replayed. This will help reduce duplicates. Since it's more involved work, I will raise separate JIRAs to track this and add it incrementally.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44106905
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,224 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private int getMaxSpoutPending(Map stormConf) {
    +        int maxPending = Integer.MAX_VALUE;
    +        if (stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) {
    +            maxPending = ((Number) stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
    +        }
    +        return maxPending;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    private void ensureCountLessThanMaxPending(int count, int maxPending) {
    +        if (count > maxPending) {
    +            throw new IllegalArgumentException("Window count (length + sliding interval) value " + count +
    +                                                       " is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING +
    +                                                       " value " + maxPending);
    +        }
    +    }
    +
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        int maxSpoutPending = getMaxSpoutPending(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        } else if (slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
    +        }
    +
    +        if (windowLengthCount != null && slidingIntervalCount != null) {
    +            ensureCountLessThanMaxPending(windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
    +        } else if (windowLengthCount != null) {
    +            ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
    +        } else if (slidingIntervalCount != null) {
    +            ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.windowedOutputCollector = new WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.debug("Initialized window manager {} ", this.windowManager);
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        windowManager.add(input);
    --- End diff --
    
    I added it with the idea of supporting late events, but realized that it needs more work and thought it could be added once the basic windowing is in place. I will file a follow-on JIRA and work on it.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975856
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    --- End diff --
    
    I added new constants in Config and override them here via the componentConfiguration. This gives a way to define default values for these params at the topology level if needed and override them per bolt. Defining them in the same place as the other params is more consistent and lets us reuse the constants for Trident as well. I think defining them in BaseWindowedBolt would make them specific to core.
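    A minimal sketch of the layering described above, assuming that for these keys a component's `getComponentConfiguration()` entries are merged on top of the topology-level config when the bolt runs. Plain maps stand in for Storm's `Config`, and the key strings mirror the `topology.bolts.window.*` naming but should be treated as assumptions.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    final class WindowConfigLayering {
        // Assumed key names, mirroring the Config constants discussed in the PR.
        static final String WINDOW_LENGTH_COUNT = "topology.bolts.window.length.count";
        static final String SLIDING_INTERVAL_COUNT = "topology.bolts.sliding.interval.count";
    
        // Effective config seen by one bolt: topology defaults first,
        // then the bolt's own component configuration on top.
        static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                                 Map<String, Object> componentConf) {
            Map<String, Object> merged = new HashMap<>(topologyConf);
            if (componentConf != null) {
                merged.putAll(componentConf); // per-bolt values win
            }
            return merged;
        }
    }
    ```
    
    With a topology default window length of 100 and a bolt-level override of 50, the bolt sees a window length of 50 while still inheriting the topology-level sliding interval of 10.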


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894359
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    --- End diff --
    
    Minor nit: the `<p/>` is not in the other license files. I think IntelliJ is adding it for you.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894535
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    --- End diff --
    
    If we need more validation, we should add it in. If we don't, we should remove this comment.
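    One possible shape for the extra validation, sketched standalone and following the checks that a later revision of `validate` in this thread adds: window durations are checked against the message timeout, and window counts against `topology.max.spout.pending`. `WindowValidation` is a hypothetical name; `null` means "not configured", as in the diff, and `timeoutMs` would be `Integer.MAX_VALUE` when message timeouts are disabled.
    
    ```java
    final class WindowValidation {
        // Durations are in milliseconds, counts in tuples; null means "not configured".
        static void validate(Integer lengthMs, Integer slideMs,
                             Integer lengthCount, Integer slideCount,
                             int timeoutMs, int maxSpoutPending) {
            // Sum in long so very large settings cannot overflow.
            long duration = sum(lengthMs, slideMs);
            if (duration > timeoutMs) {
                throw new IllegalArgumentException("Window duration (length + sliding interval) " + duration
                        + " ms is more than the message timeout " + timeoutMs + " ms");
            }
            long count = sum(lengthCount, slideCount);
            if (count > maxSpoutPending) {
                throw new IllegalArgumentException("Window count (length + sliding interval) " + count
                        + " is more than max spout pending " + maxSpoutPending);
            }
        }
    
        // Sum of the configured values, treating "not configured" as 0.
        private static long sum(Integer a, Integer b) {
            return (a == null ? 0L : a.longValue()) + (b == null ? 0L : b.longValue());
        }
    }
    ```
    
    Summing the two optional values covers the same cases as the diff's if/else chain (both set, only the length set, only the interval set) in one place.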


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43977228
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
    + * on expiry of events or activation of the window due to {@link TriggerPolicy}.
    + *
    + * @param <T> the type of event in the window.
    + */
    +public class WindowManager<T> implements TriggerHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
    +
    +    /**
    +     * Expire old events every EXPIRE_EVENTS_THRESHOLD to
    +     * keep the window size in check.
    +     */
    +    public static final int EXPIRE_EVENTS_THRESHOLD = 100;
    +
    +    private WindowLifecycleListener<T> windowLifecycleListener;
    +    private ConcurrentLinkedQueue<Event<T>> window;
    +    private EvictionPolicy<T> evictionPolicy;
    +    private TriggerPolicy<T> triggerPolicy;
    +    private List<T> expiredEvents;
    +    private Set<Event<T>> prevWindowEvents;
    +    private AtomicInteger eventsSinceLastExpiry;
    +    private ReentrantLock lock;
    +
    +    public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
    +        windowLifecycleListener = lifecycleListener;
    +        window = new ConcurrentLinkedQueue<>();
    +        expiredEvents = new ArrayList<>();
    +        prevWindowEvents = new HashSet<>();
    +        eventsSinceLastExpiry = new AtomicInteger();
    +        lock = new ReentrantLock(true);
    +    }
    +
    +    public void setWindowLength(Count count) {
    +        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
    +    }
    +
    +    public void setWindowLength(Duration duration) {
    +        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
    +    }
    +
    +    public void setSlidingInterval(Count count) {
    +        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
    +    }
    +
    +    public void setSlidingInterval(Duration duration) {
    +        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
    +    }
    +
    +    /**
    +     * Add an event into the window, with {@link System#currentTimeMillis()} as
    +     * the tracking ts.
    +     *
    +     * @param event the event to add
    +     */
    +    public void add(T event) {
    +        add(event, System.currentTimeMillis());
    +    }
    +
    +    /**
    +     * Add an event into the window, with the given ts as the tracking ts.
    +     *
    +     * @param event the event to track
    +     * @param ts    the timestamp
    +     */
    +    public void add(T event, long ts) {
    +        Event<T> windowEvent = new EventImpl<T>(event, ts);
    +        window.add(windowEvent);
    +        track(windowEvent);
    +        compactWindow();
    --- End diff --
    
    Compaction is done to keep the window size in check, in case the trigger is defined such that unwanted events keep piling up in the window. E.g., with a window length of 100 events, a sliding interval of 1 min and an event rate of 1000 events/sec, without compaction we would end up storing up to 60,000 events (1000 events/sec for 60 sec) between triggers, whereas ideally we don't want to keep more than 100 events in the window. Similar cases can happen for other window configurations.
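    A small simulation of the example above, assuming a count-based window of length 100 whose 1-minute trigger never fires during the run, so 60 s at 1000 events/sec means 60,000 adds. `CompactingCountWindow` is a hypothetical simplification, not the PR's `WindowManager`: it just evicts the oldest events every `EXPIRE_EVENTS_THRESHOLD` (100) adds.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;
    
    final class CompactingCountWindow {
        static final int EXPIRE_EVENTS_THRESHOLD = 100; // same threshold as in the diff
        private final int windowLength;
        private final boolean compact;
        private final Deque<Long> window = new ArrayDeque<>();
        private int eventsSinceLastExpiry = 0;
    
        CompactingCountWindow(int windowLength, boolean compact) {
            this.windowLength = windowLength;
            this.compact = compact;
        }
    
        void add(long event) {
            window.addLast(event);
            // Every EXPIRE_EVENTS_THRESHOLD adds, drop the oldest events beyond the window length.
            if (compact && ++eventsSinceLastExpiry >= EXPIRE_EVENTS_THRESHOLD) {
                while (window.size() > windowLength) {
                    window.removeFirst();
                }
                eventsSinceLastExpiry = 0;
            }
        }
    
        int size() { return window.size(); }
    }
    ```
    
    Without compaction the queue ends up holding all 60,000 events; with it, the size never exceeds windowLength + EXPIRE_EVENTS_THRESHOLD - 1 (199 here) and is back to 100 right after each compaction.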
    
    The events are stored in a `ConcurrentLinkedQueue`, and onTrigger takes a snapshot of the queue and iterates over it even if elements are added in the meantime. There is locking to keep compaction and expiry from stepping on each other. The idea was to minimize locking, especially while adding events.
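    The locking scheme described above can be sketched as follows, assuming a trigger thread (e.g. for time-based triggers) running alongside the executor thread. `add` appends to the `ConcurrentLinkedQueue` without taking the lock, while compaction and trigger-time expiry hold a fair `ReentrantLock` so they never interleave; the trigger copies the queue into a list and calls the listener outside the lock. `LockLightWindow` and its methods are illustrative names, not the PR's code. Note that `ConcurrentLinkedQueue` iterators are weakly consistent rather than true snapshots, so an event added during the copy may or may not be included.
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Consumer;
    
    final class LockLightWindow<T> {
        private final int windowLength;
        private final ConcurrentLinkedQueue<T> window = new ConcurrentLinkedQueue<>();
        private final ReentrantLock lock = new ReentrantLock(true); // fair, as in the diff
    
        LockLightWindow(int windowLength) { this.windowLength = windowLength; }
    
        // Hot path: a lock-free enqueue, no locking at all.
        void add(T event) {
            window.add(event);
        }
    
        // Periodic compaction, guarded so it never overlaps trigger-time expiry.
        void compact() {
            lock.lock();
            try {
                expireLocked();
            } finally {
                lock.unlock();
            }
        }
    
        // On trigger: expire under the lock, copy the window, then notify outside the lock.
        void onTrigger(Consumer<List<T>> listener) {
            List<T> snapshot;
            lock.lock();
            try {
                expireLocked();
                snapshot = new ArrayList<>(window); // weakly consistent copy
            } finally {
                lock.unlock();
            }
            listener.accept(snapshot);
        }
    
        // Remove the oldest events beyond windowLength. Concurrent adds go to the
        // tail, so removing `excess` elements from the head stays correct.
        private void expireLocked() {
            int excess = window.size() - windowLength; // size() is O(n) for this queue
            Iterator<T> it = window.iterator();
            while (excess-- > 0 && it.hasNext()) {
                it.next();
                it.remove();
            }
        }
    }
    ```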


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975187
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    --- End diff --
    
    I will fix it across all files.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154092352
  
    As a note on late data and out-of-order processing, please take a look at the Google Cloud Dataflow API. They have been dealing with these types of use cases for a long time and have developed an API that, although a bit ugly, really captures the essence of this type of problem.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44855135
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java ---
    @@ -177,6 +179,21 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint)
         }
     
         /**
    +     * Define a new bolt in this topology. This defines a windowed bolt, intended
    +     * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
    +     * is triggered for each window interval with the list of current events in the window.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
    +     * @param bolt the windowed bolt
    +     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
    +     * @return use the returned object to declare the inputs to this component
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
    --- End diff --
    
    Thanks. I've added a tracking JIRA: https://issues.apache.org/jira/browse/STORM-1207


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154471868
  
    @harshach Sure, I am not in a rush.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975150
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    +        }
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("sum"));
    +        }
    +
    +    }
    +
    +
    +    public static void main(String[] args) throws Exception {
    +        TopologyBuilder builder = new TopologyBuilder();
    +        builder.setSpout("integer", new RandomIntegerSpout(), 1);
    +        builder.setBolt("window", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)),
    +                        //new SlidingWindowSumBolt().withTumblingWindow(new Duration(20, TimeUnit.SECONDS))
    --- End diff --
    
    Will add a separate bolt that shows the usage of a tumbling window.
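    
    The difference between the two configurations can be illustrated without a cluster. The model below is a simplification, assuming a count-based window that fires every `interval` tuples and holds the latest `length` tuples; it mirrors how `withTumblingWindow(count)` in `BaseWindowedBolt` sets the window length and the sliding interval to the same count. `CountWindows` is a hypothetical class, not part of Storm, and Storm's actual behavior for the first, partially filled windows may differ.
    
    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    // Simplified model of count-based window membership; illustration only.
    class CountWindows {
        // The window fires every `interval` tuples and holds the most recent
        // `length` tuples (fewer while the stream is still shorter than `length`).
        static List<List<Integer>> windows(List<Integer> stream, int length, int interval) {
            List<List<Integer>> result = new ArrayList<>();
            for (int end = interval; end <= stream.size(); end += interval) {
                int start = Math.max(0, end - length);
                result.add(new ArrayList<>(stream.subList(start, end)));
            }
            return result;
        }
    
        // A tumbling window is a sliding window whose interval equals its length.
        static List<List<Integer>> tumbling(List<Integer> stream, int count) {
            return windows(stream, count, count);
        }
    
        public static void main(String[] args) {
            List<Integer> stream = Arrays.asList(1, 2, 3, 4, 5, 6);
            System.out.println(windows(stream, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
            System.out.println(tumbling(stream, 2));   // [[1, 2], [3, 4], [5, 6]]
        }
    }
    ```
    
    Sliding windows overlap and a tuple can appear in several of them, while tumbling windows partition the stream so each tuple is seen exactly once.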


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44016654
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private final transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    --- End diff --
    
    Extra line


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44017408
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +
    --- End diff --
    
    Extra line


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by satishd <gi...@git.apache.org>.
Github user satishd commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44009146
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/EventImpl.java ---
    @@ -0,0 +1,38 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +class EventImpl<T> implements Event<T> {
    +    private final T event;
    +    private long ts;
    +
    +    EventImpl(T event, long ts) {
    +        this.event = event;
    +        this.ts = ts;
    +    }
    +
    +    @Override
    +    public long getTs() {
    --- End diff --
    
    Can we rename getTs to getTimestamp?
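    
    A sketch of how the accessor could look after the suggested rename. Only `getTimestamp` comes from the comment; the `get()` accessor on `Event` is an assumption for illustration, and the timestamp field is made `final` here, assuming nothing outside the constructor assigns it.
    
    ```java
    // Hypothetical shape of the event abstraction after the rename.
    // get() is an assumed accessor, not confirmed by the diff excerpt.
    interface Event<T> {
        T get();
    
        long getTimestamp();
    }
    
    class EventImpl<T> implements Event<T> {
        private final T event;
        private final long timestamp; // final here; non-final in the diff
    
        EventImpl(T event, long timestamp) {
            this.event = event;
            this.timestamp = timestamp;
        }
    
        @Override
        public T get() {
            return event;
        }
    
        @Override
        public long getTimestamp() {
            return timestamp;
        }
    }
    ```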


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976006
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +    }
    +
    +    @Override
    +    public void execute(TupleWindow inputWindow) {
    +
    --- End diff --
    
    Yes, it makes sense to keep it abstract and not define the `execute` method, since that should be implemented by the subclass.
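    
    The suggested shape can be sketched without Storm on the classpath. The base class supplies no-op lifecycle methods but leaves `execute` abstract, so a subclass that forgets it fails to compile instead of silently doing nothing. `WindowedBolt`, `BaseWindowedBoltSketch` and `SumBolt` are hypothetical stand-ins for `IWindowedBolt` and `BaseWindowedBolt`, with a `List<Integer>` in place of `TupleWindow`.
    
    ```java
    import java.util.List;
    
    // Hypothetical, Storm-free stand-in for IWindowedBolt.
    interface WindowedBolt {
        void prepare();
    
        void execute(List<Integer> window);
    
        void cleanup();
    }
    
    // Abstract base: no-op lifecycle methods, execute() deliberately left unimplemented.
    abstract class BaseWindowedBoltSketch implements WindowedBolt {
        @Override
        public void prepare() {
        }
    
        @Override
        public void cleanup() {
        }
    }
    
    // A concrete bolt must provide execute(), otherwise it does not compile.
    class SumBolt extends BaseWindowedBoltSketch {
        long lastSum;
    
        @Override
        public void execute(List<Integer> window) {
            long sum = 0;
            for (int v : window) {
                sum += v;
            }
            lastSum = sum;
        }
    }
    ```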


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43896655
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java ---
    @@ -0,0 +1,62 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()}
    + * when the count threshold is hit.
    + *
    + * @param <T> the type of event tracked by this policy.
    + */
    +public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
    +    private final int count;
    +    private AtomicInteger currentCount;
    +    private TriggerHandler handler;
    --- End diff --
    
    Should currentCount and handler be final as well?
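    
    Assuming both are assigned only in the constructor, both can be `final`: the `AtomicInteger` reference never changes (only its value does), and the handler is fixed at construction. A sketch, assuming a constructor that takes the count and handler and a per-event `track` method; `CountTriggerPolicySketch` and `track` are hypothetical names, not necessarily the PR's `TriggerPolicy` API.
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    
    // Hypothetical callback, modeled on TriggerHandler#onTrigger() from the diff.
    interface TriggerHandler {
        void onTrigger();
    }
    
    // Sketch only: track() is an assumed method name.
    class CountTriggerPolicySketch<T> {
        private final int count;
        // The reference never changes, only the value it holds.
        private final AtomicInteger currentCount = new AtomicInteger();
        private final TriggerHandler handler;
    
        CountTriggerPolicySketch(int count, TriggerHandler handler) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must be positive");
            }
            this.count = count;
            this.handler = handler;
        }
    
        void track(T event) {
            // Atomically advance the counter modulo count; 0 means the threshold was just hit.
            if (currentCount.updateAndGet(c -> (c + 1) % count) == 0) {
                handler.onTrigger();
            }
        }
    }
    ```
    
    Folding the reset into `updateAndGet` keeps increment-and-reset atomic, so two concurrent `track` calls cannot both observe the threshold.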


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43893747
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    +            for (Tuple tuple : inputWindow.getNew()) {
    +                sum += (int) tuple.getValue(0);
    +            }
    +            for (Tuple tuple : inputWindow.getExpired()) {
    +                sum -= (int) tuple.getValue(0);
    +            }
    +            collector.emit(new Values(sum));
    --- End diff --
    
    If this is a sliding window, why are we keeping the sum around between windows?
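
Keeping `sum` between activations is what makes the update incremental. Each activation adds the values from `getNew()` and subtracts those from `getExpired()`, so the running total should match the sum over `get()`. This holds as long as every expired tuple was counted in an earlier window, for example when the sliding interval is no longer than the window length. The minimal standalone sketch below checks that invariant. `IncrementalSumDemo`, its window length of 5 and its sliding interval of 2 are assumptions for illustration. A plain `ArrayDeque` stands in for Storm's window; this is not the real `WindowManager`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical demo, not Storm code: simulates a count-based sliding window and
// compares an incrementally maintained sum with a full recompute per window.
class IncrementalSumDemo {
    static final int WINDOW_LENGTH = 5;    // assumed window length, in tuples
    static final int SLIDING_INTERVAL = 2; // assumed sliding interval, in tuples

    /** Sum emitted at each activation, updated with new/expired values only. */
    static List<Integer> incrementalSums(int[] values) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> newSinceLast = new ArrayList<>();
        List<Integer> expiredSinceLast = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int sum = 0; // kept across activations, like SlidingWindowSumBolt.sum
        for (int i = 0; i < values.length; i++) {
            window.addLast(values[i]);
            newSinceLast.add(values[i]);
            if (window.size() > WINDOW_LENGTH) {
                expiredSinceLast.add(window.removeFirst()); // oldest tuple falls out
            }
            if ((i + 1) % SLIDING_INTERVAL == 0) { // window activation
                for (int v : newSinceLast) sum += v;
                for (int v : expiredSinceLast) sum -= v;
                newSinceLast.clear();
                expiredSinceLast.clear();
                emitted.add(sum);
            }
        }
        return emitted;
    }

    /** Recomputes each window's sum from scratch, for comparison. */
    static List<Integer> recomputedSums(int[] values) {
        List<Integer> sums = new ArrayList<>();
        for (int end = SLIDING_INTERVAL; end <= values.length; end += SLIDING_INTERVAL) {
            int start = Math.max(0, end - WINDOW_LENGTH);
            int s = 0;
            for (int i = start; i < end; i++) s += values[i];
            sums.add(s);
        }
        return sums;
    }

    public static void main(String[] args) {
        int[] values = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3};
        System.out.println(incrementalSums(values)); // prints [4, 9, 20, 23, 25]
        System.out.println(recomputedSums(values));  // prints [4, 9, 20, 23, 25]
    }
}
```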


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44016552
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private final transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    --- End diff --
    
    I can see that they allow you to have `withWindowLength` method names for both, but that does not feel like it justifies adding an extra class. This is very minor, though, and I am OK if you really want to leave it.
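
The benefit being weighed here is overload resolution. Because `Count` and `Duration` are distinct types, one method name such as `withWindowLength` can accept either, and the compiler picks the count-based or the time-based variant. With a bare `int`, both overloads would have the same signature, which would force distinct names such as `withWindowLengthCount(int)`. The sketch below uses simplified, hypothetical copies of the two classes nested in a made-up `WindowConfigDemo`. It does not reproduce `BaseWindowedBolt`.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: Count and Duration are simplified stand-ins nested here
// the same way the PR nests them in BaseWindowedBolt.
class WindowConfigDemo {
    static final class Count {
        final int value;
        Count(int value) { this.value = value; }
    }

    static final class Duration {
        final int value; // normalized to milliseconds
        Duration(int value, TimeUnit unit) { this.value = (int) unit.toMillis(value); }
    }

    String windowLength = "unset";

    // Same method name, different parameter types: the argument's type selects the window kind.
    WindowConfigDemo withWindowLength(Count count) {
        windowLength = count.value + " tuples";
        return this;
    }

    WindowConfigDemo withWindowLength(Duration duration) {
        windowLength = duration.value + " ms";
        return this;
    }

    public static void main(String[] args) {
        System.out.println(new WindowConfigDemo().withWindowLength(new Count(30)).windowLength);
        System.out.println(new WindowConfigDemo()
                .withWindowLength(new Duration(10, TimeUnit.SECONDS)).windowLength);
    }
}
```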


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43975889
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/CountEvictionPolicy.java ---
    @@ -0,0 +1,68 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * An eviction policy that tracks event counts and can
    + * evict based on a threshold count.
    + *
    + * @param <T> the type of event tracked by this policy.
    + */
    +public class CountEvictionPolicy<T> implements EvictionPolicy<T> {
    +    private final int threshold;
    +    private AtomicInteger currentCount;
    --- End diff --
    
    Thanks, I missed it.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154448320
  
    @arunmahadevan @Parth-Brahmbhatt @revans2 Still going through the code; give me a day and I'll put my comments in. Thanks.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43922969
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
    + * on expiry of events or activation of the window due to {@link TriggerPolicy}.
    + *
    + * @param <T> the type of event in the window.
    + */
    +public class WindowManager<T> implements TriggerHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
    +
    +    /**
    +     * Expire old events every EXPIRE_EVENTS_THRESHOLD to
    +     * keep the window size in check.
    +     */
    +    public static final int EXPIRE_EVENTS_THRESHOLD = 100;
    +
    +    private WindowLifecycleListener<T> windowLifecycleListener;
    +    private ConcurrentLinkedQueue<Event<T>> window;
    +    private EvictionPolicy<T> evictionPolicy;
    +    private TriggerPolicy<T> triggerPolicy;
    +    private List<T> expiredEvents;
    +    private Set<Event<T>> prevWindowEvents;
    +    private AtomicInteger eventsSinceLastExpiry;
    +    private ReentrantLock lock;
    +
    +    public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
    +        windowLifecycleListener = lifecycleListener;
    +        window = new ConcurrentLinkedQueue<>();
    +        expiredEvents = new ArrayList<>();
    +        prevWindowEvents = new HashSet<>();
    +        eventsSinceLastExpiry = new AtomicInteger();
    +        lock = new ReentrantLock(true);
    +    }
    +
    +    public void setWindowLength(Count count) {
    +        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
    +    }
    +
    +    public void setWindowLength(Duration duration) {
    +        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
    +    }
    +
    +    public void setSlidingInterval(Count count) {
    +        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
    +    }
    +
    +    public void setSlidingInterval(Duration duration) {
    +        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
    +    }
    +
    +    /**
    +     * Add an event into the window, with {@link System#currentTimeMillis()} as
    +     * the tracking ts.
    +     *
    +     * @param event the event to add
    +     */
    +    public void add(T event) {
    +        add(event, System.currentTimeMillis());
    +    }
    +
    +    /**
    +     * Add an event into the window, with the given ts as the tracking ts.
    +     *
    +     * @param event the event to track
    +     * @param ts    the timestamp
    +     */
    +    public void add(T event, long ts) {
    +        Event<T> windowEvent = new EventImpl<T>(event, ts);
    +        window.add(windowEvent);
    +        track(windowEvent);
    +        compactWindow();
    +    }
    +
    +    /**
    +     * The callback invoked by the trigger policy.
    +     */
    +    @Override
    +    public void onTrigger() {
    +        List<Event<T>> windowEvents = new ArrayList<>();
    +        List<T> expired = null;
    +        try {
    +            lock.lock();
    +            /*
    +             * scan the entire window to handle out of order events in
    +             * the case of time based windows.
    +             */
    +            expireEvents(true, windowEvents);
    +            expired = new ArrayList<>(expiredEvents);
    +            expiredEvents.clear();
    +        } finally {
    +            lock.unlock();
    +        }
    +        List<T> events = new ArrayList<>();
    +        List<T> newEvents = new ArrayList<>();
    +        for (Event<T> event : windowEvents) {
    +            events.add(event.get());
    +            if (!prevWindowEvents.contains(event)) {
    +                newEvents.add(event.get());
    +            }
    +        }
    +        prevWindowEvents.clear();
    +        prevWindowEvents.addAll(windowEvents);
    +        LOG.debug("invoking windowLifecycleListener onActivation, [{}] events in window.", windowEvents.size());
    +        windowLifecycleListener.onActivation(events, newEvents, expired);
    +        triggerPolicy.reset();
    +    }
    +
    +    public void shutdown() {
    +        LOG.debug("Shutting down WindowManager");
    +        if (triggerPolicy != null) {
    +            triggerPolicy.shutdown();
    +        }
    +    }
    +
    +    /**
    +     * expires events that fall out of the window every
    +     * EXPIRE_EVENTS_THRESHOLD so that the window does not grow
    +     * too big.
    +     */
    +    private void compactWindow() {
    +        if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
    +            expireEvents(false, null);
    +        }
    +    }
    +
    +    /**
    +     * feed the event to the eviction and trigger policies
    +     * for bookkeeping and optionally firing the trigger.
    +     */
    +    private void track(Event<T> windowEvent) {
    +        evictionPolicy.track(windowEvent);
    +        triggerPolicy.track(windowEvent);
    +    }
    +
    +    /**
    +     * Expire events from the window, using the expiration policy to check
    +     * if the event should be evicted or not.
    +     *
    +     * @param fullScan  if set, will scan the entire window. if not set, will stop
    +     *                  as soon as an event not satisfying the expiration policy is found.
    +     * @param remaining the list of remaining events in the window after expiry.
    +     */
    +    private void expireEvents(boolean fullScan, List<Event<T>> remaining) {
    --- End diff --
    
    Why do we take a `remaining` list as input and mutate it? Why not just return the remaining list? Same question for the expired events: is there any reason for mutating instead of just returning?


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894152
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/IWindowedBolt.java ---
    @@ -0,0 +1,40 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.windowing.TupleWindow;
    +
    +import java.util.Map;
    +
    +/**
    + * A bolt abstraction for supporting time and count based sliding & tumbling windows.
    + */
    +public interface IWindowedBolt extends IComponent {
    +    /**
    +     * This is similar to the {@link backtype.storm.task.IBolt#prepare(Map, TopologyContext, OutputCollector)} except
    +     * that while emitting the tuples are are automatically anchored to the tuples in the inputWindow.
    --- End diff --
    
    `are are`: we probably need only one.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153841906
  
    @arunmahadevan Also, if the window size is bigger, let's say 1 hr, how are we going to handle it? I am hoping to see state management with pluggable storage: by default we can keep the window in memory and allow users to store data in other persistent stores.
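
One possible shape for the pluggable storage suggested here is an interface with an in-memory default. The sketch below is purely hypothetical: `WindowStore`, `InMemoryWindowStore` and their methods do not exist in the PR. A persistent implementation for hour-long windows would back the same interface with an external store. The sketch also assumes that events arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical interface, not part of the PR: abstracts where window events live.
interface WindowStore<T> {
    void append(T event, long ts);
    /** Removes and returns events with a timestamp older than cutoffTs, oldest first. */
    List<T> evictOlderThan(long cutoffTs);
    List<T> snapshot();
    int size();
}

// Default in-memory implementation; a persistent one could implement the same interface.
class InMemoryWindowStore<T> implements WindowStore<T> {
    private static final class Entry<E> {
        final E event;
        final long ts;
        Entry(E event, long ts) { this.event = event; this.ts = ts; }
    }

    private final Deque<Entry<T>> entries = new ArrayDeque<>();

    @Override public void append(T event, long ts) { entries.addLast(new Entry<>(event, ts)); }

    @Override public List<T> evictOlderThan(long cutoffTs) {
        List<T> evicted = new ArrayList<>();
        // Assumes in-order timestamps, so eviction stops at the first event that is young enough.
        while (!entries.isEmpty() && entries.peekFirst().ts < cutoffTs) {
            evicted.add(entries.removeFirst().event);
        }
        return evicted;
    }

    @Override public List<T> snapshot() {
        List<T> out = new ArrayList<>();
        for (Entry<T> e : entries) out.add(e.event);
        return out;
    }

    @Override public int size() { return entries.size(); }
}

class WindowStoreDemo {
    public static void main(String[] args) {
        WindowStore<String> store = new InMemoryWindowStore<>();
        store.append("a", 1_000L);
        store.append("b", 2_000L);
        store.append("c", 3_000L);
        List<String> expired = store.evictOlderThan(2_500L);
        System.out.println(expired + " " + store.snapshot()); // prints [a, b] [c]
    }
}
```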


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43896561
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    --- End diff --
    
    Never mind I see how you are doing it, and windowConfiguration is read by the topology builder.  Looks good.
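
The pattern approved here works as follows. Fluent setters on the bolt record the window settings in `windowConfiguration`. The topology builder then reads those settings through the component configuration while the topology is being built on the client. Because that read happens before the bolt is serialized, the field being `transient` appears harmless. The sketch below illustrates that flow. The class names and config keys are made up and are not Storm's `Config` constants.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; class names and config keys are illustrative only.
class SketchWindowedBolt implements Serializable {
    // transient: only needed on the client while the topology is being built
    private transient Map<String, Object> windowConfiguration = new HashMap<>();

    SketchWindowedBolt withWindowLengthCount(int count) {
        windowConfiguration.put("sketch.window.length.count", count);
        return this;
    }

    SketchWindowedBolt withSlidingIntervalCount(int count) {
        windowConfiguration.put("sketch.window.sliding.interval.count", count);
        return this;
    }

    /** Plays the role of IComponent#getComponentConfiguration(). */
    Map<String, Object> getComponentConfiguration() {
        return windowConfiguration;
    }
}

class SketchTopologyBuilder {
    final Map<String, Map<String, Object>> componentConfs = new HashMap<>();

    void setBolt(String id, SketchWindowedBolt bolt) {
        // Copy the window settings at build time, before the bolt would be serialized.
        Map<String, Object> conf = new HashMap<>();
        Map<String, Object> boltConf = bolt.getComponentConfiguration();
        if (boltConf != null) {
            conf.putAll(boltConf);
        }
        componentConfs.put(id, conf);
    }

    public static void main(String[] args) {
        SketchTopologyBuilder builder = new SketchTopologyBuilder();
        builder.setBolt("sum", new SketchWindowedBolt().withWindowLengthCount(30).withSlidingIntervalCount(10));
        System.out.println(builder.componentConfs.get("sum"));
    }
}
```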


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976655
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    +    private void expireEvents(boolean fullScan, List<Event<T>> remaining) {
    --- End diff --
    
    We don't always need the remaining list. `expireEvents` is also called from `compactWindow`, where we don't need the result and pass null. Another way would be to pass a boolean flag to control this and return the result. `expiredEvents` is kept as a member variable since it's updated during both compaction and trigger.
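
The alternative described above can be sketched as follows. A boolean flag controls whether surviving events are collected, and the method returns them instead of filling a caller-supplied list. `expiredEvents` stays a member because both compaction and triggering append to it. `ExpireSketch` is hypothetical, and a `Predicate` stands in for `EvictionPolicy`. This is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Predicate;

// Hypothetical sketch of the "boolean flag + return value" variant of expireEvents.
class ExpireSketch<T> {
    final ConcurrentLinkedQueue<T> window = new ConcurrentLinkedQueue<>();
    final List<T> expiredEvents = new ArrayList<>(); // appended to by compaction and trigger alike

    /**
     * @param shouldEvict      stands in for the eviction policy's decision
     * @param fullScan         scan the whole window instead of stopping at the first live event
     * @param collectRemaining whether the caller wants the surviving events back
     * @return surviving events in window order, or an empty list if collectRemaining is false
     */
    List<T> expireEvents(Predicate<T> shouldEvict, boolean fullScan, boolean collectRemaining) {
        List<T> remaining = collectRemaining ? new ArrayList<T>() : Collections.<T>emptyList();
        Iterator<T> it = window.iterator();
        while (it.hasNext()) {
            T event = it.next();
            if (shouldEvict.test(event)) {
                it.remove();
                expiredEvents.add(event);
            } else {
                if (collectRemaining) remaining.add(event);
                if (!fullScan) break; // compaction path: stop at the first live event
            }
        }
        if (collectRemaining) {
            while (it.hasNext()) remaining.add(it.next()); // events not examined after an early stop
        }
        return remaining;
    }

    public static void main(String[] args) {
        ExpireSketch<Integer> sketch = new ExpireSketch<>();
        sketch.window.addAll(Arrays.asList(1, 2, 10, 3, 11));
        sketch.expireEvents(x -> x < 5, false, false);                  // like compactWindow()
        List<Integer> live = sketch.expireEvents(x -> x < 5, true, true); // like onTrigger()
        System.out.println(live + " " + sketch.expiredEvents); // prints [10, 11] [1, 2, 3]
    }
}
```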


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44814116
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java ---
    @@ -177,6 +179,21 @@ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint)
         }
     
         /**
    +     * Define a new bolt in this topology. This defines a windowed bolt, intended
    +     * for windowing operations. The {@link IWindowedBolt#execute(TupleWindow)} method
    +     * is triggered for each window interval with the list of current events in the window.
    +     *
    +     * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
    +     * @param bolt the windowed bolt
    +     * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
    +     * @return use the returned object to declare the inputs to this component
    +     * @throws IllegalArgumentException if {@code parallelism_hint} is not positive
    +     */
    +    public BoltDeclarer setBolt(String id, IWindowedBolt bolt, Number parallelism_hint) throws IllegalArgumentException {
    --- End diff --
    
    This API addition will require changes in Flux in order for it to support windowing.
    
    We should file a JIRA to track that.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44106603
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java ---
    @@ -0,0 +1,26 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +/**
    + * A {@link Window} that contains {@link Tuple} objects.
    + */
    +public interface TupleWindow extends Window<Tuple> {
    --- End diff --
    
    This is just so that we can have `execute(TupleWindow window)` instead of `execute(Window<Tuple> input)`.
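
The same naming trick can be shown in isolation. An empty sub-interface that fixes the type argument gives a shorter, more readable parameter type. This sketch is simplified and hypothetical: `Window` has only `get()` here, and `String` stands in for `Tuple`.

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in: the PR's Window also exposes getNew() and getExpired().
interface Window<T> {
    List<T> get();
}

// Empty sub-interface that only pins the type argument, like TupleWindow extends Window<Tuple>.
interface StringWindow extends Window<String> {}

class WindowAliasDemo {
    // Reads as execute(StringWindow) rather than execute(Window<String>).
    static int execute(StringWindow inputWindow) {
        return inputWindow.get().size();
    }

    public static void main(String[] args) {
        // StringWindow has a single abstract method, so a lambda can implement it.
        StringWindow w = () -> Arrays.asList("a", "b", "c");
        System.out.println(execute(w)); // prints 3
    }
}
```

One trade-off of this approach is that Java interfaces are nominal. An object that implements only `Window<Tuple>` is not a `TupleWindow`, so implementations have to declare the sub-interface explicitly.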


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154316797
  
    @revans2 I have added error handling to the scheduled executor used for time-based expiry and addressed most of your other comments. I feel late data and out-of-order processing can be added incrementally once basic windowing support is in. I will track it in a separate JIRA: https://issues.apache.org/jira/browse/STORM-1187
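One relevant JDK detail comes from the `ScheduledExecutorService.scheduleAtFixedRate` Javadoc. If any execution of the task throws, subsequent executions are suppressed. An unguarded expiry task could therefore silently stop expiring (and acking) tuples. The sketch below shows one way to guard a periodic task, assuming a plain JDK executor. `GuardedTask` is a hypothetical name, and this is not the code in the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class GuardedTask {
    private GuardedTask() {
    }

    // Wraps a periodic task so a RuntimeException is recorded instead of cancelling later runs.
    static Runnable guard(Runnable task, AtomicInteger failures) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                // A real bolt would log or report the error here; this sketch only counts it.
                failures.incrementAndGet();
            }
        };
    }

    // Usage: a task that throws on its first run keeps running because it is guarded.
    // Returns true if it ran at least three times within the timeout and failed exactly once.
    static boolean survivesFailure() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable flaky = () -> {
            threeRuns.countDown();
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated expiry failure");
            }
        };
        executor.scheduleAtFixedRate(guard(flaky, failures), 0, 10, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(5, TimeUnit.SECONDS) && failures.get() == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Without the `guard` wrapper, the first exception would end the schedule, and `survivesFailure()` would time out and return false.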



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976153
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    --- End diff --
    
    It's overridden by the bolt-level component configuration, so users can have two bolts with different intervals.
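The sketch below is a simplified model of that precedence. Each bolt's component configuration is layered over the topology configuration, so component-level keys win. The key string and the `WindowConfigResolver` helper are hypothetical, and this is not Storm's actual merge code path. The fallback of sliding by one tuple loosely follows the `initWindowManager` code quoted in this thread.

```java
import java.util.HashMap;
import java.util.Map;

final class WindowConfigResolver {
    // Assumed key string for illustration; the PR refers to Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT.
    static final String SLIDING_INTERVAL_COUNT = "topology.bolts.sliding.interval.count";

    private WindowConfigResolver() {
    }

    // Layers component-level settings over topology-level ones; the component wins on conflicts.
    static Map<String, Object> effectiveConf(Map<String, Object> topologyConf,
                                             Map<String, Object> componentConf) {
        Map<String, Object> merged = new HashMap<>(topologyConf);
        if (componentConf != null) {
            merged.putAll(componentConf);
        }
        return merged;
    }

    // Reads the sliding interval count, falling back to one tuple when unset
    // (a simplification: initWindowManager also checks the duration key first).
    static int slidingIntervalCount(Map<String, Object> conf) {
        Object value = conf.get(SLIDING_INTERVAL_COUNT);
        return value == null ? 1 : ((Number) value).intValue();
    }
}
```

With this layering, two bolts in the same topology can carry different interval settings while sharing a topology-wide fallback.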



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43933512
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.windowedOutputCollector = new WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.info("Initialized window manager {} ", this.windowManager);
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        windowManager.add(input);
    +    }
    +
    +    @Override
    +    public void cleanup() {
    +        windowManager.shutdown();
    +        bolt.cleanup();
    +    }
    +
    +    @Override
    +    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +        bolt.declareOutputFields(declarer);
    +    }
    +
    +    @Override
    +    public Map<String, Object> getComponentConfiguration() {
    +        return bolt.getComponentConfiguration();
    +    }
    +
    +    private WindowLifecycleListener<Tuple> newWindowLifecycleListener() {
    +        return new WindowLifecycleListener<Tuple>() {
    +            @Override
    +            public void onExpiry(List<Tuple> tuples) {
    +                for (Tuple tuple : tuples) {
    +                    windowedOutputCollector.ack(tuple);
    +                }
    +            }
    +
    +            @Override
    +            public void onActivation(List<Tuple> tuples, List<Tuple> newTuples, List<Tuple> expiredTuples) {
    +                windowedOutputCollector.setContext(tuples);
    +                bolt.execute(new TupleWindowImpl(tuples, newTuples, expiredTuples));
    +                newTuples.clear();
    --- End diff --
    
    Again, is there any reason why we are relying on mutation here to clear the state? Why can't the caller clear `newTuples` and `expiredTuples`, which are probably its internal state?
    
    Sorry for being anal about mutation; I am just not seeing a good reason to do it, and it will be hard for someone else to figure out where exactly the internal state variables are changing.
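The sketch below shows one way to address this. The manager owns its `added`/`expired` deltas and hands the listener read-only snapshots. It clears its own lists after the callback returns, so no caller-side mutation is needed. `WindowListener` and `SnapshotWindowManager` are hypothetical. They are count-based only and slide on every event. They are not the PR's `WindowLifecycleListener` or `WindowManager`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical listener: it only ever sees read-only snapshots.
interface WindowListener<T> {
    void onActivation(List<T> all, List<T> added, List<T> expired);
}

// Hypothetical count-based window that slides on every event and owns its delta lists.
final class SnapshotWindowManager<T> {
    private final int length;
    private final WindowListener<T> listener;
    private final List<T> window = new ArrayList<>();
    private final List<T> added = new ArrayList<>();
    private final List<T> expired = new ArrayList<>();

    SnapshotWindowManager(int length, WindowListener<T> listener) {
        this.length = length;
        this.listener = listener;
    }

    // Adds an event, evicts the oldest events beyond the window length, then notifies the listener.
    void add(T event) {
        window.add(event);
        added.add(event);
        while (window.size() > length) {
            expired.add(window.remove(0));
        }
        listener.onActivation(snapshot(window), snapshot(added), snapshot(expired));
        // The manager resets its own state; the listener never has to mutate anything.
        added.clear();
        expired.clear();
    }

    private static <E> List<E> snapshot(List<E> source) {
        return Collections.unmodifiableList(new ArrayList<>(source));
    }
}
```

The copies cost an allocation per activation. In exchange, the only code that changes the internal lists lives in one method of one class.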



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44017330
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/TupleWindow.java ---
    @@ -0,0 +1,26 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import backtype.storm.tuple.Tuple;
    +
    +/**
    + * A {@link Window} that contains {@link Tuple} objects.
    + */
    +public interface TupleWindow extends Window<Tuple> {
    --- End diff --
    
    I'm not sure this adds a lot; it is like Count. I'm not a huge fan of adding extra classes that don't do much, but this is also very minor, so if you want to keep it I am fine with it.



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44020085
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,224 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS) != null) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS) != null) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private int getMaxSpoutPending(Map stormConf) {
    +        int maxPending = Integer.MAX_VALUE;
    +        if (stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING) != null) {
    +            maxPending = ((Number) stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)).intValue();
    +        }
    +        return maxPending;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    private void ensureCountLessThanMaxPending(int count, int maxPending) {
    +        if (count > maxPending) {
    +            throw new IllegalArgumentException("Window count (length + sliding interval) value " + count +
    +                                                       " is more than " + Config.TOPOLOGY_MAX_SPOUT_PENDING +
    +                                                       " value " + maxPending);
    +        }
    +    }
    +
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        int maxSpoutPending = getMaxSpoutPending(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        } else if (slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(slidingIntervalDuration.value, topologyTimeout);
    +        }
    +
    +        if (windowLengthCount != null && slidingIntervalCount != null) {
    +            ensureCountLessThanMaxPending(windowLengthCount.value + slidingIntervalCount.value, maxSpoutPending);
    +        } else if (windowLengthCount != null) {
    +            ensureCountLessThanMaxPending(windowLengthCount.value, maxSpoutPending);
    +        } else if (slidingIntervalCount != null) {
    +            ensureCountLessThanMaxPending(slidingIntervalCount.value, maxSpoutPending);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    +            windowLengthCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)) {
    +            windowLengthDuration = new Duration(
    +                    ((Number) stormConf.get(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS)).intValue(),
    +                    TimeUnit.MILLISECONDS);
    +        }
    +
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)) {
    +            slidingIntervalCount = new Count(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT)).intValue());
    +        } else if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)) {
    +            slidingIntervalDuration = new Duration(((Number) stormConf.get(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS)).intValue(), TimeUnit.MILLISECONDS);
    +        } else {
    +            // default is a sliding window of count 1
    +            slidingIntervalCount = new Count(1);
    +        }
    +        // validate
    +        validate(stormConf, windowLengthCount, windowLengthDuration,
    +                 slidingIntervalCount, slidingIntervalDuration);
    +        if (windowLengthCount != null) {
    +            manager.setWindowLength(windowLengthCount);
    +        } else {
    +            manager.setWindowLength(windowLengthDuration);
    +        }
    +        if (slidingIntervalCount != null) {
    +            manager.setSlidingInterval(slidingIntervalCount);
    +        } else {
    +            manager.setSlidingInterval(slidingIntervalDuration);
    +        }
    +        return manager;
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +        this.windowedOutputCollector = new WindowedOutputCollector(collector);
    +        bolt.prepare(stormConf, context, windowedOutputCollector);
    +        this.listener = newWindowLifecycleListener();
    +        this.windowManager = initWindowManager(listener, stormConf);
    +        LOG.debug("Initialized window manager {} ", this.windowManager);
    +    }
    +
    +    @Override
    +    public void execute(Tuple input) {
    +        windowManager.add(input);
    --- End diff --
    
    WindowManager supports both `add(Tuple)` and `add(Tuple, timestamp)`. The second API is exercised by the tests, but it is never called outside them. Is it just for testing? If so, that is a real shame, because I know of many cases where we want to window on the data at event creation time, not at the time it arrived at the bolt. Additionally, a few places in the code refer to dealing with late data, but I don't see how anything can be "late" or out of order without access to that second method. If it is intended to be just a test API, please file a follow-on JIRA so that we can expose it to end users in some way and test with out-of-order data. If it was not intended to be a test method, please provide a way to set it, preferably through an API in IWindowedBolt.
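The sketch below makes the event-time point concrete. It is a minimal window whose membership is decided by the timestamp passed to `add(event, timestamp)`, not by arrival time. It assumes the newest timestamp seen acts as the watermark, with no allowed lag. It drops events that are already older than the window. `EventTimeWindow` is a hypothetical class, not the PR's `WindowManager`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical event-time window covering (watermark - lengthMs, watermark].
final class EventTimeWindow<T> {
    private static final class Entry<E> {
        final E event;
        final long timestampMs;

        Entry(E event, long timestampMs) {
            this.event = event;
            this.timestampMs = timestampMs;
        }
    }

    private final long lengthMs;
    private final List<Entry<T>> entries = new ArrayList<>();
    private long watermark = Long.MIN_VALUE; // newest event time seen so far
    private int lateCount;

    EventTimeWindow(long lengthMs) {
        this.lengthMs = lengthMs;
    }

    // Accepts out-of-order events that still fall inside the window; rejects (and counts) late ones.
    boolean add(T event, long timestampMs) {
        if (watermark != Long.MIN_VALUE && timestampMs <= watermark - lengthMs) {
            lateCount++;
            return false;
        }
        entries.add(new Entry<>(event, timestampMs));
        if (timestampMs > watermark) {
            watermark = timestampMs;
            evictExpired();
        }
        return true;
    }

    // Drops events whose timestamp is no longer inside the window.
    private void evictExpired() {
        Iterator<Entry<T>> it = entries.iterator();
        while (it.hasNext()) {
            if (it.next().timestampMs <= watermark - lengthMs) {
                it.remove();
            }
        }
    }

    // Current window contents in arrival order.
    List<T> events() {
        List<T> result = new ArrayList<>();
        for (Entry<T> e : entries) {
            result.add(e.event);
        }
        return result;
    }

    int lateCount() {
        return lateCount;
    }
}
```

With a 1000 ms window, consider an event stamped 1200 ms that arrives after one stamped 1500 ms. It is still counted. An event stamped 1050 ms that arrives after the watermark has reached 2100 ms is rejected as late.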



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43976038
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/CountTriggerPolicy.java ---
    @@ -0,0 +1,62 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()}
    + * when the count threshold is hit.
    + *
    + * @param <T> the type of event tracked by this policy.
    + */
    +public class CountTriggerPolicy<T> implements TriggerPolicy<T> {
    +    private final int count;
    +    private AtomicInteger currentCount;
    +    private TriggerHandler handler;
    --- End diff --
    
    Thanks, I missed it; I will make it final.
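For reference, the sketch below is a count trigger with every field final. The threshold, the `AtomicInteger` holder and the handler are fixed at construction, and only the counter's value changes. `TriggerHandler` mirrors the `onTrigger()` callback named in the diff. `CountTrigger` and `track()` are hypothetical names. The check-then-reset assumes that `track()` is called from a single thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Callback modeled on TriggerHandler#onTrigger() from the diff.
interface TriggerHandler {
    void onTrigger();
}

// Count trigger whose fields are all final; only the counter's value changes.
final class CountTrigger {
    private final int count;
    private final AtomicInteger currentCount = new AtomicInteger();
    private final TriggerHandler handler;

    CountTrigger(int count, TriggerHandler handler) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive: " + count);
        }
        this.count = count;
        this.handler = handler;
    }

    // Records one event; fires the handler and resets the counter every `count` events.
    // Assumes a single calling thread, so the check-then-reset below is not contended.
    void track() {
        if (currentCount.incrementAndGet() >= count) {
            currentCount.set(0);
            handler.onTrigger();
        }
    }
}
```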



[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43918453
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/WindowedBoltExecutor.java ---
    @@ -0,0 +1,200 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.IOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.windowing.TupleWindowImpl;
    +import backtype.storm.windowing.WindowLifecycleListener;
    +import backtype.storm.windowing.WindowManager;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * An {@link IWindowedBolt} wrapper that does the windowing of tuples.
    + */
    +public class WindowedBoltExecutor implements IRichBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowedBoltExecutor.class);
    +
    +    private IWindowedBolt bolt;
    +    private transient WindowedOutputCollector windowedOutputCollector;
    +    private transient WindowLifecycleListener<Tuple> listener;
    +    private transient WindowManager<Tuple> windowManager;
    +
    +    public WindowedBoltExecutor(IWindowedBolt bolt) {
    +        this.bolt = bolt;
    +    }
    +
    +    private int getTopologyTimeoutMillis(Map stormConf) {
    +        if (stormConf.containsKey(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS)) {
    +            boolean timeOutsEnabled = (boolean) stormConf.get(Config.TOPOLOGY_ENABLE_MESSAGE_TIMEOUTS);
    +            if (!timeOutsEnabled) {
    +                return Integer.MAX_VALUE;
    +            }
    +        }
    +        int timeout = 0;
    +        if (stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
    +            timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    +        }
    +        return timeout * 1000;
    +    }
    +
    +    private void ensureDurationLessThanTimeout(int duration, int timeout) {
    +        if (duration > timeout) {
    +            throw new IllegalArgumentException("Window duration (length + sliding interval) value " + duration +
    +                                                       " is more than " + Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS +
    +                                                       " value " + timeout);
    +        }
    +    }
    +
    +    // TODO: add more validation
    +    private void validate(Map stormConf, Count windowLengthCount, Duration windowLengthDuration,
    +                          Count slidingIntervalCount, Duration slidingIntervalDuration) {
    +
    +        int topologyTimeout = getTopologyTimeoutMillis(stormConf);
    +        if (windowLengthDuration != null && slidingIntervalDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value + slidingIntervalDuration.value, topologyTimeout);
    +        } else if (windowLengthDuration != null) {
    +            ensureDurationLessThanTimeout(windowLengthDuration.value, topologyTimeout);
    +        }
    +    }
    +
    +    private WindowManager<Tuple> initWindowManager(WindowLifecycleListener<Tuple> lifecycleListener, Map stormConf) {
    +        WindowManager<Tuple> manager = new WindowManager<>(lifecycleListener);
    +        Duration windowLengthDuration = null;
    +        Count windowLengthCount = null;
    +        Duration slidingIntervalDuration = null;
    +        Count slidingIntervalCount = null;
    +        if (stormConf.containsKey(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT)) {
    --- End diff --
    
    Given that we are getting these from the storm config, does this mean a user cannot have two bolts with different windowing intervals?
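    
    One way to allow two bolts with different windows is to resolve the window settings per component. Storm's `IComponent.getComponentConfiguration()` already lets a component declare configuration that overrides a subset of the topology-level `topology.*` settings for that component only, so the window map kept by `BaseWindowedBolt` could presumably be surfaced that way. The sketch below models the idea with plain maps: each bolt's own entries are layered over the topology-wide ones. It then applies the same check as the PR's `validate()`, which requires the window length plus sliding interval to fit inside the message timeout. The class name and the two window key strings are made up for this illustration; only `topology.message.timeout.secs` is a real Storm key.
    
    ```java
    import java.util.HashMap;
    import java.util.Map;
    
    /** Hypothetical sketch: per-bolt window settings layered over topology-wide config. */
    class PerBoltWindowConfig {
        // Illustrative key names (assumed); the PR uses Config constants instead.
        static final String WINDOW_LENGTH_MS = "example.window.length.duration.ms";
        static final String SLIDING_INTERVAL_MS = "example.sliding.interval.duration.ms";
        static final String MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
    
        /** Component-level entries win over topology-level ones. */
        static Map<String, Object> resolve(Map<String, Object> topologyConf,
                                           Map<String, Object> componentConf) {
            Map<String, Object> merged = new HashMap<>(topologyConf);
            merged.putAll(componentConf);
            return merged;
        }
    
        /** Same idea as the PR's validate(): the window must fit inside the message timeout. */
        static void validate(Map<String, Object> conf) {
            int timeoutMs = ((Number) conf.get(MESSAGE_TIMEOUT_SECS)).intValue() * 1000;
            int lengthMs = ((Number) conf.get(WINDOW_LENGTH_MS)).intValue();
            int slideMs = ((Number) conf.getOrDefault(SLIDING_INTERVAL_MS, 0)).intValue();
            if (lengthMs + slideMs > timeoutMs) {
                throw new IllegalArgumentException("Window length + sliding interval ("
                        + (lengthMs + slideMs) + " ms) exceeds message timeout (" + timeoutMs + " ms)");
            }
        }
    
        public static void main(String[] args) {
            Map<String, Object> topologyConf = new HashMap<>();
            topologyConf.put(MESSAGE_TIMEOUT_SECS, 30);
    
            Map<String, Object> boltA = new HashMap<>();
            boltA.put(WINDOW_LENGTH_MS, 10_000);
            boltA.put(SLIDING_INTERVAL_MS, 2_000);
    
            Map<String, Object> boltB = new HashMap<>();
            boltB.put(WINDOW_LENGTH_MS, 20_000);
            boltB.put(SLIDING_INTERVAL_MS, 5_000);
    
            Map<String, Object> a = resolve(topologyConf, boltA);
            Map<String, Object> b = resolve(topologyConf, boltB);
            validate(a);
            validate(b);
            // Each bolt keeps its own window length under the same topology config.
            System.out.println(a.get(WINDOW_LENGTH_MS) + " " + b.get(WINDOW_LENGTH_MS)); // prints "10000 20000"
        }
    }
    ```
    
    The design choice here is that the topology config only supplies defaults such as the timeout, while anything window-specific travels with the component.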


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153820936
  
    The high-level API looks good to me overall. It maintains a view of all the events in the window, which is a powerful concept.
    
    I have several questions on how this PR can help implement two common use cases.
    
    (1) Aggregation (e.g., min / max) over a sliding window
    (2) Stream joins over a large amount of data
    
    Keeping every event of the window as an in-memory view is insufficient for both cases: (1) an efficient algorithm for min/max aggregation does not need every single event in the window, and (2) for large joins the events in the window need to be spilled to secondary storage. To me it seems that these cases would still require writing a lot of custom code. The issue might be mitigated by making the API flexible about whether and where the events in the window are kept.
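    
    To illustrate use case (1): a count-based sliding maximum can be maintained with a monotonic deque that keeps only the elements that could still become the maximum. Each update then costs amortized O(1) work, with no rescan of the window. This is a standard technique shown as a sketch, not part of the PR, and `SlidingWindowMax` is a hypothetical name. The state is not always small, though. For a strictly decreasing stream the deque still holds `windowLength` entries. The gain is in per-update work, not guaranteed memory.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;
    
    /** Sliding-window maximum over the last windowLength values (count-based window). */
    class SlidingWindowMax {
        private final int windowLength;
        // Each entry is {sequenceNumber, value}; values strictly decrease from head to tail.
        private final Deque<long[]> candidates = new ArrayDeque<>();
        private long seq = 0;
    
        SlidingWindowMax(int windowLength) {
            if (windowLength <= 0) {
                throw new IllegalArgumentException("windowLength must be > 0");
            }
            this.windowLength = windowLength;
        }
    
        /** Adds a value and returns the maximum of the current window. */
        long add(long value) {
            // Values <= the new one can never be the maximum again, so drop them.
            while (!candidates.isEmpty() && candidates.peekLast()[1] <= value) {
                candidates.pollLast();
            }
            candidates.addLast(new long[] {seq, value});
            // Expire entries whose sequence number has fallen out of the window.
            while (candidates.peekFirst()[0] <= seq - windowLength) {
                candidates.pollFirst();
            }
            seq++;
            return candidates.peekFirst()[1];
        }
    }
    ```
    
    For example, with a window of 3, the input `1, 3, -1, -3, 5, 3, 6, 7` yields the maxima `1, 3, 3, 3, 5, 5, 6, 7`.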



---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43893450
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    --- End diff --
    
    Minor nit: creating a new `Random` on every call is not really good practice.
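    
    A minimal sketch of the fix, assuming the spout simply holds one `java.util.Random` created once (in the spout this would naturally happen in `open()`) and reuses it on every `nextTuple()` call. `RandomIntegerSource` is a stdlib-only stand-in for the example's value generation, not the spout itself. The optional seed is an addition for reproducible tests.
    
    ```java
    import java.util.Random;
    
    /** Stand-in for RandomIntegerSpout's value generation: one Random, created once and reused. */
    class RandomIntegerSource {
        private final Random rand;
    
        RandomIntegerSource() {
            this(new Random());
        }
    
        /** Seeded variant, useful for deterministic tests. */
        RandomIntegerSource(long seed) {
            this(new Random(seed));
        }
    
        private RandomIntegerSource(Random rand) {
            this.rand = rand;
        }
    
        /** The value nextTuple() would emit: an int in [0, 1000). */
        int nextValue() {
            return rand.nextInt(1000);
        }
    }
    ```
    
    Since a spout's `nextTuple()` is normally driven by a single executor thread, a plain `Random` field should suffice. `ThreadLocalRandom.current().nextInt(1000)` is another common option that avoids holding the field at all.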


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43918998
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,184 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    +        public final int value;
    +
    +        public Duration(int value, TimeUnit timeUnit) {
    +            this.value = (int) timeUnit.toMillis(value);
    +        }
    +    }
    +
    +    protected BaseWindowedBolt() {
    +        windowConfiguration = new HashMap<>();
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withWindowLength(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Count count) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_COUNT, count.value);
    +        return this;
    +    }
    +
    +    private BaseWindowedBolt withSlidingInterval(Duration duration) {
    +        windowConfiguration.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, duration.value);
    +        return this;
    +    }
    +
    +    /**
    +     * Tuple count based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Tuple count and time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the number of tuples in the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration and count based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the number of tuples after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Count slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * Time duration based sliding window configuration.
    +     *
    +     * @param windowLength    the time duration of the window
    +     * @param slidingInterval the time duration after which the window slides
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength, Duration slidingInterval) {
    +        return withWindowLength(windowLength).withSlidingInterval(slidingInterval);
    +    }
    +
    +    /**
    +     * A tuple count based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the number of tuples in the window
    +     */
    +    public BaseWindowedBolt withWindow(Count windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +
    +    /**
    +     * A time duration based window that slides with every incoming tuple.
    +     *
    +     * @param windowLength the time duration of the window
    +     */
    +    public BaseWindowedBolt withWindow(Duration windowLength) {
    +        return withWindowLength(windowLength).withSlidingInterval(new Count(1));
    +    }
    +
    +    /**
    +     * A count based tumbling window.
    +     *
    +     * @param count the number of tuples after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Count count) {
    +        return withWindowLength(count).withSlidingInterval(count);
    +    }
    +
    +    /**
    +     * A time duration based tumbling window.
    +     *
    +     * @param duration the time duration after which the window tumbles
    +     */
    +    public BaseWindowedBolt withTumblingWindow(Duration duration) {
    +        return withWindowLength(duration).withSlidingInterval(duration);
    +    }
    +
    +    @Override
    +    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +    }
    +
    +    @Override
    +    public void execute(TupleWindow inputWindow) {
    +
    --- End diff --
    
    +1.
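    
    To make the `withWindow(Count, Count)` and `withTumblingWindow(Count)` semantics above concrete: a count-based sliding window of length N that slides every S tuples fires after every S-th tuple and covers the most recent N tuples. A tumbling window is the special case S = N, so consecutive windows do not overlap. The simulation below is a hypothetical stdlib-only illustration of those semantics, not the PR's `WindowManager`. One choice in it is an assumption: a window may fire before it has filled up.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    
    /** Simulates count-based windows: length windowLength, firing every slidingInterval tuples. */
    class CountWindowSimulator {
        static List<List<Integer>> windows(int[] stream, int windowLength, int slidingInterval) {
            List<List<Integer>> fired = new ArrayList<>();
            Deque<Integer> window = new ArrayDeque<>();
            int sinceLastTrigger = 0;
            for (int value : stream) {
                window.addLast(value);
                // Count eviction: keep only the most recent windowLength tuples.
                if (window.size() > windowLength) {
                    window.pollFirst();
                }
                // Count trigger: fire after every slidingInterval tuples.
                if (++sinceLastTrigger == slidingInterval) {
                    fired.add(new ArrayList<>(window));
                    sinceLastTrigger = 0;
                }
            }
            return fired;
        }
    }
    ```
    
    With the stream `1..6`, a sliding window of length 3 and interval 2 fires `[1, 2]`, `[2, 3, 4]`, `[4, 5, 6]`. A tumbling window of 3 fires `[1, 2, 3]`, `[4, 5, 6]`.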


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43894593
  
    --- Diff: examples/storm-starter/src/jvm/storm/starter/SlidingWindowTopology.java ---
    @@ -0,0 +1,129 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package storm.starter;
    +
    +import backtype.storm.Config;
    +import backtype.storm.LocalCluster;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.spout.SpoutOutputCollector;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.topology.TopologyBuilder;
    +import backtype.storm.topology.base.BaseRichSpout;
    +import backtype.storm.topology.base.BaseWindowedBolt;
    +import backtype.storm.tuple.Fields;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import backtype.storm.utils.Utils;
    +import backtype.storm.windowing.TupleWindow;
    +import storm.starter.bolt.PrinterBolt;
    +
    +import java.util.Map;
    +import java.util.Random;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * A sample topology that demonstrates the usage of {@link backtype.storm.topology.IWindowedBolt}
    + * to calculate sliding window sum.
    + */
    +public class SlidingWindowTopology {
    +
    +    /*
    +     * emits random integers every 100 ms
    +     */
    +    private static class RandomIntegerSpout extends BaseRichSpout {
    +        SpoutOutputCollector collector;
    +
    +        @Override
    +        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    +            declarer.declare(new Fields("value"));
    +        }
    +
    +        @Override
    +        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void nextTuple() {
    +            Utils.sleep(100);
    +            Random rand = new Random();
    +            Integer value = rand.nextInt(1000);
    +            collector.emit(new Values(value));
    +        }
    +    }
    +
    +    /*
    +     * Computes sliding window sum
    +     */
    +    private static class SlidingWindowSumBolt extends BaseWindowedBolt {
    +        private int sum = 0;
    +        private OutputCollector collector;
    +
    +        @Override
    +        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    +            this.collector = collector;
    +        }
    +
    +        @Override
    +        public void execute(TupleWindow inputWindow) {
    +            System.out.println("Events in current window: " + inputWindow.get().size());
    --- End diff --
    
    Remove the sysout (`System.out.println`).
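    
    The `TupleWindow` described in the PR exposes three views: the tuples currently in the window, the tuples added since the last activation, and the tuples that expired. A sliding sum therefore need not be recomputed over the whole window on every activation. It can be adjusted incrementally instead. The sketch below uses plain `List<Integer>` arguments to stand in for the "new" and "expired" views. `IncrementalWindowSum` and `onActivation` are hypothetical names, not the example bolt's code.
    
    ```java
    import java.util.List;
    
    /** Maintains a sliding-window sum from per-activation deltas. */
    class IncrementalWindowSum {
        private long sum = 0;
    
        /**
         * @param newValues     values that entered the window since the last activation
         * @param expiredValues values that left the window since the last activation
         * @return the sum of the values currently in the window
         */
        long onActivation(List<Integer> newValues, List<Integer> expiredValues) {
            for (int v : newValues) {
                sum += v;
            }
            for (int v : expiredValues) {
                sum -= v;
            }
            return sum;
        }
    }
    ```
    
    Each activation costs time proportional to the number of new and expired tuples rather than to the window size, which matters for long windows that slide frequently.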


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44019125
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/TimeTriggerPolicy.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * Invokes {@link TriggerHandler#onTrigger()} after the duration.
    + */
    +public class TimeTriggerPolicy<T> implements TriggerPolicy<T> {
    +    private long duration;
    +    private final TriggerHandler handler;
    +    private final ScheduledExecutorService executor;
    +
    +    public TimeTriggerPolicy(long millis, TriggerHandler handler) {
    +        this.duration = millis;
    +        this.handler = handler;
    +        this.executor = Executors.newSingleThreadScheduledExecutor();
    +        start();
    +    }
    +
    +    @Override
    +    public void track(Event<T> event) {
    +        // NOOP
    +    }
    +
    +    @Override
    +    public void reset() {
    +        // NOOP
    +    }
    +
    +    @Override
    +    public void shutdown() {
    +        executor.shutdown();
    +        try {
    +            if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
    +                executor.shutdownNow();
    +            }
    +        } catch (InterruptedException ie) {
    +            executor.shutdownNow();
    +            Thread.currentThread().interrupt();
    +        }
    +    }
    +
    +    private void start() {
    +        Runnable task = new Runnable() {
    +            @Override
    +            public void run() {
    +                handler.onTrigger();
    --- End diff --
    
    I think we need some sort of error handling here that can pass any exceptions from the IWindowedBolt back to the WindowedBoltExecutor, so we can exit the worker and fail fast on unhandled exceptions. As it stands now, the exception is just eaten.
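    
    Background on why the exception disappears: the diff is cut off before the scheduling call, but suppose the task is scheduled with `ScheduledExecutorService.scheduleAtFixedRate`. Then an exception thrown by the task suppresses all later runs and is visible only through the returned `ScheduledFuture`. If nothing checks that future, the exception is effectively swallowed. One possible approach is sketched below. It assumes the bolt executor calls a check method on its own thread, for example at the start of each `execute`. The timer thread records the first failure, and the check rethrows it on the executor thread. `FailFastTimer` and `checkFailure()` are hypothetical names, not the PR's API.
    
    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;
    
    /** Runs a periodic trigger and surfaces its first failure to the owning thread. */
    class FailFastTimer {
        private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        private final AtomicReference<Throwable> failure = new AtomicReference<>();
    
        FailFastTimer(Runnable onTrigger, long periodMillis) {
            Runnable task = () -> {
                try {
                    onTrigger.run();
                } catch (Throwable t) {
                    // Remember the first failure and stop scheduling further triggers.
                    failure.compareAndSet(null, t);
                    executor.shutdown();
                }
            };
            executor.scheduleAtFixedRate(task, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }
    
        /** Called from the bolt's executor thread; rethrows a failure from the timer thread. */
        void checkFailure() {
            Throwable t = failure.get();
            if (t != null) {
                throw new RuntimeException("Window trigger failed", t);
            }
        }
    
        void shutdown() {
            executor.shutdownNow();
        }
    }
    ```
    
    Rethrowing on the executor thread means the failure takes the same path as any other exception thrown from a bolt's `execute`. That path is what lets the worker fail fast instead of silently running without window triggers.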


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-153772438
  
    On the surface this looks really good. I have a few minor nits with the code, but I have only done a quick pass through it and would like to dig in more before giving it a formal +1.


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44106475
  
    --- Diff: storm-core/src/jvm/backtype/storm/topology/base/BaseWindowedBolt.java ---
    @@ -0,0 +1,180 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.topology.base;
    +
    +import backtype.storm.Config;
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.IWindowedBolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.windowing.TupleWindow;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class BaseWindowedBolt implements IWindowedBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(BaseWindowedBolt.class);
    +
    +    private final transient Map<String, Object> windowConfiguration;
    +
    +    /**
    +     * Holds a count value for count based windows and sliding intervals.
    +     */
    +    public static class Count {
    +        public final int value;
    +
    +        public Count(int value) {
    +            this.value = value;
    +        }
    +    }
    +
    +    /**
    +     * Holds a Time duration for time based windows and sliding intervals.
    +     */
    +    public static class Duration {
    --- End diff --
    
    In addition to allowing the overloaded methods, it prevents accidentally passing values in the wrong order, which could easily happen if they were just declared as ints, so I think it's better to keep it.
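    
    A small sketch of the point being made. With distinct `Count` and `Duration` types, `withWindow(Count, Duration)` and `withWindow(Duration, Count)` can coexist as overloads. Passing a `Duration` where a `Count` is expected then becomes a compile-time type error, not a silent misconfiguration. If both parameters were plain `int`s, the two methods would share the signature `withWindow(int, int)` and could not both exist. `WindowSpec` is a simplified, hypothetical stand-in for `BaseWindowedBolt`. It returns a description string instead of configuring a bolt.
    
    ```java
    import java.util.concurrent.TimeUnit;
    
    /** Simplified stand-in for BaseWindowedBolt's typed window parameters. */
    class WindowSpec {
        static final class Count {
            final int value;
    
            Count(int value) {
                this.value = value;
            }
        }
    
        static final class Duration {
            final int value; // milliseconds, as in the PR's Duration
    
            Duration(int value, TimeUnit unit) {
                this.value = (int) unit.toMillis(value);
            }
        }
    
        static String withWindow(Count length, Duration slidingInterval) {
            return "last " + length.value + " tuples, every " + slidingInterval.value + " ms";
        }
    
        // With plain ints, this and the method above would both be withWindow(int, int).
        static String withWindow(Duration length, Count slidingInterval) {
            return "last " + length.value + " ms, every " + slidingInterval.value + " tuples";
        }
    }
    ```
    
    For example, `withWindow(new Count(10), new Duration(5, TimeUnit.SECONDS))` describes "last 10 tuples, every 5000 ms". Swapping the two arguments still compiles, but only because it selects the other overload, whose meaning matches the argument types.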


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by arunmahadevan <gi...@git.apache.org>.
Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44011007
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/Event.java ---
    @@ -0,0 +1,41 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +/**
    + * An event is a wrapper object that gets stored in the window.
    + *
    + * @param <T> the type of the object that's wrapped, e.g. Tuple
    + */
    +interface Event<T> {
    +    /**
    +     * The event timestamp in millis. This could be the time
    +     * when the source generated the tuple or if not the time
    +     * when the tuple was received by a bolt.
    +     *
    +     * @return
    +     */
    +    long getTs();
    --- End diff --
    
    I used `getTs` since it's short and easy to type, but I agree `getTimestamp` is easier to read, so I will rename it.
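    
    For illustration, the renamed accessor could look like the sketch below. The sketch assumes the interface also exposes the wrapped object through a `get()` method, which is not visible in the quoted hunk. Alongside the interface is the kind of check a time-based eviction policy might make with the timestamp: an event is evicted once it is older than the window length relative to a reference time. `SimpleEvent`, `TimeEviction` and `isExpired` are hypothetical names. The eviction rule is an assumption and is not taken from the PR's `TimeEvictionPolicy`.
    
    ```java
    /** The Event interface from the diff, with getTs() renamed to getTimestamp(). */
    interface Event<T> {
        /** The wrapped object, e.g. a Tuple (assumed accessor). */
        T get();
    
        /** Event timestamp in milliseconds. */
        long getTimestamp();
    }
    
    /** Minimal hypothetical implementation. */
    final class SimpleEvent<T> implements Event<T> {
        private final T event;
        private final long timestamp;
    
        SimpleEvent(T event, long timestamp) {
            this.event = event;
            this.timestamp = timestamp;
        }
    
        @Override
        public T get() {
            return event;
        }
    
        @Override
        public long getTimestamp() {
            return timestamp;
        }
    }
    
    final class TimeEviction {
        /** True if the event is older than windowLengthMillis relative to nowMillis. */
        static boolean isExpired(Event<?> event, long nowMillis, long windowLengthMillis) {
            return nowMillis - event.getTimestamp() > windowLengthMillis;
        }
    }
    ```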


---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r43940643
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    @@ -0,0 +1,210 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
    + * on expiry of events or activation of the window due to {@link TriggerPolicy}.
    + *
    + * @param <T> the type of event in the window.
    + */
    +public class WindowManager<T> implements TriggerHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
    +
    +    /**
    +     * Expire old events every EXPIRE_EVENTS_THRESHOLD to
    +     * keep the window size in check.
    +     */
    +    public static final int EXPIRE_EVENTS_THRESHOLD = 100;
    +
    +    private WindowLifecycleListener<T> windowLifecycleListener;
    +    private ConcurrentLinkedQueue<Event<T>> window;
    +    private EvictionPolicy<T> evictionPolicy;
    +    private TriggerPolicy<T> triggerPolicy;
    +    private List<T> expiredEvents;
    +    private Set<Event<T>> prevWindowEvents;
    +    private AtomicInteger eventsSinceLastExpiry;
    +    private ReentrantLock lock;
    +
    +    public WindowManager(WindowLifecycleListener<T> lifecycleListener) {
    +        windowLifecycleListener = lifecycleListener;
    +        window = new ConcurrentLinkedQueue<>();
    +        expiredEvents = new ArrayList<>();
    +        prevWindowEvents = new HashSet<>();
    +        eventsSinceLastExpiry = new AtomicInteger();
    +        lock = new ReentrantLock(true);
    +    }
    +
    +    public void setWindowLength(Count count) {
    +        this.evictionPolicy = new CountEvictionPolicy<>(count.value);
    +    }
    +
    +    public void setWindowLength(Duration duration) {
    +        this.evictionPolicy = new TimeEvictionPolicy<>(duration.value);
    +    }
    +
    +    public void setSlidingInterval(Count count) {
    +        this.triggerPolicy = new CountTriggerPolicy<>(count.value, this);
    +    }
    +
    +    public void setSlidingInterval(Duration duration) {
    +        this.triggerPolicy = new TimeTriggerPolicy<>(duration.value, this);
    +    }
    +
    +    /**
    +     * Add an event into the window, with {@link System#currentTimeMillis()} as
    +     * the tracking ts.
    +     *
    +     * @param event the event to add
    +     */
    +    public void add(T event) {
    +        add(event, System.currentTimeMillis());
    +    }
    +
    +    /**
    +     * Add an event into the window, with the given ts as the tracking ts.
    +     *
    +     * @param event the event to track
    +     * @param ts    the timestamp
    +     */
    +    public void add(T event, long ts) {
    +        Event<T> windowEvent = new EventImpl<T>(event, ts);
    +        window.add(windowEvent);
    +        track(windowEvent);
    +        compactWindow();
    --- End diff --
    
    Not sure why we need this. Why not just rely on onTrigger to expire events?
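
    One plausible reason for compacting inside add(): with a time-based sliding interval and a high event rate, many events can arrive between two triggers, and if expiry happened only in onTrigger the queue would hold all of them until the next trigger fires. The sketch below is a hypothetical, single-threaded illustration, not the PR's compactWindow(); the class name `CompactingWindow` and its count-based `maxEvents` bound are assumptions. It expires old events every EXPIRE_EVENTS_THRESHOLD additions so memory stays bounded between triggers.

    ```java
    import java.util.ArrayDeque;
    import java.util.Deque;

    // Hypothetical, single-threaded sketch: a window bounded by event count that
    // expires old events every EXPIRE_EVENTS_THRESHOLD additions, independently
    // of when the next trigger fires.
    class CompactingWindow<T> {
        static final int EXPIRE_EVENTS_THRESHOLD = 100;

        private final Deque<T> window = new ArrayDeque<>();
        private final int maxEvents; // assumed count-based eviction bound
        private int eventsSinceLastExpiry = 0;

        CompactingWindow(int maxEvents) {
            this.maxEvents = maxEvents;
        }

        void add(T event) {
            window.addLast(event);
            // Compact every EXPIRE_EVENTS_THRESHOLD additions so the queue
            // cannot grow without bound between two triggers.
            if (++eventsSinceLastExpiry >= EXPIRE_EVENTS_THRESHOLD) {
                expire();
            }
        }

        // Drops the oldest events beyond maxEvents and returns how many were
        // dropped (in the PR, expired events go to the lifecycle listener).
        int expire() {
            int expired = 0;
            while (window.size() > maxEvents) {
                window.removeFirst();
                expired++;
            }
            eventsSinceLastExpiry = 0;
            return expired;
        }

        int size() {
            return window.size();
        }
    }
    ```

    With this scheme the window never holds more than maxEvents + EXPIRE_EVENTS_THRESHOLD - 1 events after add() returns, regardless of how long the trigger interval is.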
    
    Also, don't we need locking here as well, to ensure that no new events can be added while onTrigger is firing? Otherwise the events in the window can change during the trigger. Plus, I think it will give us undefined behavior: at line 187 we are using an iterator to remove an element from the window list while this add method can add an element to it.
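
    A minimal sketch of the locking the comment asks about, assuming add() and onTrigger() both take the same fair ReentrantLock (the diff's constructor already creates one with `new ReentrantLock(true)`). `LockedWindow` and the `isExpired` predicate are hypothetical stand-ins for WindowManager and its EvictionPolicy. Note that ConcurrentLinkedQueue iterators are weakly consistent: they never throw ConcurrentModificationException, so the unlocked version would not crash, but an event added during a trigger may or may not be seen by it. Holding the lock in both methods gives the trigger a stable view of the window.

    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Predicate;

    // Hypothetical sketch: add() and onTrigger() share one lock, so a trigger
    // works on a window that cannot change underneath it.
    class LockedWindow<T> {
        private final ConcurrentLinkedQueue<T> window = new ConcurrentLinkedQueue<>();
        private final ReentrantLock lock = new ReentrantLock(true);
        private final Predicate<T> isExpired; // assumed stand-in for an eviction policy

        LockedWindow(Predicate<T> isExpired) {
            this.isExpired = isExpired;
        }

        void add(T event) {
            lock.lock();
            try {
                window.add(event);
            } finally {
                lock.unlock();
            }
        }

        // Removes expired events, then returns a snapshot of what remains.
        List<T> onTrigger() {
            lock.lock();
            try {
                Iterator<T> it = window.iterator();
                while (it.hasNext()) {
                    if (isExpired.test(it.next())) {
                        it.remove();
                    }
                }
                return new ArrayList<>(window);
            } finally {
                lock.unlock();
            }
        }
    }
    ```

    Once every access goes through the lock, the concurrent queue is no longer strictly required; a plain ArrayDeque guarded by the same lock would behave the same. The trade-off is that add() blocks while a trigger is running.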


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-154419275
  
    @arunmahadevan I am OK with adding support for other features incrementally. I am a bit concerned that the API will have to change fundamentally to support these types of processing, especially at scale, but we can cross that bridge when we come to it; if we need a different API, we can add it and deprecate the old one.
    
    I am +1, but I would like to hear from @harshach before merging this in.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/855#issuecomment-156320289
  
    +1. Sorry for the delay.


[GitHub] storm pull request: STORM-1167: Add windowing support for storm co...

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/855#discussion_r44017656
  
    --- Diff: storm-core/src/jvm/backtype/storm/windowing/WindowManager.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package backtype.storm.windowing;
    +
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentLinkedQueue;
    +import java.util.concurrent.atomic.AtomicInteger;
    +import java.util.concurrent.locks.ReentrantLock;
    +
    +import static backtype.storm.topology.base.BaseWindowedBolt.Count;
    +import static backtype.storm.topology.base.BaseWindowedBolt.Duration;
    +
    +/**
    + * Tracks a window of events and fires {@link WindowLifecycleListener} callbacks
    + * on expiry of events or activation of the window due to {@link TriggerPolicy}.
    + *
    + * @param <T> the type of event in the window.
    + */
    +public class WindowManager<T> implements TriggerHandler {
    +    private static final Logger LOG = LoggerFactory.getLogger(WindowManager.class);
    +
    +    /**
    +     * Expire old events every EXPIRE_EVENTS_THRESHOLD events to
    +     * keep the window size in check.
    +     */
    +    public static final int EXPIRE_EVENTS_THRESHOLD = 100;
    +
    +    private WindowLifecycleListener<T> windowLifecycleListener;
    +    private ConcurrentLinkedQueue<Event<T>> window;
    +    private EvictionPolicy<T> evictionPolicy;
    +    private TriggerPolicy<T> triggerPolicy;
    +    private List<T> expiredEvents;
    +    private Set<Event<T>> prevWindowEvents;
    +    private AtomicInteger eventsSinceLastExpiry;
    +    private ReentrantLock lock;
    --- End diff --
    
    A number of these look like they could be marked as final.
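
    A sketch of what the suggestion amounts to, based on the code quoted above: the constructor assigns windowLifecycleListener, window, expiredEvents, prevWindowEvents, eventsSinceLastExpiry and lock exactly once, so they can be final. evictionPolicy and triggerPolicy are assigned by setWindowLength() and setSlidingInterval() after construction, so they stay non-final unless they move into the constructor. The class name `FinalFieldsSketch`, its setter names, and the Consumer/Predicate/Runnable stand-in types are hypothetical, used only to keep the example self-contained.

    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.Consumer;
    import java.util.function.Predicate;

    // Hypothetical sketch: fields assigned exactly once at construction are final;
    // fields reassigned by setters after construction cannot be.
    class FinalFieldsSketch<T> {
        // Consumer stands in for WindowLifecycleListener<T> (assumption).
        private final Consumer<List<T>> windowLifecycleListener;
        private final ConcurrentLinkedQueue<T> window = new ConcurrentLinkedQueue<>();
        private final List<T> expiredEvents = new ArrayList<>();
        private final Set<T> prevWindowEvents = new HashSet<>();
        private final AtomicInteger eventsSinceLastExpiry = new AtomicInteger();
        private final ReentrantLock lock = new ReentrantLock(true);

        // Set after construction, so these must stay non-final as written.
        private Predicate<T> evictionPolicy; // stand-in for EvictionPolicy<T>
        private Runnable triggerPolicy;      // stand-in for TriggerPolicy<T>

        FinalFieldsSketch(Consumer<List<T>> lifecycleListener) {
            this.windowLifecycleListener = lifecycleListener;
        }

        void setEvictionPolicy(Predicate<T> evictionPolicy) {
            this.evictionPolicy = evictionPolicy;
        }

        void setTriggerPolicy(Runnable triggerPolicy) {
            this.triggerPolicy = triggerPolicy;
        }
    }
    ```

    Making the policies final as well would mean passing them into the constructor instead of configuring them through setters.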
