You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:26 UTC
[25/32] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3299c52b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3299c52b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3299c52b
Branch: refs/heads/master
Commit: 3299c52b2f679b745f6a05776bfe73fb45b70d84
Parents: 42704cb
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:51:25 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:51:25 2014 +0300
----------------------------------------------------------------------
examples/config/example-streamer.xml | 12 +-
.../apache/ignite/streamer/StreamerWindow.java | 12 +-
.../window/StreamerBoundedSizeBatchWindow.java | 795 ++++++++++++++++
.../window/StreamerBoundedSizeSortedWindow.java | 210 +++++
.../window/StreamerBoundedSizeWindow.java | 136 +++
.../StreamerBoundedSizeWindowAdapter.java | 349 ++++++++
.../window/StreamerBoundedTimeBatchWindow.java | 897 +++++++++++++++++++
.../window/StreamerBoundedTimeWindow.java | 454 ++++++++++
.../window/StreamerUnboundedWindow.java | 103 +++
.../streamer/window/StreamerWindowAdapter.java | 529 +++++++++++
.../apache/ignite/streamer/window/package.html | 14 +
.../streamer/GridStreamProcessor.java | 2 +-
.../streamer/index/StreamerIndexProvider.java | 2 +-
.../window/StreamerBoundedSizeBatchWindow.java | 795 ----------------
.../window/StreamerBoundedSizeSortedWindow.java | 210 -----
.../window/StreamerBoundedSizeWindow.java | 136 ---
.../StreamerBoundedSizeWindowAdapter.java | 349 --------
.../window/StreamerBoundedTimeBatchWindow.java | 897 -------------------
.../window/StreamerBoundedTimeWindow.java | 454 ----------
.../window/StreamerUnboundedWindow.java | 103 ---
.../streamer/window/StreamerWindowAdapter.java | 529 -----------
.../gridgain/grid/streamer/window/package.html | 14 -
.../average/spring-streamer-average-base.xml | 2 +-
.../streamer/GridStreamerEvictionSelfTest.java | 2 +-
.../streamer/GridStreamerFailoverSelfTest.java | 2 +-
.../streamer/GridStreamerSelfTest.java | 2 +-
.../marshaller/GridMarshallerAbstractTest.java | 2 +-
.../index/GridStreamerIndexSelfTest.java | 2 +-
.../window/GridStreamerWindowSelfTest.java | 1 +
.../streamer/GridStreamerIndexLoadTest.java | 2 +-
30 files changed, 3509 insertions(+), 3508 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index ff81b19..5aa0fec 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -67,7 +67,7 @@
Window size is 500, which means that we will be
computing running average over the last 500 events.
-->
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedSizeWindow">
<property name="maximumSize" value="500"/>
</bean>
</property>
@@ -110,7 +110,7 @@
For this example we use a running window with upper bound of 10,000 events.
This means that streamer will find most popular numbers among last 10,000 events.
-->
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedSizeWindow">
<property name="maximumSize" value="10000"/>
<property name="indexes">
@@ -166,7 +166,7 @@
For this example we use a running batch window with upper bound of 1 second.
This means that streamer composes a batch with events within 1 second time interval.
-->
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow">
<property name="name" value="stage1"/>
<property name="batchTimeInterval" value="1000"/>
<property name="batchSize" value="2147483647"/>
@@ -177,7 +177,7 @@
For this example we use a running batch window with upper bound of 2 seconds.
This means that streamer composes a batch with events within 2 seconds time interval.
-->
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow">
<property name="name" value="stage2"/>
<property name="batchTimeInterval" value="2000"/>
<property name="batchSize" value="2147483647"/>
@@ -215,7 +215,7 @@
This means that only events for last 10 seconds will be present in
these windows (other events are marked for eviction).
-->
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeWindow">
<property name="name" value="AddToWindowStage"/>
<property name="timeInterval" value="10000"/>
@@ -235,7 +235,7 @@
</list>
</property>
</bean>
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedTimeWindow">
<property name="name" value="DetectPlacesStage"/>
<property name="timeInterval" value="10000"/>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
index 69f5344..f9c0e05 100644
--- a/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/StreamerWindow.java
@@ -24,12 +24,12 @@ import java.util.*;
* <p>
* GridGain comes with following rolling windows implementations out of the box:
* <ul>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerUnboundedWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeBatchWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeSortedWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow}</li>
- * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerUnboundedWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeBatchWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedSizeSortedWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedTimeWindow}</li>
+ * <li>{@link org.apache.ignite.streamer.window.StreamerBoundedTimeBatchWindow}</li>
* </ul>
* <p>
* Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
new file mode 100644
index 0000000..f220159
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeBatchWindow.java
@@ -0,0 +1,795 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Window that is bounded by size and accumulates events to batches.
+ */
+public class StreamerBoundedSizeBatchWindow<E> extends StreamerWindowAdapter<E> {
+ /** Batch size (maximum number of events in a single batch). */
+ private int batchSize;
+
+ /** Maximum number of batches kept in window before batches become subject to eviction. */
+ private int maxBatches;
+
+ /** Reference for queue and size. */
+ private volatile QueueHolder holder;
+
+ /** Enqueue lock. */
+ private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
+
+ /**
+ * Gets maximum number of batches that can be stored in window.
+ *
+ * @return Maximum number of batches for window.
+ */
+ public int getMaximumBatches() {
+ return maxBatches;
+ }
+
+ /**
+ * Sets maximum number of batches that can be stored in window.
+ *
+ * @param maxBatches Maximum number of batches for window.
+ */
+ public void setMaximumBatches(int maxBatches) {
+ this.maxBatches = maxBatches;
+ }
+
+ /**
+ * Gets batch size.
+ *
+ * @return Batch size.
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Sets batch size.
+ *
+ * @param batchSize Batch size.
+ */
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkConfiguration() throws GridException {
+ if (batchSize <= 0)
+ throw new GridException("Failed to initialize window (batchSize size must be positive) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maximumBatches=" + maxBatches +
+ ", batchSize=" + batchSize + ']');
+
+ if (maxBatches < 0)
+ throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maximumBatches=" + maxBatches +
+ ", batchSize=" + batchSize + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void stop0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reset0() {
+ ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
+
+ Batch b = new Batch(batchSize);
+
+ ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
+
+ b.node(n);
+
+ holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return holder.totalQueueSize().get();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridStreamerWindowIterator<E> iterator0() {
+ final QueueHolder win = holder;
+
+ final Iterator<Batch> batchIt = win.batchQueue().iterator();
+
+ return new GridStreamerWindowIterator<E>() {
+ /** Current batch iterator. */
+ private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
+
+ /** Next batch iterator. Will be null if no more batches available. */
+ private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
+
+ /** Last returned value. */
+ private E lastRet;
+
+ {
+ curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("SimplifiableIfStatement")
+ @Override public boolean hasNext() {
+ if (curBatchIt != null) {
+ if (curBatchIt.hasNext())
+ return true;
+
+ return nextBatchIt != null && nextBatchIt.hasNext();
+ }
+ else
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public E next() {
+ if (curBatchIt == null)
+ throw new NoSuchElementException();
+
+ if (!curBatchIt.hasNext()) {
+ if (nextBatchIt != null) {
+ curBatchIt = nextBatchIt;
+
+ nextBatchIt = null;
+
+ lastRet = curBatchIt.next();
+ }
+ else
+ throw new NoSuchElementException();
+ }
+ else {
+ E next = curBatchIt.next();
+
+ // Moved to last element in batch - check for next iterator.
+ if (!curBatchIt.hasNext())
+ advanceBatch();
+
+ lastRet = next;
+ }
+
+ return lastRet;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public E removex() {
+ if (curBatchIt == null)
+ throw new NoSuchElementException();
+
+ if (curBatchIt.removex()) {
+ // Decrement global size if deleted.
+ win.totalQueueSize().decrementAndGet();
+
+ return lastRet;
+ }
+ else
+ return null;
+ }
+
+ /**
+ * Moves to the next batch.
+ */
+ private void advanceBatch() {
+ if (batchIt.hasNext()) {
+ Batch batch = batchIt.next();
+
+ nextBatchIt = batch.iterator();
+ }
+ else
+ nextBatchIt = null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public int evictionQueueSize() {
+ QueueHolder win = holder;
+
+ int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches);
+
+ Iterator<Batch> it = win.batchQueue().iterator();
+
+ int size = 0;
+
+ int idx = 0;
+
+ while (it.hasNext()) {
+ Batch batch = it.next();
+
+ if (idx++ < oversizeCnt)
+ size += batch.size();
+ }
+
+ return size;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean enqueue0(E evt) {
+ try {
+ return enqueueInternal(evt);
+ }
+ catch (GridInterruptedException ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Enqueue event to window.
+ *
+ * @param evt Event to add.
+ * @return {@code True} if event was added.
+ *
+ * @throws GridInterruptedException If thread was interrupted.
+ */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+ private boolean enqueueInternal(E evt) throws GridInterruptedException {
+ QueueHolder tup = holder;
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ while (true) {
+ Batch last = evts.peekLast();
+
+ if (last == null || !last.add(evt)) {
+ // This call will ensure that last object is actually added to batch
+ // before we add new batch to events queue.
+ // If exception is thrown here, window will be left in consistent state.
+ if (last != null)
+ last.finish();
+
+ // Add new batch to queue in write lock.
+ if (enqueueLock.writeLock().tryLock()) {
+ try {
+ Batch first0 = evts.peekLast();
+
+ if (first0 == last) {
+ Batch batch = new Batch(batchSize);
+
+ ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
+
+ batch.node(node);
+
+ size.incrementAndGet();
+
+ if (batch.removed() && evts.unlinkx(node))
+ size.decrementAndGet();
+ }
+ }
+ finally {
+ enqueueLock.writeLock().unlock();
+ }
+ }
+ else {
+ // Acquire read lock to wait for batch enqueue.
+ enqueueLock.readLock().lock();
+
+ enqueueLock.readLock().unlock();
+ }
+ }
+ else {
+ // Event was added, global size increment.
+ tup.totalQueueSize().incrementAndGet();
+
+ return true;
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvicted0(int cnt) {
+ QueueHolder tup = holder;
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ while (true) {
+ int curSize = size.get();
+
+ if (curSize > maxBatches) {
+ // Just peek the first batch.
+ Batch first = evts.peekFirst();
+
+ if (first != null) {
+ assert first.finished();
+
+ Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+ if (!polled.isEmpty())
+ res.addAll(polled);
+
+ if (first.isEmpty()) {
+ ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+ first.markRemoved();
+
+ if (node != null && evts.unlinkx(node))
+ size.decrementAndGet();
+ }
+
+ if (res.size() == cnt)
+ break;
+ }
+ else
+ break;
+ }
+ else
+ break;
+ }
+
+ // Removed entries, update global size.
+ tup.totalQueueSize().addAndGet(-res.size());
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvictedBatch0() {
+ QueueHolder tup = holder;
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ while (true) {
+ int curSize = size.get();
+
+ if (curSize > maxBatches) {
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ Batch polled = evts.poll();
+
+ if (polled != null) {
+ assert polled.finished();
+
+ // Mark batch deleted for consistency.
+ polled.markRemoved();
+
+ Collection<E> polled0 = polled.shrink();
+
+ // Return only non-empty result; if result of shrink is empty, loop retries the poll.
+ if (!polled0.isEmpty()) {
+ // Update global size.
+ tup.totalQueueSize().addAndGet(-polled0.size());
+
+ return polled0;
+ }
+ }
+ else {
+ // Nothing was polled (null batch), so we must restore counter and return.
+ size.incrementAndGet();
+
+ return Collections.emptyList();
+ }
+ }
+ }
+ else
+ return Collections.emptyList();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> dequeue0(int cnt) {
+ QueueHolder tup = holder;
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ while (true) {
+ // Just peek the first batch.
+ Batch first = evts.peekFirst();
+
+ if (first != null) {
+ Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+ // We must check for finished before unlink as no elements
+ // can be added to batch after it is finished.
+ if (first.isEmpty() && first.emptyFinished()) {
+ ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+ first.markRemoved();
+
+ if (node != null && evts.unlinkx(node))
+ size.decrementAndGet();
+
+ assert first.isEmpty();
+ }
+ else if (polled.isEmpty())
+ break;
+
+ res.addAll(polled);
+
+ if (res.size() == cnt)
+ break;
+ }
+ else
+ break;
+ }
+
+ // Update global size.
+ tup.totalQueueSize().addAndGet(-res.size());
+
+ return res;
+ }
+
+ /**
+ * Consistency check, used for testing.
+ */
+ void consistencyCheck() {
+ QueueHolder win = holder;
+
+ Iterator<E> it = iterator();
+
+ int cnt = 0;
+
+ while (it.hasNext()) {
+ it.next();
+
+ cnt++;
+ }
+
+ int cnt0 = 0;
+
+ for (Batch batch : win.batchQueue())
+ cnt0 += batch.size();
+
+ int sz = size();
+
+ assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
+ assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
+ assert win.batchQueue().size() == win.batchQueueSize().get();
+ }
+
+ /**
+ * Window structure.
+ */
+ private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public QueueHolder() {
+ // No-op.
+ }
+
+ /**
+ * @param batchQueue Batch queue.
+ * @param batchQueueSize Batch queue size counter.
+ * @param globalSize Global size counter.
+ */
+ private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
+ AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
+ super(batchQueue, batchQueueSize, globalSize);
+
+ assert batchQueue.size() == 1;
+ assert batchQueueSize.get() == 1;
+ }
+
+ /**
+ * @return Events queue.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public ConcurrentLinkedDeque8<Batch> batchQueue() {
+ return get1();
+ }
+
+ /**
+ * @return Batch queue size.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public AtomicInteger batchQueueSize() {
+ return get2();
+ }
+
+ /**
+ * @return Global queue size.
+ */
+ @SuppressWarnings("ConstantConditions")
+ public AtomicInteger totalQueueSize() {
+ return get3();
+ }
+ }
+
+ /**
+ * Batch.
+ */
+ private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Batch events. */
+ private ConcurrentLinkedDeque8<E> evts;
+
+ /** Capacity. */
+ private AtomicInteger cap;
+
+ /** Finished. */
+ private volatile boolean finished;
+
+ /** Queue node. */
+ @GridToStringExclude
+ private ConcurrentLinkedDeque8.Node<Batch> qNode;
+
+ /** Node removed flag. */
+ private volatile boolean rmvd;
+
+ /**
+ * @param batchSize Batch size.
+ */
+ private Batch(int batchSize) {
+ cap = new AtomicInteger(batchSize);
+
+ evts = new ConcurrentLinkedDeque8<>();
+ }
+
+ /**
+ * @return {@code True} if batch is removed.
+ */
+ public boolean removed() {
+ return rmvd;
+ }
+
+ /**
+ * Marks batch as removed.
+ */
+ public void markRemoved() {
+ rmvd = true;
+ }
+
+ /**
+ * Adds event to batch.
+ *
+ * @param evt Event to add.
+ * @return {@code True} if event was added, {@code false} if batch is full.
+ */
+ public boolean add(E evt) {
+ readLock().lock();
+
+ try {
+ if (finished)
+ return false;
+
+ while (true) {
+ int size = cap.get();
+
+ if (size > 0) {
+ if (cap.compareAndSet(size, size - 1)) {
+ evts.add(evt);
+
+ // Will go through write lock and finish batch.
+ if (size == 1)
+ finished = true;
+
+ return true;
+ }
+ }
+ else
+ return false;
+ }
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * @return Queue node.
+ */
+ public ConcurrentLinkedDeque8.Node<Batch> node() {
+ return qNode;
+ }
+
+ /**
+ * @param qNode Queue node.
+ */
+ public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
+ this.qNode = qNode;
+ }
+
+ /**
+ * Acquires write lock, thus waiting for concurrent {@code add} calls (which hold read lock) to complete.
+ *
+ * @throws GridInterruptedException If wait was interrupted.
+ */
+ public void finish() throws GridInterruptedException {
+ writeLock().lock();
+
+ try {
+ // Safety: at this point batch must have no free capacity and must be marked as finished.
+ assert cap.get() == 0;
+ assert finished;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /**
+ * @return {@code True} if batch is finished and no more events will be added to it.
+ */
+ public boolean finished() {
+ readLock().lock();
+
+ try {
+ return finished;
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets batch size.
+ *
+ * @return Batch size.
+ */
+ public int size() {
+ readLock().lock();
+
+ try {
+ return evts == null ? 0 : evts.sizex();
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * @return {@code True} if batch is empty.
+ */
+ public boolean isEmpty() {
+ readLock().lock();
+
+ try {
+ return evts == null || evts.isEmpty();
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
+ * be added to batch and it can be safely unlinked from the queue.
+ *
+ * @return {@code True} if batch is empty and finished.
+ */
+ public boolean emptyFinished() {
+ writeLock().lock();
+
+ try {
+ return finished && (evts == null || evts.isEmpty());
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
+ readLock().lock();
+
+ try {
+ if (evts != null)
+ return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
+
+ return new ConcurrentLinkedDeque8.IteratorEx<E>() {
+ @Override public boolean removex() {
+ throw new NoSuchElementException();
+ }
+
+ @Override public boolean hasNext() {
+ return false;
+ }
+
+ @Override public E next() {
+ throw new NoSuchElementException();
+ }
+
+ @Override public void remove() {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Polls up to {@code cnt} objects from batch in concurrent fashion.
+ *
+ * @param cnt Number of objects to poll.
+ * @return Collection of polled elements or empty collection if nothing to poll.
+ */
+ public Collection<E> pollNonBatch(int cnt) {
+ readLock().lock();
+
+ try {
+ if (evts == null)
+ return Collections.emptyList();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ E evt = evts.poll();
+
+ if (evt != null)
+ res.add(evt);
+ else
+ return res;
+ }
+
+ return res;
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Shrinks this batch. No events can be polled from it after this method.
+ *
+ * @return Collection of events contained in batch before shrink (empty collection in
+ * case no events were present or batch was already shrunk).
+ */
+ public Collection<E> shrink() {
+ writeLock().lock();
+
+ try {
+ if (evts == null)
+ return Collections.emptyList();
+
+ // Since iterator can concurrently delete elements, we must poll here.
+ Collection<E> res = new ArrayList<>(evts.sizex());
+
+ E o;
+
+ while ((o = evts.poll()) != null)
+ res.add(o);
+
+ // Nothing can be polled after shrink.
+ evts = null;
+
+ return res;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ ConcurrentLinkedDeque8<E> evts0 = evts;
+
+ return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
new file mode 100644
index 0000000..8f50fb4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeSortedWindow.java
@@ -0,0 +1,210 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Size-bounded sorted window. Unlike {@link StreamerBoundedSizeWindow}, which limits
+ * window only on size, this window also provides events in sorted order.
+ */
+public class StreamerBoundedSizeSortedWindow<E>
+ extends StreamerBoundedSizeWindowAdapter<E, StreamerBoundedSizeSortedWindow.Holder<E>> {
+ /** Comparator. */
+ private Comparator<E> comp;
+
+ /** Order counter. */
+ private AtomicLong orderCnt = new AtomicLong();
+
+ /**
+ * Gets event comparator.
+ *
+ * @return Event comparator.
+ */
+ public Comparator<E> getComparator() {
+ return comp;
+ }
+
+ /**
+ * Sets event comparator.
+ *
+ * @param comp Comparator.
+ */
+ public void setComparator(Comparator<E> comp) {
+ this.comp = comp;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected Collection<Holder<E>> newCollection() {
+ final Comparator<E> comp0 = comp;
+
+ Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
+ @Override public int compare(Holder<E> h1, Holder<E> h2) {
+ if (h1 == h2)
+ return 0;
+
+ int diff = comp0 == null ?
+ ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val);
+
+ if (diff != 0)
+ return diff;
+ else {
+ assert h1.order != h2.order;
+
+ return h1.order < h2.order ? -1 : 1;
+ }
+ }
+ });
+
+ return (Collection)col;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) {
+ if (comp == null) {
+ if (!(evt instanceof Comparable))
+ throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " +
+ "comparator is specified: " + evt);
+ }
+
+ if (set != null) {
+ if (set.add(evt)) {
+ col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
+
+ return true;
+ }
+
+ return false;
+ }
+ else {
+ col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
+
+ return true;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) {
+ int cnt = 0;
+
+ for (E evt : evts) {
+ if (addInternal(evt, col, set))
+ cnt++;
+ }
+
+ return cnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) {
+ Holder<E> h = ((NavigableSet<Holder<E>>)col).pollLast(); // Collection holds holders, not raw events.
+
+ if (set != null && h != null)
+ set.remove(h.val); // Keep uniqueness set in sync with the sorted collection.
+
+ return h == null ? null : h.val;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col,
+ final Set<E> set, final AtomicInteger size) {
+ final Iterator<Holder<E>> it = col.iterator();
+
+ return new GridStreamerWindowIterator<E>() {
+ private Holder<E> lastRet;
+
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override public E next() {
+ lastRet = it.next();
+
+ return lastRet.val;
+ }
+
+ @Override public E removex() {
+ if (lastRet == null)
+ throw new IllegalStateException();
+
+ if (col.remove(lastRet)) {
+ if (set != null)
+ set.remove(lastRet.val);
+
+ size.decrementAndGet();
+
+ return lastRet.val;
+ }
+ else
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Value wrapper.
+ */
+ @SuppressWarnings("PackageVisibleInnerClass")
+ static class Holder<E> {
+ /** Value. */
+ private E val;
+
+ /** Order to distinguish between objects for which comparator returns 0. */
+ private long order;
+
+ /**
+ * @param val Value to hold.
+ * @param order Adding order.
+ */
+ private Holder(E val, long order) {
+ this.val = val;
+ this.order = order;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true; // Reflexivity: object must be equal to itself (Object.equals() contract).
+
+ if (!(obj instanceof Holder))
+ return false;
+
+ Holder h = (Holder)obj;
+
+ return F.eq(val, h.val) && order == h.order; // Equal only if both value and adding order match.
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) {
+ assert col.size() == size.get();
+
+ if (set != null) {
+ // Check no duplicates in collection.
+
+ Collection<Object> vals = new HashSet<>();
+
+ for (Object evt : col)
+ assert vals.add(((Holder)evt).val);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
new file mode 100644
index 0000000..1a51f52
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindow.java
@@ -0,0 +1,136 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Queue window bounded by number of elements in queue. After adding elements to this window called
+ * must check for evicted events.
+ * <p>
+ * It is guaranteed that window size will never get less then maximum size when poling from this window
+ * concurrently from different threads.
+ */
+public class StreamerBoundedSizeWindow<E> extends StreamerBoundedSizeWindowAdapter<E, E> {
+ /** {@inheritDoc} */
+ @Override protected Collection<E> newCollection() {
+ return new ConcurrentLinkedDeque8<>();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set,
+ final AtomicInteger size) {
+ final ConcurrentLinkedDeque8.IteratorEx<E> it =
+ (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator();
+
+ return new GridStreamerWindowIterator<E>() {
+ private E lastRet;
+
+ @Override public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ @Override public E next() {
+ lastRet = it.next();
+
+ return lastRet;
+ }
+
+ @Override public E removex() {
+ if (it.removex()) {
+ if (set != null)
+ set.remove(lastRet);
+
+ size.decrementAndGet();
+
+ return lastRet;
+ }
+ else
+ return null;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("IfMayBeConditional")
+ @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) {
+ assert col instanceof ConcurrentLinkedDeque8;
+
+ // If unique.
+ if (set != null) {
+ if (set.add(evt)) {
+ col.add(evt);
+
+ return true;
+ }
+
+ return false;
+ }
+ else {
+ col.add(evt);
+
+ return true;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) {
+ assert col instanceof ConcurrentLinkedDeque8;
+ if (set != null) {
+ int cnt = 0;
+
+ for (E evt : evts) {
+ if (set.add(evt)) {
+ col.add(evt);
+
+ cnt++;
+ }
+ }
+
+ return cnt;
+ }
+ else {
+ col.addAll(evts);
+
+ return evts.size();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) {
+ assert col instanceof ConcurrentLinkedDeque8;
+
+ E res = ((Queue<E>)col).poll();
+
+ if (set != null && res != null)
+ set.remove(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) {
+ assert col.size() == size.get();
+
+ if (set != null) {
+ // Check no duplicates in collection.
+
+ Collection<Object> vals = new HashSet<>();
+
+ for (Object evt : col)
+ assert vals.add(evt);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
new file mode 100644
index 0000000..c31b32e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedSizeWindowAdapter.java
@@ -0,0 +1,349 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Abstract non-public class for size-bound windows. Supports reset.
+ */
+abstract class StreamerBoundedSizeWindowAdapter<E, T> extends StreamerWindowAdapter<E> {
+ /** Reference. */
+ private AtomicReference<WindowHolder> ref = new AtomicReference<>();
+
+ /** If true, only unique elements will be accepted. */
+ private boolean unique;
+
+ /** Window maximum size. */
+ protected int maxSize;
+
+ /**
+ * Gets window maximum size, i.e. the value last passed to {@link #setMaximumSize(int)}.
+ *
+ * @return Maximum size.
+ */
+ public int getMaximumSize() {
+ return maxSize;
+ }
+
+ /**
+ * Sets window maximum size. The value is not validated here; a negative value is rejected
+ * by {@link #checkConfiguration()}.
+ *
+ * @param maxSize Maximum size.
+ */
+ public void setMaximumSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Gets the uniqueness flag of this window.
+ *
+ * @return True if only unique elements will be accepted.
+ */
+ public boolean isUnique() {
+ return unique;
+ }
+
+ /**
+ * Sets the uniqueness flag. The flag is read in {@code reset0()}, which creates a uniqueness set
+ * only when it is {@code true}.
+ *
+ * @param unique If true, only unique elements will be accepted.
+ */
+ public void setUnique(boolean unique) {
+ this.unique = unique;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkConfiguration() throws GridException {
+ if (maxSize < 0)
+ throw new GridException("Failed to initialize window (maximumSize cannot be negative) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maxSize=" + maxSize +
+ ", unique=" + unique + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void stop0() {
+ // No-op: this class performs no action on stop.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+     // Never report a negative size, even if the counter happens to be below zero.
+     return Math.max(ref.get().size().get(), 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int evictionQueueSize() {
+     // Number of events above the configured maximum; zero when the window is within bounds.
+     return Math.max(size() - maxSize, 0);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always returns {@code true}: the result of {@code addInternal(...)} only decides whether the size
+ * counter is incremented (see {@code add(E)}) and is not propagated to the caller.
+ */
+ @Override protected boolean enqueue0(E evt) {
+ add(evt);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvicted0(int cnt) {
+     Collection<E> evicted = new ArrayList<>(cnt);
+
+     // Collect up to cnt events, stopping as soon as nothing more can be evicted.
+     while (evicted.size() < cnt) {
+         E evt = pollEvictedInternal();
+
+         if (evt == null)
+             break;
+
+         evicted.add(evt);
+     }
+
+     return evicted;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvictedBatch0() {
+     // For this window a "batch" is at most one evicted event.
+     E evt = pollEvictedInternal();
+
+     if (evt != null)
+         return Collections.singleton(evt);
+
+     return Collections.emptyList();
+ }
+
+ /**
+ * Poll evicted internal implementation.
+ *
+ * @return Evicted element.
+ */
+ @Nullable private E pollEvictedInternal() {
+ WindowHolder tup = ref.get();
+
+ AtomicInteger size = tup.size();
+
+ while (true) {
+ int curSize = size.get();
+
+ if (curSize > maxSize) {
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ E evt = pollInternal(tup.collection(), tup.set());
+
+ if (evt != null)
+ return evt;
+ else {
+ // No actual events in queue, it means that other thread is just adding event.
+ // return null as it is a concurrent add call.
+ size.incrementAndGet();
+
+ return null;
+ }
+ }
+ }
+ else
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> dequeue0(int cnt) {
+ WindowHolder tup = ref.get();
+
+ AtomicInteger size = tup.size();
+ Collection<T> evts = tup.collection();
+
+ Collection<E> resCol = new ArrayList<>(cnt);
+
+ while (true) {
+ int curSize = size.get();
+
+ if (curSize > 0) {
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ E res = pollInternal(evts, tup.set());
+
+ if (res != null) {
+ resCol.add(res);
+
+ if (resCol.size() >= cnt)
+ return resCol;
+ }
+ else {
+ size.incrementAndGet();
+
+ return resCol;
+ }
+ }
+ }
+ else
+ return resCol;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The iterator operates on the collection, set and size counter of the holder read at call time.
+ */
+ @Override protected GridStreamerWindowIterator<E> iterator0() {
+ WindowHolder win = ref.get();
+
+ return iteratorInternal(win.collection(), win.set(), win.size());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reset0() {
+     Collection<T> col = newCollection();
+
+     // The uniqueness set exists only for unique windows.
+     Set<E> uniqueSet = unique ? new GridConcurrentHashSet<E>() : null;
+
+     // Replace the whole window state with a fresh holder whose counter starts at zero.
+     ref.set(new WindowHolder(col, uniqueSet, new AtomicInteger()));
+ }
+
+ /**
+  * Adds the event to the current window and increments the size counter if the event was accepted.
+  *
+  * @param evt Event to add.
+  */
+ private void add(E evt) {
+     WindowHolder win = ref.get();
+
+     boolean added = addInternal(evt, win.collection(), win.set());
+
+     if (added)
+         win.size().incrementAndGet();
+ }
+
+ /**
+ * Adds all events to the current window and increases the size counter by the number of events
+ * reported as added by {@code addAllInternal(...)}.
+ * <p>
+ * NOTE(review): this private method is not invoked anywhere in this class.
+ *
+ * @param evts Events to add.
+ */
+ private void addAll(Collection<E> evts) {
+ WindowHolder tup = ref.get();
+
+ int cnt = addAllInternal(evts, tup.collection(), tup.set());
+
+ tup.size().addAndGet(cnt);
+ }
+
+ /**
+ * Checks window consistency. Used for testing.
+ * <p>
+ * Delegates to the implementation-specific check, passing the collection, set and size counter
+ * of the current holder.
+ */
+ void consistencyCheck() {
+ WindowHolder win = ref.get();
+
+ consistencyCheck(win.collection(), win.set(), win.size());
+ }
+
+ /**
+  * Gets the collection of the current window holder.
+  *
+  * @return Collection.
+  */
+ @SuppressWarnings("ConstantConditions")
+ protected Collection<T> collection() {
+     WindowHolder win = ref.get();
+
+     return win.collection();
+ }
+
+ /**
+ * Creates new collection specific for window implementation. This collection will be subsequently passed
+ * to addInternal(...) and pollInternal() methods. Called from {@code reset0()} each time the window
+ * state is replaced.
+ *
+ * @return Collection - holder.
+ */
+ protected abstract Collection<T> newCollection();
+
+ /**
+ * Adds event to queue implementation.
+ *
+ * @param evt Event to add.
+ * @param col Collection to add to.
+ * @param set Uniqueness set to check, or {@code null} if the window is not unique.
+ * @return {@code True} if event was added. Only in that case the caller increments the size counter.
+ */
+ protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);
+
+ /**
+ * Adds all events to queue implementation.
+ *
+ * @param evts Events to add.
+ * @param col Collection to add to.
+ * @param set Uniqueness set to check, or {@code null} if the window is not unique.
+ * @return Added events number. The caller adds this number to the size counter.
+ */
+ protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);
+
+ /**
+ * Polls a single element from the queue implementation.
+ *
+ * @param col Collection to poll from.
+ * @param set Uniqueness set, or {@code null} if the window is not unique.
+ * @return Polled object, or {@code null} if nothing could be polled.
+ */
+ @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);
+
+ /**
+ * Creates iterator based on implementation collection type.
+ *
+ * @param col Collection.
+ * @param set Uniqueness set, or {@code null} if the window is not unique.
+ * @param size Size counter of the window the collection belongs to.
+ * @return Iterator.
+ */
+ protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
+ AtomicInteger size);
+
+ /**
+ * Checks consistency. Used in tests.
+ *
+ * @param col Collection.
+ * @param set Set if unique ({@code null} otherwise).
+ * @param size Size holder.
+ */
+ protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);
+
+ /**
+ * Window holder: a tuple of the window collection, the optional uniqueness set and the size counter.
+ * The whole window state is replaced at once by storing a new holder in {@code ref}.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public WindowHolder() {
+ // No-op.
+ }
+
+ /**
+ * @param col Collection.
+ * @param set Set if unique.
+ * @param size Window size counter.
+ */
+ WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
+ super(col, set, size);
+ }
+
+ /**
+ * @return Collection (first tuple element).
+ */
+ public Collection<T> collection() {
+ return get1();
+ }
+
+ /**
+ * @return Set (second tuple element).
+ */
+ public Set<E> set() {
+ return get2();
+ }
+
+ /**
+ * @return Size (third tuple element).
+ */
+ public AtomicInteger size() {
+ return get3();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
new file mode 100644
index 0000000..187c34e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeBatchWindow.java
@@ -0,0 +1,897 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.lang.*;
+import org.gridgain.grid.util.tostring.*;
+import org.jdk8.backport.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
+ */
+public class StreamerBoundedTimeBatchWindow<E> extends StreamerWindowAdapter<E> {
+ /** Batch size. */
+ private int batchSize;
+
+ /** Maximum batches. */
+ private int maxBatches;
+
+ /** */
+ private long batchTimeInterval;
+
+ /** Atomic reference for queue and size. */
+ private AtomicReference<WindowHolder> ref = new AtomicReference<>();
+
+ /** Enqueue lock. */
+ private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
+
+ /**
+ * Gets maximum number of batches that can be stored in window.
+ *
+ * @return Maximum number of batches for window.
+ */
+ public int getMaximumBatches() {
+ return maxBatches;
+ }
+
+ /**
+ * Sets maximum number of batches that can be stored in window. A negative value is rejected by
+ * {@link #checkConfiguration()}; the batch-count limit is applied by the eviction code only when
+ * the value is greater than zero.
+ *
+ * @param maxBatches Maximum number of batches for window.
+ */
+ public void setMaximumBatches(int maxBatches) {
+ this.maxBatches = maxBatches;
+ }
+
+ /**
+ * Gets batch size. Note that {@link #checkConfiguration()} replaces a configured value of {@code 0}
+ * with {@link Integer#MAX_VALUE}.
+ *
+ * @return Batch size.
+ */
+ public int getBatchSize() {
+ return batchSize;
+ }
+
+ /**
+ * Sets batch size. A negative value is rejected by {@link #checkConfiguration()}, and {@code 0} is
+ * replaced there with {@link Integer#MAX_VALUE}.
+ *
+ * @param batchSize Batch size.
+ */
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ /**
+ * Gets batch time interval, i.e. the value added to a timestamp to obtain the end timestamp of a new batch.
+ *
+ * @return Batch time interval.
+ */
+ public long getBatchTimeInterval() {
+ return batchTimeInterval;
+ }
+
+ /**
+ * Sets batch time interval. The value must be positive, otherwise {@link #checkConfiguration()} fails.
+ * NOTE(review): the interval is added to {@code U.currentTimeMillis()} values, so it is presumably
+ * expressed in milliseconds - confirm.
+ *
+ * @param batchTimeInterval Batch time interval.
+ */
+ public void setBatchTimeInterval(long batchTimeInterval) {
+ this.batchTimeInterval = batchTimeInterval;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Rejects a negative {@code maxBatches}, a negative {@code batchSize} and a non-positive
+ * {@code batchTimeInterval}. As a side effect, a {@code batchSize} of {@code 0} is replaced with
+ * {@link Integer#MAX_VALUE}.
+ */
+ @Override public void checkConfiguration() throws GridException {
+ if (maxBatches < 0)
+ throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maximumBatches=" + maxBatches +
+ ", batchSize=" + batchSize +
+ ", batchTimeInterval=" + batchTimeInterval + ']');
+
+ if (batchSize < 0)
+ throw new GridException("Failed to initialize window (batchSize cannot be negative) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maximumBatches=" + maxBatches +
+ ", batchSize=" + batchSize +
+ ", batchTimeInterval=" + batchTimeInterval + ']');
+ else if (batchSize == 0)
+ // Zero is replaced with the largest possible batch size.
+ batchSize = Integer.MAX_VALUE;
+
+ // This check runs after the replacement above, so its message reports the replaced batchSize.
+ if (batchTimeInterval <= 0)
+ throw new GridException("Failed to initialize window (batchTimeInterval must be positive) " +
+ "[windowClass=" + getClass().getSimpleName() +
+ ", maximumBatches=" + maxBatches +
+ ", batchSize=" + batchSize +
+ ", batchTimeInterval=" + batchTimeInterval + ']');
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void stop0() {
+ // No-op: this class performs no action on stop.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reset0() {
+     ConcurrentLinkedDeque8<Batch> batches = new ConcurrentLinkedDeque8<>();
+
+     // The window starts with a single batch whose end timestamp is one interval from now.
+     Batch initial = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);
+
+     // Remember the queue node in the batch so that it can be unlinked later.
+     initial.node(batches.offerLastx(initial));
+
+     AtomicInteger batchCnt = new AtomicInteger(1);
+     AtomicInteger evtCnt = new AtomicInteger();
+
+     ref.set(new WindowHolder(batches, batchCnt, evtCnt));
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the value of the total (per-event) counter of the current window holder.
+ */
+ @Override public int size() {
+ return ref.get().totalQueueSize().get();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The returned iterator walks the batches of the window in queue order and the events of each batch
+  * in turn. Batches that hold no events at the time they are reached are skipped.
+  */
+ @Override protected GridStreamerWindowIterator<E> iterator0() {
+     final WindowHolder win = ref.get();
+
+     final Iterator<Batch> batchIt = win.batchQueue().iterator();
+
+     return new GridStreamerWindowIterator<E>() {
+         /** Current batch iterator. Will be null only if the batch queue was empty. */
+         private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
+
+         /** Iterator over the next non-empty batch. Will be null if no more events are available. */
+         private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
+
+         /** Last returned value. */
+         private E lastRet;
+
+         {
+             curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
+
+             // The first batch may hold no events; look ahead so that later batches stay reachable.
+             if (curBatchIt != null && !curBatchIt.hasNext())
+                 advanceBatch();
+         }
+
+         /** {@inheritDoc} */
+         @Override public boolean hasNext() {
+             if (curBatchIt == null)
+                 return false;
+
+             return curBatchIt.hasNext() || (nextBatchIt != null && nextBatchIt.hasNext());
+         }
+
+         /** {@inheritDoc} */
+         @Override public E next() {
+             if (curBatchIt == null)
+                 throw new NoSuchElementException();
+
+             if (!curBatchIt.hasNext()) {
+                 if (nextBatchIt == null)
+                     throw new NoSuchElementException();
+
+                 // Switch to the batch found by the previous look-ahead.
+                 curBatchIt = nextBatchIt;
+
+                 nextBatchIt = null;
+             }
+
+             E next = curBatchIt.next();
+
+             // Moved to last element in batch (including a batch we have just switched to) - look ahead.
+             // Without the look-ahead after a switch, a batch holding exactly one event would make
+             // hasNext() report the end of iteration while later batches still hold events.
+             if (!curBatchIt.hasNext())
+                 advanceBatch();
+
+             lastRet = next;
+
+             return lastRet;
+         }
+
+         /** {@inheritDoc} */
+         @Override public E removex() {
+             if (curBatchIt == null)
+                 throw new NoSuchElementException();
+
+             // curBatchIt still points to the batch lastRet came from: next() switches batches lazily.
+             if (curBatchIt.removex()) {
+                 // Decrement global size if deleted.
+                 win.totalQueueSize().decrementAndGet();
+
+                 return lastRet;
+             }
+             else
+                 return null;
+         }
+
+         /**
+          * Looks ahead for the next batch that has events to iterate over. Leaves {@code nextBatchIt}
+          * set to {@code null} if there is no such batch.
+          */
+         private void advanceBatch() {
+             nextBatchIt = null;
+
+             while (batchIt.hasNext()) {
+                 ConcurrentLinkedDeque8.IteratorEx<E> candidate = batchIt.next().iterator();
+
+                 if (candidate.hasNext()) {
+                     nextBatchIt = candidate;
+
+                     break;
+                 }
+             }
+         }
+     };
+ }
+
+ /** {@inheritDoc} */
+ @Override public int evictionQueueSize() {
+     WindowHolder win = ref.get();
+
+     // Number of leading batches exceeding the batch-count limit (the limit applies only if positive).
+     int oversized = 0;
+
+     if (maxBatches > 0)
+         oversized = Math.max(0, win.batchQueueSize().get() - maxBatches);
+
+     long now = U.currentTimeMillis();
+
+     int evictable = 0;
+
+     int pos = 0;
+
+     for (Batch batch : win.batchQueue()) {
+         boolean overLimit = pos < oversized;
+
+         pos++;
+
+         // A batch counts if it is over the limit or its end timestamp has passed.
+         if (overLimit || batch.batchEndTs < now)
+             evictable += batch.size();
+     }
+
+     return evictable;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Enqueues the event with the current time as its timestamp. Returns {@code false} only if
+ * {@link GridInterruptedException} is thrown by the timestamped overload.
+ */
+ @Override protected boolean enqueue0(E evt) {
+ try {
+ return enqueue0(evt, U.currentTimeMillis());
+ }
+ catch (GridInterruptedException ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Enqueue event to window.
+ * <p>
+ * Tries to add the event to the last batch of the queue. If there is no such batch or it refuses the
+ * event, a new batch is appended by whichever thread obtains the enqueue write lock, and the attempt
+ * is repeated.
+ *
+ * @param evt Event to add.
+ * @param ts Event timestamp.
+ * @return {@code True} if event was added (the loop below exits only through {@code return true}).
+ *
+ * @throws GridInterruptedException If thread was interrupted.
+ */
+ private boolean enqueue0(E evt, long ts) throws GridInterruptedException {
+ WindowHolder tup = ref.get();
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ while (true) {
+ Batch last = evts.peekLast();
+
+ if (last == null || !last.add(evt, ts)) {
+ // This call will ensure that last object is actually added to batch
+ // before we add new batch to events queue.
+ // If exception is thrown here, window will be left in consistent state.
+ if (last != null)
+ last.finish();
+
+ // Add new batch to queue in write lock.
+ if (enqueueLock.writeLock().tryLock()) {
+ try {
+ // Re-read the last batch (despite the variable name, this is the tail of the queue).
+ Batch first0 = evts.peekLast();
+
+ // Append a new batch only if no other thread has appended one in the meantime.
+ if (first0 == last) {
+ Batch batch = new Batch(batchSize, ts + batchTimeInterval);
+
+ ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
+
+ batch.node(node);
+
+ size.incrementAndGet();
+
+ // If batch was removed in other thread.
+ if (batch.removed() && evts.unlinkx(node))
+ size.decrementAndGet();
+ }
+ }
+ finally {
+ enqueueLock.writeLock().unlock();
+ }
+ }
+ else {
+ // Acquire read lock to wait for batch enqueue.
+ enqueueLock.readLock().lock();
+
+ try {
+ // The result is discarded; the tail is read again at the top of the loop.
+ evts.peekLast();
+ }
+ finally {
+ enqueueLock.readLock().unlock();
+ }
+ }
+ }
+ else {
+ // Event was added, global size increment.
+ tup.totalQueueSize().incrementAndGet();
+
+ return true;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Polls up to {@code cnt} events from the head batches for as long as the head batch is evictable:
+ * either the batch count exceeds a positive {@code maxBatches}, or the head batch has expired.
+ */
+ @Override protected Collection<E> pollEvicted0(int cnt) {
+ WindowHolder tup = ref.get();
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ while (true) {
+ int curSize = size.get();
+
+ // Just peek the first batch.
+ Batch first = evts.peekFirst();
+
+ if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
+ assert first.finished();
+
+ // Take only as many events as are still missing from the result.
+ Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+ if (!polled.isEmpty())
+ res.addAll(polled);
+
+ // A drained batch is marked removed and unlinked; the batch counter is decremented
+ // only if this thread performed the unlink.
+ if (first.isEmpty()) {
+ ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+ first.markRemoved();
+
+ if (node != null && evts.unlinkx(node))
+ size.decrementAndGet();
+ }
+
+ if (res.size() == cnt)
+ break;
+ }
+ else
+ break;
+ }
+
+ // Removed entries, update global size.
+ tup.totalQueueSize().addAndGet(-res.size());
+
+ return res;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the contents of one evicted batch. If the batch count exceeds a positive {@code maxBatches},
+ * the head batch is polled; otherwise the head batch is taken only if it has expired. Batches that
+ * turn out to be empty are dropped and the search continues.
+ */
+ @Override protected Collection<E> pollEvictedBatch0() {
+ WindowHolder tup = ref.get();
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ while (true) {
+ int curSize = size.get();
+
+ if (maxBatches > 0 && curSize > maxBatches) {
+ // Reserve one batch by decrementing the counter first; on CAS failure the loop retries.
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ Batch polled = evts.poll();
+
+ if (polled != null) {
+ assert polled.finished();
+
+ // Mark batch removed for consistency.
+ polled.markRemoved();
+
+ Collection<E> polled0 = polled.shrink();
+
+ // Return only a non-empty result; if shrink produced nothing, fall through and retry the poll.
+ if (!polled0.isEmpty()) {
+ // Update global size.
+ tup.totalQueueSize().addAndGet(-polled0.size());
+
+ return polled0;
+ }
+ }
+ else {
+ // Polled was zero, so we must restore counter and return.
+ size.incrementAndGet();
+
+ return Collections.emptyList();
+ }
+ }
+ }
+ else {
+ while (true) {
+ Batch batch = evts.peekFirst();
+
+ // This call will finish batch and return true if batch is expired.
+ if (batch != null && batch.checkExpired()) {
+ assert batch.finished();
+
+ ConcurrentLinkedDeque8.Node<Batch> node = batch.node();
+
+ batch.markRemoved();
+
+ // The batch counter is decremented only if this thread performed the unlink.
+ if (node != null && evts.unlinkx(node))
+ size.decrementAndGet();
+
+ Collection<E> col = batch.shrink();
+
+ tup.totalQueueSize().addAndGet(-col.size());
+
+ // An empty expired batch yields nothing; continue with the next head batch.
+ if (!col.isEmpty())
+ return col;
+ }
+ else
+ return Collections.emptyList();
+ }
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Polls up to {@code cnt} events starting from the head batch, regardless of batch expiration.
+ * A head batch that is both drained and finished is unlinked so that the following batch can be polled.
+ */
+ @Override protected Collection<E> dequeue0(int cnt) {
+ WindowHolder tup = ref.get();
+
+ ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
+ AtomicInteger size = tup.batchQueueSize();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ while (true) {
+ // Just peek the first batch.
+ Batch first = evts.peekFirst();
+
+ if (first != null) {
+ // Take only as many events as are still missing from the result.
+ Collection<E> polled = first.pollNonBatch(cnt - res.size());
+
+ // We must check for finished before unlink as no elements
+ // can be added to batch after it is finished.
+ if (first.isEmpty() && first.emptyFinished()) {
+ ConcurrentLinkedDeque8.Node<Batch> node = first.node();
+
+ first.markRemoved();
+
+ if (node != null && evts.unlinkx(node))
+ size.decrementAndGet();
+
+ assert first.isEmpty();
+ }
+ // Head batch was not unlinked and gave nothing: there is nothing more to dequeue right now.
+ else if (polled.isEmpty())
+ break;
+
+ res.addAll(polled);
+
+ if (res.size() == cnt)
+ break;
+ }
+ else
+ break;
+ }
+
+ // Update global size.
+ tup.totalQueueSize().addAndGet(-res.size());
+
+ return res;
+ }
+
+ /**
+  * Consistency check, used for testing. Compares the total size counter with the number of events
+  * seen by the iterator and with the sum of batch sizes, and the batch counter with the batch queue size.
+  */
+ void consistencyCheck() {
+     WindowHolder win = ref.get();
+
+     // Count events visible through the window iterator.
+     int iterCnt = 0;
+
+     for (Iterator<E> it = iterator(); it.hasNext(); it.next())
+         iterCnt++;
+
+     // Count events as reported by the batches themselves.
+     int batchCnt = 0;
+
+     for (Batch batch : win.batchQueue())
+         batchCnt += batch.size();
+
+     int sz = size();
+
+     assert batchCnt == sz : "Batch size comparison failed [batchCnt=" + batchCnt + ", size=" + sz + ']';
+     assert iterCnt == sz : "Queue size comparison failed [iterCnt=" + iterCnt + ", size=" + sz + ']';
+     assert win.batchQueue().size() == win.batchQueueSize().get();
+ }
+
+ /**
+ * Window structure: a tuple of the batch queue, the batch counter and the total event counter.
+ * The whole window state is replaced at once by storing a new holder in {@code ref}.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public WindowHolder() {
+ // No-op.
+ }
+
+ /**
+ * Creates a holder for a freshly reset window; the queue must contain exactly one batch and the
+ * batch counter must be {@code 1} (both asserted).
+ *
+ * @param batchQueue Batch queue.
+ * @param batchQueueSize Batch queue size counter.
+ * @param globalSize Global size counter.
+ */
+ private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
+ AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
+ super(batchQueue, batchQueueSize, globalSize);
+
+ assert batchQueue.size() == 1;
+ assert batchQueueSize.get() == 1;
+ }
+
+ /**
+ * @return Events queue.
+ */
+ public ConcurrentLinkedDeque8<Batch> batchQueue() {
+ return get1();
+ }
+
+ /**
+ * @return Batch queue size.
+ */
+ public AtomicInteger batchQueueSize() {
+ return get2();
+ }
+
+ /**
+ * @return Global queue size.
+ */
+ public AtomicInteger totalQueueSize() {
+ return get3();
+ }
+ }
+
+ /**
+ * Batch of events with a fixed capacity and an end timestamp.
+ * <p>
+ * The batch is its own read-write lock: events are added and polled under the read lock, while
+ * operations that must not overlap with them (finishing, expiring, shrinking) take the write lock.
+ */
+ private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Batch events. Set to {@code null} by {@link #shrink()}. */
+ private ConcurrentLinkedDeque8<E> evts;
+
+ /** Capacity: number of events that can still be added. */
+ private AtomicInteger cap;
+
+ /** Batch end timestamp. */
+ private final long batchEndTs;
+
+ /** Finished flag. */
+ private boolean finished;
+
+ /** Queue node. */
+ @GridToStringExclude
+ private ConcurrentLinkedDeque8.Node<Batch> qNode;
+
+ /** Removed flag. */
+ private volatile boolean rmvd;
+
+ /**
+ * @param batchSize Batch size.
+ * @param batchEndTs Batch end timestamp.
+ */
+ private Batch(int batchSize, long batchEndTs) {
+ cap = new AtomicInteger(batchSize);
+ this.batchEndTs = batchEndTs;
+
+ evts = new ConcurrentLinkedDeque8<>();
+ }
+
+ /**
+ * @return {@code True} if removed.
+ */
+ public boolean removed() {
+ return rmvd;
+ }
+
+ /**
+ * Marks batch as removed.
+ */
+ public void markRemoved() {
+ rmvd = true;
+ }
+
+ /**
+ * Adds event to batch.
+ *
+ * @param evt Event to add.
+ * @param ts Event timestamp.
+ * @return {@code True} if event was added, {@code false} if the batch is finished, has no capacity
+ * left, or {@code ts} is past the batch end timestamp (in which case the batch is marked finished).
+ */
+ public boolean add(E evt, long ts) {
+ if (ts <= batchEndTs) {
+ readLock().lock();
+
+ try {
+ if (finished)
+ // Finished was set inside write lock.
+ return false;
+
+ while (true) {
+ int size = cap.get();
+
+ if (size > 0) {
+ // Claim one unit of capacity; retry if another thread changed the counter.
+ if (cap.compareAndSet(size, size - 1)) {
+ evts.add(evt);
+
+ // Will go through write lock and finish batch.
+ if (size == 1)
+ finished = true;
+
+ return true;
+ }
+ }
+ else
+ return false;
+ }
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+ else {
+ writeLock().lock();
+
+ try {
+ // No events could be added to this batch.
+ finished = true;
+
+ return false;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+ }
+
+ /**
+ * @return Queue node.
+ */
+ public ConcurrentLinkedDeque8.Node<Batch> node() {
+ return qNode;
+ }
+
+ /**
+ * @param qNode Queue node.
+ */
+ public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
+ this.qNode = qNode;
+ }
+
+ /**
+ * Acquires and releases the write lock. Since events are added under the read lock, this waits
+ * until additions that are in progress have completed.
+ * <p>
+ * NOTE(review): nothing in the body throws {@link GridInterruptedException}; the throws clause
+ * looks like a leftover - confirm.
+ *
+ * @throws GridInterruptedException If wait was interrupted.
+ */
+ public void finish() throws GridInterruptedException {
+ writeLock().lock();
+
+ try {
+ // Safety.
+ assert cap.get() == 0 || finished;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /**
+ * @return {@code True} if batch is finished and no more events will be added to it.
+ */
+ public boolean finished() {
+ readLock().lock();
+
+ try {
+ return finished;
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Gets batch size.
+ *
+ * @return Batch size ({@code 0} after the batch has been shrunk).
+ */
+ public int size() {
+ readLock().lock();
+
+ try {
+ return evts == null ? 0 : evts.sizex();
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * @return {@code True} if batch is empty (always the case after the batch has been shrunk).
+ */
+ public boolean isEmpty() {
+ readLock().lock();
+
+ try {
+ return evts == null || evts.isEmpty();
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
+ * be added to batch and it can be safely unlinked from the queue.
+ *
+ * @return {@code True} if batch is empty and finished.
+ */
+ public boolean emptyFinished() {
+ writeLock().lock();
+
+ try {
+ return finished && (evts == null || evts.isEmpty());
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /**
+ * Checks if the batch has expired, i.e. the current time is past the batch end timestamp.
+ * An expired batch is marked finished under the write lock.
+ *
+ * @return {@code True} if the batch has expired, {@code false} otherwise.
+ */
+ public boolean checkExpired() {
+ if (U.currentTimeMillis() > batchEndTs) {
+ writeLock().lock();
+
+ try {
+ finished = true;
+
+ return true;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns an iterator over the batch events, or an empty iterator if the batch has been shrunk.
+ */
+ @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
+ readLock().lock();
+
+ try {
+ if (evts != null)
+ return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
+
+ // Batch was shrunk: nothing to iterate over.
+ return new ConcurrentLinkedDeque8.IteratorEx<E>() {
+ @Override public boolean removex() {
+ throw new NoSuchElementException();
+ }
+
+ @Override public boolean hasNext() {
+ return false;
+ }
+
+ @Override public E next() {
+ throw new NoSuchElementException();
+ }
+
+ @Override public void remove() {
+ throw new NoSuchElementException();
+ }
+ };
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Polls up to {@code cnt} objects from batch in concurrent fashion.
+ *
+ * @param cnt Number of objects to poll.
+ * @return Collection of polled elements (empty collection in case no events were
+ * present).
+ */
+ public Collection<E> pollNonBatch(int cnt) {
+ readLock().lock();
+
+ try {
+ if (evts == null)
+ return Collections.emptyList();
+
+ Collection<E> res = new ArrayList<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ E evt = evts.poll();
+
+ if (evt != null)
+ res.add(evt);
+ else
+ return res;
+ }
+
+ return res;
+ }
+ finally {
+ readLock().unlock();
+ }
+ }
+
+ /**
+ * Shrinks this batch. No events can be polled from it after this method.
+ *
+ * @return Collection of events contained in batch before shrink (empty collection in
+ * case no events were present).
+ */
+ public Collection<E> shrink() {
+ writeLock().lock();
+
+ try {
+ if (evts == null)
+ return Collections.emptyList();
+
+ // Since iterator can concurrently delete elements, we must poll here.
+ Collection<E> res = new ArrayList<>(evts.sizex());
+
+ E o;
+
+ while ((o = evts.poll()) != null)
+ res.add(o);
+
+ // Nothing can be polled after shrink.
+ evts = null;
+
+ return res;
+ }
+ finally {
+ writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ // Read the field once: it may be set to null concurrently by shrink().
+ ConcurrentLinkedDeque8<E> evts0 = evts;
+
+ return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
+ }
+ }
+}