You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:26 UTC

[25/32] incubator-ignite git commit: # Renaming

# Renaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3299c52b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3299c52b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3299c52b

Branch: refs/heads/master
Commit: 3299c52b2f679b745f6a05776bfe73fb45b70d84
Parents: 42704cb
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:51:25 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:51:25 2014 +0300

----------------------------------------------------------------------
 examples/config/example-streamer.xml            |  12 +-
 .../apache/ignite/streamer/StreamerWindow.java  |  12 +-
 .../window/StreamerBoundedSizeBatchWindow.java  | 795 ++++++++++++++++
 .../window/StreamerBoundedSizeSortedWindow.java | 210 +++++
 .../window/StreamerBoundedSizeWindow.java       | 136 +++
 .../StreamerBoundedSizeWindowAdapter.java       | 349 ++++++++
 .../window/StreamerBoundedTimeBatchWindow.java  | 897 +++++++++++++++++++
 .../window/StreamerBoundedTimeWindow.java       | 454 ++++++++++
 .../window/StreamerUnboundedWindow.java         | 103 +++
 .../streamer/window/StreamerWindowAdapter.java  | 529 +++++++++++
 .../apache/ignite/streamer/window/package.html  |  14 +
 .../streamer/GridStreamProcessor.java           |   2 +-
 .../streamer/index/StreamerIndexProvider.java   |   2 +-
 .../window/StreamerBoundedSizeBatchWindow.java  | 795 ----------------
 .../window/StreamerBoundedSizeSortedWindow.java | 210 -----
 .../window/StreamerBoundedSizeWindow.java       | 136 ---
 .../StreamerBoundedSizeWindowAdapter.java       | 349 --------
 .../window/StreamerBoundedTimeBatchWindow.java  | 897 -------------------
 .../window/StreamerBoundedTimeWindow.java       | 454 ----------
 .../window/StreamerUnboundedWindow.java         | 103 ---
 .../streamer/window/StreamerWindowAdapter.java  | 529 -----------
 .../gridgain/grid/streamer/window/package.html  |  14 -
 .../average/spring-streamer-average-base.xml    |   2 +-
 .../streamer/GridStreamerEvictionSelfTest.java  |   2 +-
 .../streamer/GridStreamerFailoverSelfTest.java  |   2 +-
 .../streamer/GridStreamerSelfTest.java          |   2 +-
 .../marshaller/GridMarshallerAbstractTest.java  |   2 +-
 .../index/GridStreamerIndexSelfTest.java        |   2 +-
 .../window/GridStreamerWindowSelfTest.java      |   1 +
 .../streamer/GridStreamerIndexLoadTest.java     |   2 +-
 30 files changed, 3509 insertions(+), 3508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index ff81b19..5aa0fec 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -67,7 +67,7 @@
                             Window size is 500, which means that we will be
                             computing running average over the last 500 events.
                         -->
-                        <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
+                        <bean class="org.apache.ignite.streamer.window.StreamerBoundedSizeWindow">
                             <property name="maximumSize" value="500"/>
                         </bean>
                     </property>
@@ -110,7 +110,7 @@
                             For this example we use a running window with upper bound of 10,000 events.
                             This means that streamer will find most popular numbers among last 10,000 events.
                         -->
-                        <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
+                        <bean class="org.apache.ignite.streamer.window.StreamerBoundedSizeWindow">
                             <property name="maximumSize" value="10000"/>
 
                             <property name="indexes">
@@ -166,7 +166,7 @@
                                 For this example we use a running batch window with upper bound of 1 second.
                                 This means that streamer composes a batch with events within 1 second time interval.
                             -->
-                            <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
+                            <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow">
                                 <property name="name" value="stage1"/>
                                 <property name="batchTimeInterval" value="1000"/>
                                 <property name="batchSize" value="2147483647"/>
@@ -177,7 +177,7 @@
                                 For this example we use a running batch window with upper bound of 2 seconds.
                                 This means that streamer composes a batch with events within 2 seconds time interval.
                             -->
-                            <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
+                            <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow">
                                 <property name="name" value="stage2"/>
                                 <property name="batchTimeInterval" value="2000"/>
                                 <property name="batchSize" value="2147483647"/>
@@ -215,7 +215,7 @@
                                 This means that only events for last 10 seconds will be present in
                                 these windows (other events are marked for eviction).
                             -->
-                            <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
+                            <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeWindow">
                                 <property name="name" value="AddToWindowStage"/>
                                 <property name="timeInterval" value="10000"/>
 
@@ -235,7 +235,7 @@
                                     </list>
                                 </property>
                             </bean>
-                            <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
+                            <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeWindow">
                                 <property name="name" value="DetectPlacesStage"/>
                                 <property name="timeInterval" value="10000"/>
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
index 69f5344..f9c0e05 100644
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
@@ -24,12 +24,12 @@ import java.util.*;
  * <p>
  * GridGain comes with following rolling windows implementations out of the box:
  * <ul>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerUnboundedWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeBatchWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeSortedWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerUnboundedWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeBatchWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeSortedWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedTimeWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow}</li>
  * </ul>
  * <p>
  * Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
new file mode 100644
index 0000000..f220159
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
@@ -0,0 +1,795 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Window that is bounded by size and accumulates events to batches.
+ */
+public class StreamerBoundedSizeBatchWindow<E> extends StreamerWindowAdapter<E> {
+    /** Max size. */
+    private int batchSize;
+
+    /** Min size. */
+    private int maxBatches;
+
+    /** Reference for queue and size. */
+    private volatile QueueHolder holder;
+
+    /** Enqueue lock. */
+    private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
+
+    /**
+     * Gets maximum number of batches can be stored in window.
+     *
+     * @return Maximum number of batches for window.
+     */
+    public int getMaximumBatches() {
+        return maxBatches;
+    }
+
+    /**
+     * Sets maximum number of batches can be stored in window.
+     *
+     * @param maxBatches Maximum number of batches for window.
+     */
+    public void setMaximumBatches(int maxBatches) {
+        this.maxBatches = maxBatches;
+    }
+
+    /**
+     * Gets batch size.
+     *
+     * @return Batch size.
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets batch size.
+     *
+     * @param batchSize Batch size.
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkConfiguration() throws GridException {
+        if (batchSize <= 0)
+            throw new GridException("Failed to initialize window (batchSize size must be positive) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maximumBatches=" + maxBatches +
+                ", batchSize=" + batchSize + ']');
+
+        if (maxBatches < 0)
+            throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maximumBatches=" + maxBatches +
+                ", batchSize=" + batchSize + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void reset0() {
+        ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
+
+        Batch b = new Batch(batchSize);
+
+        ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
+
+        b.node(n);
+
+        holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger());
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return holder.totalQueueSize().get();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridStreamerWindowIterator<E> iterator0() {
+        final QueueHolder win = holder;
+
+        final Iterator<Batch> batchIt = win.batchQueue().iterator();
+
+        return new GridStreamerWindowIterator<E>() {
+            /** Current batch iterator. */
+            private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
+
+            /** Next batch iterator. Will be null if no more batches available. */
+            private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
+
+            /** Last returned value. */
+            private E lastRet;
+
+            {
+                curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
+            }
+
+            /** {@inheritDoc} */
+            @SuppressWarnings("SimplifiableIfStatement")
+            @Override public boolean hasNext() {
+                if (curBatchIt != null) {
+                    if (curBatchIt.hasNext())
+                        return true;
+
+                    return nextBatchIt != null && nextBatchIt.hasNext();
+                }
+                else
+                    return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override public E next() {
+                if (curBatchIt == null)
+                    throw new NoSuchElementException();
+
+                if (!curBatchIt.hasNext()) {
+                    if (nextBatchIt != null) {
+                        curBatchIt = nextBatchIt;
+
+                        nextBatchIt = null;
+
+                        lastRet = curBatchIt.next();
+                    }
+                    else
+                        throw new NoSuchElementException();
+                }
+                else {
+                    E next = curBatchIt.next();
+
+                    // Moved to last element in batch - check for next iterator.
+                    if (!curBatchIt.hasNext())
+                        advanceBatch();
+
+                    lastRet = next;
+                }
+
+                return lastRet;
+            }
+
+            /** {@inheritDoc} */
+            @Nullable @Override public E removex() {
+                if (curBatchIt == null)
+                    throw new NoSuchElementException();
+
+                if (curBatchIt.removex()) {
+                    // Decrement global size if deleted.
+                    win.totalQueueSize().decrementAndGet();
+
+                    return lastRet;
+                }
+                else
+                    return null;
+            }
+
+            /**
+             * Moves to the next batch.
+             */
+            private void advanceBatch() {
+                if (batchIt.hasNext()) {
+                    Batch batch = batchIt.next();
+
+                    nextBatchIt = batch.iterator();
+                }
+                else
+                    nextBatchIt = null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public int evictionQueueSize() {
+        QueueHolder win = holder;
+
+        int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches);
+
+        Iterator<Batch> it = win.batchQueue().iterator();
+
+        int size = 0;
+
+        int idx = 0;
+
+        while (it.hasNext()) {
+            Batch batch = it.next();
+
+            if (idx++ < oversizeCnt)
+                size += batch.size();
+        }
+
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean enqueue0(E evt) {
+        try {
+            return enqueueInternal(evt);
+        }
+        catch (GridInterruptedException ignored) {
+            return false;
+        }
+    }
+
+    /**
+     * Enqueue event to window.
+     *
+     * @param evt Event to add.
+     * @return {@code True} if event was added.
+     *
+     * @throws GridInterruptedException If thread was interrupted.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    private boolean enqueueInternal(E evt) throws GridInterruptedException {
+        QueueHolder tup = holder;
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        while (true) {
+            Batch last = evts.peekLast();
+
+            if (last == null || !last.add(evt)) {
+                // This call will ensure that last object is actually added to batch
+                // before we add new batch to events queue.
+                // If exception is thrown here, window will be left in consistent state.
+                if (last != null)
+                    last.finish();
+
+                // Add new batch to queue in write lock.
+                if (enqueueLock.writeLock().tryLock()) {
+                    try {
+                        Batch first0 = evts.peekLast();
+
+                        if (first0 == last) {
+                            Batch batch = new Batch(batchSize);
+
+                            ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
+
+                            batch.node(node);
+
+                            size.incrementAndGet();
+
+                            if (batch.removed() && evts.unlinkx(node))
+                                size.decrementAndGet();
+                        }
+                    }
+                    finally {
+                        enqueueLock.writeLock().unlock();
+                    }
+                }
+                else {
+                    // Acquire read lock to wait for batch enqueue.
+                    enqueueLock.readLock().lock();
+
+                    enqueueLock.readLock().unlock();
+                }
+            }
+            else {
+                // Event was added, global size increment.
+                tup.totalQueueSize().incrementAndGet();
+
+                return true;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvicted0(int cnt) {
+        QueueHolder tup = holder;
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        Collection<E> res = new ArrayList<>(cnt);
+
+        while (true) {
+            int curSize = size.get();
+
+            if (curSize > maxBatches) {
+                // Just peek the first batch.
+                Batch first = evts.peekFirst();
+
+                if (first != null) {
+                    assert first.finished();
+
+                    Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+                    if (!polled.isEmpty())
+                        res.addAll(polled);
+
+                    if (first.isEmpty()) {
+                        ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+                        first.markRemoved();
+
+                        if (node != null && evts.unlinkx(node))
+                            size.decrementAndGet();
+                    }
+
+                    if (res.size() == cnt)
+                        break;
+                }
+                else
+                    break;
+            }
+            else
+                break;
+        }
+
+        // Removed entries, update global size.
+        tup.totalQueueSize().addAndGet(-res.size());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvictedBatch0() {
+        QueueHolder tup = holder;
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        while (true) {
+            int curSize = size.get();
+
+            if (curSize > maxBatches) {
+                if (size.compareAndSet(curSize, curSize - 1)) {
+                    Batch polled = evts.poll();
+
+                    if (polled != null) {
+                        assert polled.finished();
+
+                        // Mark batch deleted for consistency.
+                        polled.markRemoved();
+
+                        Collection<E> polled0 = polled.shrink();
+
+                        // Result of shrink is empty, must retry the poll.
+                        if (!polled0.isEmpty()) {
+                            // Update global size.
+                            tup.totalQueueSize().addAndGet(-polled0.size());
+
+                            return polled0;
+                        }
+                    }
+                    else {
+                        // Polled was zero, so we must restore counter and return.
+                        size.incrementAndGet();
+
+                        return Collections.emptyList();
+                    }
+                }
+            }
+            else
+                return Collections.emptyList();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> dequeue0(int cnt) {
+        QueueHolder tup = holder;
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        Collection<E> res = new ArrayList<>(cnt);
+
+        while (true) {
+            // Just peek the first batch.
+            Batch first = evts.peekFirst();
+
+            if (first != null) {
+                Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+                // We must check for finished before unlink as no elements
+                // can be added to batch after it is finished.
+                if (first.isEmpty() && first.emptyFinished()) {
+                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+                    first.markRemoved();
+
+                    if (node != null && evts.unlinkx(node))
+                        size.decrementAndGet();
+
+                    assert first.isEmpty();
+                }
+                else if (polled.isEmpty())
+                    break;
+
+                res.addAll(polled);
+
+                if (res.size() == cnt)
+                    break;
+            }
+            else
+                break;
+        }
+
+        // Update global size.
+        tup.totalQueueSize().addAndGet(-res.size());
+
+        return res;
+    }
+
+    /**
+     * Consistency check, used for testing.
+     */
+    void consistencyCheck() {
+        QueueHolder win = holder;
+
+        Iterator<E> it = iterator();
+
+        int cnt = 0;
+
+        while (it.hasNext()) {
+            it.next();
+
+            cnt++;
+        }
+
+        int cnt0 = 0;
+
+        for (Batch batch : win.batchQueue())
+            cnt0 += batch.size();
+
+        int sz = size();
+
+        assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
+        assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
+        assert win.batchQueue().size() == win.batchQueueSize().get();
+    }
+
+    /**
+     * Window structure.
+     */
+    private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public QueueHolder() {
+            // No-op.
+        }
+
+        /**
+         * @param batchQueue Batch queue.
+         * @param batchQueueSize Batch queue size counter.
+         * @param globalSize Global size counter.
+         */
+        private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
+            AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
+            super(batchQueue, batchQueueSize, globalSize);
+
+            assert batchQueue.size() == 1;
+            assert batchQueueSize.get() == 1;
+        }
+
+        /**
+         * @return Events queue.
+         */
+        @SuppressWarnings("ConstantConditions")
+        public ConcurrentLinkedDeque8<Batch> batchQueue() {
+            return get1();
+        }
+
+        /**
+         * @return Batch queue size.
+         */
+        @SuppressWarnings("ConstantConditions")
+        public AtomicInteger batchQueueSize() {
+            return get2();
+        }
+
+        /**
+         * @return Global queue size.
+         */
+        @SuppressWarnings("ConstantConditions")
+        public AtomicInteger totalQueueSize() {
+            return get3();
+        }
+    }
+
+    /**
+     * Batch.
+     */
+    private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Batch events. */
+        private ConcurrentLinkedDeque8<E> evts;
+
+        /** Capacity. */
+        private AtomicInteger cap;
+
+        /** Finished. */
+        private volatile boolean finished;
+
+        /** Queue node. */
+        @GridToStringExclude
+        private ConcurrentLinkedDeque8.Node<Batch> qNode;
+
+        /** Node removed flag. */
+        private volatile boolean rmvd;
+
+        /**
+         * @param batchSize Batch size.
+         */
+        private Batch(int batchSize) {
+            cap = new AtomicInteger(batchSize);
+
+            evts = new ConcurrentLinkedDeque8<>();
+        }
+
+        /**
+         * @return {@code True} if batch is removed.
+         */
+        public boolean removed() {
+            return rmvd;
+        }
+
+        /**
+         * Marks batch as removed.
+         */
+        public void markRemoved() {
+            rmvd = true;
+        }
+
+        /**
+         * Adds event to batch.
+         *
+         * @param evt Event to add.
+         * @return {@code True} if event was added, {@code false} if batch is full.
+         */
+        public boolean add(E evt) {
+            readLock().lock();
+
+            try {
+                if (finished)
+                    return false;
+
+                while (true) {
+                    int size = cap.get();
+
+                    if (size > 0) {
+                        if (cap.compareAndSet(size, size - 1)) {
+                            evts.add(evt);
+
+                            // Will go through write lock and finish batch.
+                            if (size == 1)
+                                finished = true;
+
+                            return true;
+                        }
+                    }
+                    else
+                        return false;
+                }
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * @return Queue node.
+         */
+        public ConcurrentLinkedDeque8.Node<Batch> node() {
+            return qNode;
+        }
+
+        /**
+         * @param qNode Queue node.
+         */
+        public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
+            this.qNode = qNode;
+        }
+
+        /**
+         * Waits for latch count down after last event was added.
+         *
+         * @throws GridInterruptedException If wait was interrupted.
+         */
+        public void finish() throws GridInterruptedException {
+            writeLock().lock();
+
+            try {
+                // Safety.
+                assert cap.get() == 0;
+                assert finished;
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /**
+         * @return {@code True} if batch is finished and no more events will be added to it.
+         */
+        public boolean finished() {
+            readLock().lock();
+
+            try {
+                return finished;
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Gets batch size.
+         *
+         * @return Batch size.
+         */
+        public int size() {
+            readLock().lock();
+
+            try {
+                return evts == null ? 0 : evts.sizex();
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * @return {@code True} if batch is empty.
+         */
+        public boolean isEmpty() {
+            readLock().lock();
+
+            try {
+                return evts == null || evts.isEmpty();
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
+         * be added to batch and it can be safely unlinked from the queue.
+         *
+         * @return {@code True} if batch is empty and finished.
+         */
+        public boolean emptyFinished() {
+            writeLock().lock();
+
+            try {
+                return finished && (evts == null || evts.isEmpty());
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
+            readLock().lock();
+
+            try {
+                if (evts != null)
+                    return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
+
+                return new ConcurrentLinkedDeque8.IteratorEx<E>() {
+                    @Override public boolean removex() {
+                        throw new NoSuchElementException();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return false;
+                    }
+
+                    @Override public E next() {
+                        throw new NoSuchElementException();
+                    }
+
+                    @Override public void remove() {
+                        throw new NoSuchElementException();
+                    }
+                };
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Polls up to {@code cnt} objects from batch in concurrent fashion.
+         *
+         * @param cnt Number of objects to poll.
+         * @return Collection of polled elements or empty collection if nothing to poll.
+         */
+        public Collection<E> pollNonBatch(int cnt) {
+            readLock().lock();
+
+            try {
+                if (evts == null)
+                    return Collections.emptyList();
+
+                Collection<E> res = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    E evt = evts.poll();
+
+                    if (evt != null)
+                        res.add(evt);
+                    else
+                        return res;
+                }
+
+                return res;
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Shrinks this batch. No events can be polled from it after this method.
+         *
+         * @return Collection of events contained in batch before shrink (empty collection in
+         *         case no events were present).
+         */
+        public Collection<E> shrink() {
+            writeLock().lock();
+
+            try {
+                if (evts == null)
+                    return Collections.emptyList();
+
+                // Since iterator can concurrently delete elements, we must poll here.
+                Collection<E> res = new ArrayList<>(evts.sizex());
+
+                E o;
+
+                while ((o = evts.poll()) != null)
+                    res.add(o);
+
+                // Nothing cal be polled after shrink.
+                evts = null;
+
+                return res;
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            ConcurrentLinkedDeque8<E> evts0 = evts;
+
+            return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
new file mode 100644
index 0000000..8f50fb4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
@@ -0,0 +1,210 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Size-bounded sorted window. Unlike {@link StreamerBoundedSizeWindow}, which limits
+ * window only on size, this window also provides events in sorted order.
+ */
+public class StreamerBoundedSizeSortedWindow<E>
+    extends StreamerBoundedSizeWindowAdapter<E, StreamerBoundedSizeSortedWindow.Holder<E>> {
+    /** Comparator. */
+    private Comparator<E> comp;
+
+    /** Order counter. */
+    private AtomicLong orderCnt = new AtomicLong();
+
+    /**
+     * Gets event comparator.
+     *
+     * @return Event comparator.
+     */
+    public Comparator<E> getComparator() {
+        return comp;
+    }
+
+    /**
+     * Sets event comparator.
+     *
+     * @param comp Comparator.
+     */
+    public void setComparator(Comparator<E> comp) {
+        this.comp = comp;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected Collection<Holder<E>> newCollection() {
+        final Comparator<E> comp0 = comp;
+
+        Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
+            @Override public int compare(Holder<E> h1, Holder<E> h2) {
+                if (h1 == h2)
+                    return 0;
+
+                int diff = comp0 == null ?
+                    ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val);
+
+                if (diff != 0)
+                    return diff;
+                else {
+                    assert h1.order != h2.order;
+
+                    return h1.order < h2.order ? -1 : 1;
+                }
+            }
+        });
+
+        return (Collection)col;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) {
+        if (comp == null) {
+            if (!(evt instanceof Comparable))
+                throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " +
+                    "comparator is specified: " + evt);
+        }
+
+        if (set != null) {
+            if (set.add(evt)) {
+                col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
+
+                return true;
+            }
+
+            return false;
+        }
+        else {
+            col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
+
+            return true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) {
+        int cnt = 0;
+
+        for (E evt : evts) {
+            if (addInternal(evt, col, set))
+                cnt++;
+        }
+
+        return cnt;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) {
+        Holder<E> h = (Holder<E>)((NavigableSet<E>)col).pollLast();
+
+        if (set != null && h != null)
+            set.remove(h.val);
+
+        return h == null ? null : h.val;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col,
+        final Set<E> set, final AtomicInteger size) {
+        final Iterator<Holder<E>> it = col.iterator();
+
+        return new GridStreamerWindowIterator<E>() {
+            private Holder<E> lastRet;
+
+            @Override public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override public E next() {
+                lastRet = it.next();
+
+                return lastRet.val;
+            }
+
+            @Override public E removex() {
+                if (lastRet == null)
+                    throw new IllegalStateException();
+
+                if (col.remove(lastRet)) {
+                    if (set != null)
+                        set.remove(lastRet.val);
+
+                    size.decrementAndGet();
+
+                    return lastRet.val;
+                }
+                else
+                    return null;
+            }
+        };
+    }
+
+    /**
+     * Value wrapper.
+     */
+    @SuppressWarnings("PackageVisibleInnerClass")
+    static class Holder<E> {
+        /** Value. */
+        private E val;
+
+        /** Order to distinguish between objects for which comparator returns 0. */
+        private long order;
+
+        /**
+         * @param val Value to hold.
+         * @param order Adding order.
+         */
+        private Holder(E val, long order) {
+            this.val = val;
+            this.order = order;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return val.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (this == obj)
+                return false;
+
+            if (!(obj instanceof Holder))
+                return false;
+
+            Holder h = (Holder)obj;
+
+            return F.eq(val, h.val) && order == h.order;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) {
+        assert col.size() == size.get();
+
+        if (set != null) {
+            // Check no duplicates in collection.
+
+            Collection<Object> vals = new HashSet<>();
+
+            for (Object evt : col)
+                assert vals.add(((Holder)evt).val);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
new file mode 100644
index 0000000..1a51f52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
@@ -0,0 +1,136 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Queue window bounded by number of elements in queue. After adding elements to this window called
+ * must check for evicted events.
+ * <p>
+ * It is guaranteed that window size will never get less then maximum size when poling from this window
+ * concurrently from different threads.
+ */
+public class StreamerBoundedSizeWindow<E> extends StreamerBoundedSizeWindowAdapter<E, E> {
+    /** {@inheritDoc} */
+    @Override protected Collection<E> newCollection() {
+        return new ConcurrentLinkedDeque8<>();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set,
+        final AtomicInteger size) {
+        final ConcurrentLinkedDeque8.IteratorEx<E> it =
+            (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator();
+
+        return new GridStreamerWindowIterator<E>() {
+            private E lastRet;
+
+            @Override public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override public E next() {
+                lastRet = it.next();
+
+                return lastRet;
+            }
+
+            @Override public E removex() {
+                if (it.removex()) {
+                    if (set != null)
+                        set.remove(lastRet);
+
+                    size.decrementAndGet();
+
+                    return lastRet;
+                }
+                else
+                    return null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("IfMayBeConditional")
+    @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) {
+        assert col instanceof ConcurrentLinkedDeque8;
+
+        // If unique.
+        if (set != null) {
+            if (set.add(evt)) {
+                col.add(evt);
+
+                return true;
+            }
+
+            return false;
+        }
+        else {
+            col.add(evt);
+
+            return true;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) {
+        assert col instanceof ConcurrentLinkedDeque8;
+        if (set != null) {
+            int cnt = 0;
+
+            for (E evt : evts) {
+                if (set.add(evt)) {
+                    col.add(evt);
+
+                    cnt++;
+                }
+            }
+
+            return cnt;
+        }
+        else {
+            col.addAll(evts);
+
+            return evts.size();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) {
+        assert col instanceof ConcurrentLinkedDeque8;
+
+        E res = ((Queue<E>)col).poll();
+
+        if (set != null && res != null)
+            set.remove(res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) {
+        assert col.size() == size.get();
+
+        if (set != null) {
+            // Check no duplicates in collection.
+
+            Collection<Object> vals = new HashSet<>();
+
+            for (Object evt : col)
+                assert vals.add(evt);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
new file mode 100644
index 0000000..c31b32e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
@@ -0,0 +1,349 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Abstract non-public class for size-bound windows. Support reset.
+ */
+abstract class StreamerBoundedSizeWindowAdapter<E, T> extends StreamerWindowAdapter<E> {
+    /** Reference. */
+    private AtomicReference<WindowHolder> ref = new AtomicReference<>();
+
+    /** If true, only unique elements will be accepted. */
+    private boolean unique;
+
+    /** Window maximum size. */
+    protected int maxSize;
+
+    /**
+     * Gets window maximum size.
+     *
+     * @return Maximum size.
+     */
+    public int getMaximumSize() {
+        return maxSize;
+    }
+
+    /**
+     * Sets window maximum size.
+     *
+     * @param maxSize Maximum size.
+     */
+    public void setMaximumSize(int maxSize) {
+        this.maxSize = maxSize;
+    }
+
+    /**
+     * @return True if only unique elements will be accepted.
+     */
+    public boolean isUnique() {
+        return unique;
+    }
+
+    /**
+     * @param unique If true, only unique elements will be accepted.
+     */
+    public void setUnique(boolean unique) {
+        this.unique = unique;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkConfiguration() throws GridException {
+        if (maxSize < 0)
+            throw new GridException("Failed to initialize window (maximumSize cannot be negative) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maxSize=" + maxSize +
+                ", unique=" + unique + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        int size = ref.get().size().get();
+
+        return size > 0 ? size : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int evictionQueueSize() {
+        int evictSize = size() - maxSize;
+
+        return evictSize > 0 ? evictSize : 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean enqueue0(E evt) {
+        add(evt);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvicted0(int cnt) {
+        Collection<E> res = new ArrayList<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            E evicted = pollEvictedInternal();
+
+            if (evicted == null)
+                return res;
+
+            res.add(evicted);
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvictedBatch0() {
+        E res = pollEvictedInternal();
+
+        if (res == null)
+            return Collections.emptyList();
+
+        return Collections.singleton(res);
+    }
+
+    /**
+     * Poll evicted internal implementation.
+     *
+     * @return Evicted element.
+     */
+    @Nullable private E pollEvictedInternal() {
+        WindowHolder tup = ref.get();
+
+        AtomicInteger size = tup.size();
+
+        while (true) {
+            int curSize = size.get();
+
+            if (curSize > maxSize) {
+                if (size.compareAndSet(curSize, curSize - 1)) {
+                    E evt = pollInternal(tup.collection(), tup.set());
+
+                    if (evt != null)
+                        return evt;
+                    else {
+                        // No actual events in queue, it means that other thread is just adding event.
+                        // return null as it is a concurrent add call.
+                        size.incrementAndGet();
+
+                        return null;
+                    }
+                }
+            }
+            else
+                return null;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> dequeue0(int cnt) {
+        WindowHolder tup = ref.get();
+
+        AtomicInteger size = tup.size();
+        Collection<T> evts = tup.collection();
+
+        Collection<E> resCol = new ArrayList<>(cnt);
+
+        while (true) {
+            int curSize = size.get();
+
+            if (curSize > 0) {
+                if (size.compareAndSet(curSize, curSize - 1)) {
+                    E res = pollInternal(evts, tup.set());
+
+                    if (res != null) {
+                        resCol.add(res);
+
+                        if (resCol.size() >= cnt)
+                            return resCol;
+                    }
+                    else {
+                        size.incrementAndGet();
+
+                        return resCol;
+                    }
+                }
+            }
+            else
+                return resCol;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridStreamerWindowIterator<E> iterator0() {
+        WindowHolder win = ref.get();
+
+        return iteratorInternal(win.collection(), win.set(), win.size());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void reset0() {
+        ref.set(new WindowHolder(newCollection(),
+            unique ? new GridConcurrentHashSet<E>() : null,
+            new AtomicInteger()));
+    }
+
+    /**
+     * @param evt Event to add.
+     */
+    private void add(E evt) {
+        WindowHolder tup = ref.get();
+
+        if (addInternal(evt, tup.collection(), tup.set()))
+            tup.size().incrementAndGet();
+    }
+
+    /**
+     * @param evts Events to add.
+     */
+    private void addAll(Collection<E> evts) {
+        WindowHolder tup = ref.get();
+
+        int cnt = addAllInternal(evts, tup.collection(), tup.set());
+
+        tup.size().addAndGet(cnt);
+    }
+
+    /**
+     * Checks window consistency. Used for testing.
+     */
+    void consistencyCheck() {
+        WindowHolder win = ref.get();
+
+        consistencyCheck(win.collection(), win.set(), win.size());
+    }
+
+    /**
+     * Get underlying collection.
+     *
+     * @return Collection.
+     */
+    @SuppressWarnings("ConstantConditions")
+    protected Collection<T> collection() {
+        return ref.get().get1();
+    }
+
+    /**
+     * Creates new collection specific for window implementation. This collection will be subsequently passed
+     * to addInternal(...) and pollInternal() methods.
+     *
+     * @return Collection - holder.
+     */
+    protected abstract Collection<T> newCollection();
+
+    /**
+     * Adds event to queue implementation.
+     *
+     * @param evt Event to add.
+     * @param col Collection to add to.
+     * @param set Set to check.
+     * @return {@code True} if event was added.
+     */
+    protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);
+
+    /**
+     * Adds all events to queue implementation.
+     *
+     * @param evts Events to add.
+     * @param col Collection to add to.
+     * @param set Set to check.
+     * @return Added events number.
+     */
+    protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);
+
+    /**
+     * @param col Collection to add to.
+     * @param set Set to check.
+     * @return Polled object.
+     */
+    @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);
+
+    /**
+     * Creates iterator based on implementation collection type.
+     *
+     * @param col Collection.
+     * @param set Set to check.
+     * @param size Size.
+     * @return Iterator.
+     */
+    protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
+        AtomicInteger size);
+
+    /**
+     * Checks consistency. Used in tests.
+     *
+     * @param col Collection.
+     * @param set Set if unique.
+     * @param size Size holder.
+     */
+    protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);
+
+    /**
+     * Window holder.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public WindowHolder() {
+            // No-op.
+        }
+
+        /**
+         * @param col Collection.
+         * @param set Set if unique.
+         * @param size Window size counter.
+         */
+        WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
+            super(col, set, size);
+        }
+
+        /**
+         * @return Collection.
+         */
+        public Collection<T> collection() {
+            return get1();
+        }
+
+        /**
+         * @return Set.
+         */
+        public Set<E> set() {
+            return get2();
+        }
+
+        /**
+         * @return Size.
+         */
+        public AtomicInteger size() {
+            return get3();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
new file mode 100644
index 0000000..187c34e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
@@ -0,0 +1,897 @@
+/* @java.file.header */
+
+/*  _________        _____ __________________        _____
+ *  __  ____/___________(_)______  /__  ____/______ ____(_)_______
+ *  _  / __  __  ___/__  / _  __  / _  / __  _  __ `/__  / __  __ \
+ *  / /_/ /  _  /    _  /  / /_/ /  / /_/ /  / /_/ / _  /  _  / / /
+ *  \____/   /_/     /_/   \_,__/   \____/   \__,_/  /_/   /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
+ */
+public class StreamerBoundedTimeBatchWindow<E> extends StreamerWindowAdapter<E> {
+    /** Batch size. */
+    private int batchSize;
+
+    /** Maximum batches. */
+    private int maxBatches;
+
+    /** */
+    private long batchTimeInterval;
+
+    /** Atomic reference for queue and size. */
+    private AtomicReference<WindowHolder> ref = new AtomicReference<>();
+
+    /** Enqueue lock. */
+    private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
+
+    /**
+     * Gets maximum number of batches can be stored in window.
+     *
+     * @return Maximum number of batches for window.
+     */
+    public int getMaximumBatches() {
+        return maxBatches;
+    }
+
+    /**
+     * Sets maximum number of batches can be stored in window.
+     *
+     * @param maxBatches Maximum number of batches for window.
+     */
+    public void setMaximumBatches(int maxBatches) {
+        this.maxBatches = maxBatches;
+    }
+
+    /**
+     * Gets batch size.
+     *
+     * @return Batch size.
+     */
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    /**
+     * Sets batch size.
+     *
+     * @param batchSize Batch size.
+     */
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    /**
+     * Gets batch time interval.
+     *
+     * @return Batch time interval.
+     */
+    public long getBatchTimeInterval() {
+        return batchTimeInterval;
+    }
+
+    /**
+     * Sets batch time interval.
+     *
+     * @param batchTimeInterval Batch time interval.
+     */
+    public void setBatchTimeInterval(long batchTimeInterval) {
+        this.batchTimeInterval = batchTimeInterval;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void checkConfiguration() throws GridException {
+        if (maxBatches < 0)
+            throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maximumBatches=" + maxBatches +
+                ", batchSize=" + batchSize +
+                ", batchTimeInterval=" + batchTimeInterval + ']');
+
+        if (batchSize < 0)
+            throw new GridException("Failed to initialize window (batchSize cannot be negative) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maximumBatches=" + maxBatches +
+                ", batchSize=" + batchSize +
+                ", batchTimeInterval=" + batchTimeInterval + ']');
+        else if (batchSize == 0)
+            batchSize = Integer.MAX_VALUE;
+
+        if (batchTimeInterval <= 0)
+            throw new GridException("Failed to initialize window (batchTimeInterval must be positive) " +
+                "[windowClass=" + getClass().getSimpleName() +
+                ", maximumBatches=" + maxBatches +
+                ", batchSize=" + batchSize +
+                ", batchTimeInterval=" + batchTimeInterval + ']');
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void stop0() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void reset0() {
+        ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
+
+        Batch b = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);
+
+        ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
+
+        b.node(n);
+
+        ref.set(new WindowHolder(first, new AtomicInteger(1), new AtomicInteger()));
+    }
+
+    /** {@inheritDoc} */
+    @Override public int size() {
+        return ref.get().totalQueueSize().get();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridStreamerWindowIterator<E> iterator0() {
+        final WindowHolder win = ref.get();
+
+        final Iterator<Batch> batchIt = win.batchQueue().iterator();
+
+        return new GridStreamerWindowIterator<E>() {
+            /** Current batch iterator. */
+            private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
+
+            /** Next batch iterator. Will be null if no more batches available. */
+            private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
+
+            /** Last returned value. */
+            private E lastRet;
+
+            {
+                curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
+            }
+
+            /** {@inheritDoc} */
+            @SuppressWarnings("SimplifiableIfStatement")
+            @Override public boolean hasNext() {
+                if (curBatchIt != null) {
+                    if (curBatchIt.hasNext())
+                        return true;
+
+                    return nextBatchIt != null && nextBatchIt.hasNext();
+                }
+                else
+                    return false;
+            }
+
+            /** {@inheritDoc} */
+            @Override public E next() {
+                if (curBatchIt == null)
+                    throw new NoSuchElementException();
+
+                if (!curBatchIt.hasNext()) {
+                    if (nextBatchIt != null) {
+                        curBatchIt = nextBatchIt;
+
+                        nextBatchIt = null;
+
+                        lastRet = curBatchIt.next();
+                    }
+                    else
+                        throw new NoSuchElementException();
+                }
+                else {
+                    E next = curBatchIt.next();
+
+                    // Moved to last element in batch - check for next iterator.
+                    if (!curBatchIt.hasNext())
+                        advanceBatch();
+
+                    lastRet = next;
+                }
+
+                return lastRet;
+            }
+
+            /** {@inheritDoc} */
+            @Override public E removex() {
+                if (curBatchIt == null)
+                    throw new NoSuchElementException();
+
+                if (curBatchIt.removex()) {
+                    // Decrement global size if deleted.
+                    win.totalQueueSize().decrementAndGet();
+
+                    return lastRet;
+                }
+                else
+                    return null;
+            }
+
+            /**
+             * Moves to the next batch.
+             */
+            private void advanceBatch() {
+                if (batchIt.hasNext()) {
+                    Batch batch = batchIt.next();
+
+                    nextBatchIt = batch.iterator();
+                }
+                else
+                    nextBatchIt = null;
+            }
+        };
+    }
+
+    /** {@inheritDoc} */
+    @Override public int evictionQueueSize() {
+        WindowHolder win = ref.get();
+
+        int oversizeCnt = maxBatches > 0 ? Math.max(0, win.batchQueueSize().get() - maxBatches) : 0;
+
+        long now = U.currentTimeMillis();
+
+        Iterator<Batch> it = win.batchQueue().iterator();
+
+        int size = 0;
+
+        int idx = 0;
+
+        while (it.hasNext()) {
+            Batch batch = it.next();
+
+            if (idx++ < oversizeCnt || batch.batchEndTs < now)
+                size += batch.size();
+        }
+
+        return size;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean enqueue0(E evt) {
+        try {
+            return enqueue0(evt, U.currentTimeMillis());
+        }
+        catch (GridInterruptedException ignored) {
+            return false;
+        }
+    }
+
+    /**
+     * Enqueue event to window.
+     *
+     * @param evt Event to add.
+     * @param ts Event timestamp.
+     * @return {@code True} if event was added.
+     *
+     * @throws GridInterruptedException If thread was interrupted.
+     */
+    private boolean enqueue0(E evt, long ts) throws GridInterruptedException {
+        WindowHolder tup = ref.get();
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        while (true) {
+            Batch last = evts.peekLast();
+
+            if (last == null || !last.add(evt, ts)) {
+                // This call will ensure that last object is actually added to batch
+                // before we add new batch to events queue.
+                // If exception is thrown here, window will be left in consistent state.
+                if (last != null)
+                    last.finish();
+
+                // Add new batch to queue in write lock.
+                if (enqueueLock.writeLock().tryLock()) {
+                    try {
+                        Batch first0 = evts.peekLast();
+
+                        if (first0 == last) {
+                            Batch batch = new Batch(batchSize, ts + batchTimeInterval);
+
+                            ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
+
+                            batch.node(node);
+
+                            size.incrementAndGet();
+
+                            // If batch was removed in other thread.
+                            if (batch.removed() && evts.unlinkx(node))
+                                size.decrementAndGet();
+                        }
+                    }
+                    finally {
+                        enqueueLock.writeLock().unlock();
+                    }
+                }
+                else {
+                    // Acquire read lock to wait for batch enqueue.
+                    enqueueLock.readLock().lock();
+
+                    try {
+                        evts.peekLast();
+                    }
+                    finally {
+                        enqueueLock.readLock().unlock();
+                    }
+                }
+            }
+            else {
+                // Event was added, global size increment.
+                tup.totalQueueSize().incrementAndGet();
+
+                return true;
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvicted0(int cnt) {
+        WindowHolder tup = ref.get();
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        Collection<E> res = new ArrayList<>(cnt);
+
+        while (true) {
+            int curSize = size.get();
+
+            // Just peek the first batch.
+            Batch first = evts.peekFirst();
+
+            if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
+                assert first.finished();
+
+                Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+                if (!polled.isEmpty())
+                    res.addAll(polled);
+
+                if (first.isEmpty()) {
+                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+                    first.markRemoved();
+
+                    if (node != null && evts.unlinkx(node))
+                        size.decrementAndGet();
+                }
+
+                if (res.size() == cnt)
+                    break;
+            }
+            else
+                break;
+        }
+
+        // Removed entries, update global size.
+        tup.totalQueueSize().addAndGet(-res.size());
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> pollEvictedBatch0() {
+        WindowHolder tup = ref.get();
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        while (true) {
+            int curSize = size.get();
+
+            if (maxBatches > 0 && curSize > maxBatches) {
+                if (size.compareAndSet(curSize, curSize - 1)) {
+                    Batch polled = evts.poll();
+
+                    if (polled != null) {
+                        assert polled.finished();
+
+                        // Mark batch removed for consistency.
+                        polled.markRemoved();
+
+                        Collection<E> polled0 = polled.shrink();
+
+                        // Result of shrink is empty, must retry the poll.
+                        if (!polled0.isEmpty()) {
+                            // Update global size.
+                            tup.totalQueueSize().addAndGet(-polled0.size());
+
+                            return polled0;
+                        }
+                    }
+                    else {
+                        // Polled was zero, so we must restore counter and return.
+                        size.incrementAndGet();
+
+                        return Collections.emptyList();
+                    }
+                }
+            }
+            else {
+                while (true) {
+                    Batch batch = evts.peekFirst();
+
+                    // This call will finish batch and return true if batch is expired.
+                    if (batch != null && batch.checkExpired()) {
+                        assert batch.finished();
+
+                        ConcurrentLinkedDeque8.Node<Batch> node = batch.node();
+
+                        batch.markRemoved();
+
+                        if (node != null && evts.unlinkx(node))
+                            size.decrementAndGet();
+
+                        Collection<E> col = batch.shrink();
+
+                        tup.totalQueueSize().addAndGet(-col.size());
+
+                        if (!col.isEmpty())
+                            return col;
+                    }
+                    else
+                        return Collections.emptyList();
+                }
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Collection<E> dequeue0(int cnt) {
+        WindowHolder tup = ref.get();
+
+        ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+        AtomicInteger size = tup.batchQueueSize();
+
+        Collection<E> res = new ArrayList<>(cnt);
+
+        while (true) {
+            // Just peek the first batch.
+            Batch first = evts.peekFirst();
+
+            if (first != null) {
+                Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+                // We must check for finished before unlink as no elements
+                // can be added to batch after it is finished.
+                if (first.isEmpty() && first.emptyFinished()) {
+                    ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+                    first.markRemoved();
+
+                    if (node != null && evts.unlinkx(node))
+                        size.decrementAndGet();
+
+                    assert first.isEmpty();
+                }
+                else if (polled.isEmpty())
+                    break;
+
+                res.addAll(polled);
+
+                if (res.size() == cnt)
+                    break;
+            }
+            else
+                break;
+        }
+
+        // Update global size.
+        tup.totalQueueSize().addAndGet(-res.size());
+
+        return res;
+    }
+
+    /**
+     * Consistency check, used for testing.
+     */
+    void consistencyCheck() {
+        WindowHolder win = ref.get();
+
+        Iterator<E> it = iterator();
+
+        int cnt = 0;
+
+        while (it.hasNext()) {
+            it.next();
+
+            cnt++;
+        }
+
+        int cnt0 = 0;
+
+        for (Batch batch : win.batchQueue())
+            cnt0 += batch.size();
+
+        int sz = size();
+
+        assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
+        assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
+        assert win.batchQueue().size() == win.batchQueueSize().get();
+    }
+
+    /**
+     * Window structure.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /**
+         * Empty constructor required by {@link Externalizable}.
+         */
+        public WindowHolder() {
+            // No-op.
+        }
+
+        /**
+         * @param batchQueue Batch queue.
+         * @param batchQueueSize Batch queue size counter.
+         * @param globalSize Global size counter.
+         */
+        private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
+            AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
+            super(batchQueue, batchQueueSize, globalSize);
+
+            assert batchQueue.size() == 1;
+            assert batchQueueSize.get() == 1;
+        }
+
+        /**
+         * @return Events queue.
+         */
+        public ConcurrentLinkedDeque8<Batch> batchQueue() {
+            return get1();
+        }
+
+        /**
+         * @return Batch queue size.
+         */
+        public AtomicInteger batchQueueSize() {
+            return get2();
+        }
+
+        /**
+         * @return Global queue size.
+         */
+        public AtomicInteger totalQueueSize() {
+            return get3();
+        }
+    }
+
+    /**
+     * Batch.
+     */
+    private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Batch events. */
+        private ConcurrentLinkedDeque8<E> evts;
+
+        /** Capacity. */
+        private AtomicInteger cap;
+
+        /** Batch end timestamp. */
+        private final long batchEndTs;
+
+        /** Finished flag. */
+        private boolean finished;
+
+        /** Queue node. */
+        @GridToStringExclude
+        private ConcurrentLinkedDeque8.Node<Batch> qNode;
+
+        /** Removed flag. */
+        private volatile boolean rmvd;
+
+        /**
+         * @param batchSize Batch size.
+         * @param batchEndTs Batch end timestamp.
+         */
+        private Batch(int batchSize, long batchEndTs) {
+            cap = new AtomicInteger(batchSize);
+            this.batchEndTs = batchEndTs;
+
+            evts = new ConcurrentLinkedDeque8<>();
+        }
+
+        /**
+         * @return {@code True} if removed.
+         */
+        public boolean removed() {
+            return rmvd;
+        }
+
+        /**
+         * Marks batch as removed.
+         */
+        public void markRemoved() {
+            rmvd = true;
+        }
+
+        /**
+         * Adds event to batch.
+         *
+         * @param evt Event to add.
+         * @param ts Event timestamp.
+         * @return {@code True} if event was added, {@code false} if batch is full.
+         */
+        public boolean add(E evt, long ts) {
+            if (ts <= batchEndTs) {
+                readLock().lock();
+
+                try {
+                    if (finished)
+                        // Finished was set inside write lock.
+                        return false;
+
+                    while (true) {
+                        int size = cap.get();
+
+                        if (size > 0) {
+                            if (cap.compareAndSet(size, size - 1)) {
+                                evts.add(evt);
+
+                                // Will go through write lock and finish batch.
+                                if (size == 1)
+                                    finished = true;
+
+                                return true;
+                            }
+                        }
+                        else
+                            return false;
+                    }
+                }
+                finally {
+                    readLock().unlock();
+                }
+            }
+            else {
+                writeLock().lock();
+
+                try {
+                    // No events could be added to this batch.
+                    finished = true;
+
+                    return false;
+                }
+                finally {
+                    writeLock().unlock();
+                }
+            }
+        }
+
+        /**
+         * @return Queue node.
+         */
+        public ConcurrentLinkedDeque8.Node<Batch> node() {
+            return qNode;
+        }
+
+        /**
+         * @param qNode Queue node.
+         */
+        public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
+            this.qNode = qNode;
+        }
+
+        /**
+         * Waits for latch count down after last event was added.
+         *
+         * @throws GridInterruptedException If wait was interrupted.
+         */
+        public void finish() throws GridInterruptedException {
+            writeLock().lock();
+
+            try {
+                // Safety.
+                assert cap.get() == 0 || finished;
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /**
+         * @return {@code True} if batch is finished and no more events will be added to it.
+         */
+        public boolean finished() {
+            readLock().lock();
+
+            try {
+                return finished;
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Gets batch size.
+         *
+         * @return Batch size.
+         */
+        public int size() {
+            readLock().lock();
+
+            try {
+                return evts == null ? 0 : evts.sizex();
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * @return {@code True} if batch is empty.
+         */
+        public boolean isEmpty() {
+            readLock().lock();
+
+            try {
+                return evts == null || evts.isEmpty();
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
+         * be added to batch and it can be safely unlinked from the queue.
+         *
+         * @return {@code True} if batch is empty and finished.
+         */
+        public boolean emptyFinished() {
+            writeLock().lock();
+
+            try {
+                return finished && (evts == null || evts.isEmpty());
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /**
+         * Checks if the batch has expired.
+         *
+         * @return {@code True} if the batch has expired, {@code false} otherwise.
+         */
+        public boolean checkExpired() {
+            if (U.currentTimeMillis() > batchEndTs) {
+                writeLock().lock();
+
+                try {
+                    finished = true;
+
+                    return true;
+                }
+                finally {
+                    writeLock().unlock();
+                }
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
+            readLock().lock();
+
+            try {
+                if (evts != null)
+                    return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
+
+                return new ConcurrentLinkedDeque8.IteratorEx<E>() {
+                    @Override public boolean removex() {
+                        throw new NoSuchElementException();
+                    }
+
+                    @Override public boolean hasNext() {
+                        return false;
+                    }
+
+                    @Override public E next() {
+                        throw new NoSuchElementException();
+                    }
+
+                    @Override public void remove() {
+                        throw new NoSuchElementException();
+                    }
+                };
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Polls up to {@code cnt} objects from batch in concurrent fashion.
+         *
+         * @param cnt Number of objects to poll.
+         * @return Collection of polled elements (empty collection in case no events were
+         *         present).
+         */
+        public Collection<E> pollNonBatch(int cnt) {
+            readLock().lock();
+
+            try {
+                if (evts == null)
+                    return Collections.emptyList();
+
+                Collection<E> res = new ArrayList<>(cnt);
+
+                for (int i = 0; i < cnt; i++) {
+                    E evt = evts.poll();
+
+                    if (evt != null)
+                        res.add(evt);
+                    else
+                        return res;
+                }
+
+                return res;
+            }
+            finally {
+                readLock().unlock();
+            }
+        }
+
+        /**
+         * Shrinks this batch. No events can be polled from it after this method.
+         *
+         * @return Collection of events contained in batch before shrink (empty collection in
+         *         case no events were present).
+         */
+        public Collection<E> shrink() {
+            writeLock().lock();
+
+            try {
+                if (evts == null)
+                    return Collections.emptyList();
+
+                // Since iterator can concurrently delete elements, we must poll here.
+                Collection<E> res = new ArrayList<>(evts.sizex());
+
+                E o;
+
+                while ((o = evts.poll()) != null)
+                    res.add(o);
+
+                // Nothing cal be polled after shrink.
+                evts = null;
+
+                return res;
+            }
+            finally {
+                writeLock().unlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            ConcurrentLinkedDeque8<E> evts0 = evts;
+
+            return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
+        }
+    }
+}