You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:15 UTC
[14/32] incubator-ignite git commit: # Renaming
# Renaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/5fe8de26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/5fe8de26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/5fe8de26
Branch: refs/heads/master
Commit: 5fe8de26ae794b643c661dce71ab5e4f3a2755f7
Parents: 34694f3
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 5 12:45:10 2014 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 5 12:45:10 2014 +0300
----------------------------------------------------------------------
examples/config/example-streamer.xml | 12 +-
.../streamer/GridStreamProcessor.java | 4 +-
.../gridgain/grid/streamer/StreamerWindow.java | 13 +-
.../index/GridStreamerIndexProvider.java | 3 +-
.../GridStreamerBoundedSizeBatchWindow.java | 795 ----------------
.../GridStreamerBoundedSizeSortedWindow.java | 210 -----
.../window/GridStreamerBoundedSizeWindow.java | 136 ---
.../GridStreamerBoundedSizeWindowAdapter.java | 349 --------
.../GridStreamerBoundedTimeBatchWindow.java | 897 -------------------
.../window/GridStreamerBoundedTimeWindow.java | 454 ----------
.../window/GridStreamerUnboundedWindow.java | 103 ---
.../window/GridStreamerWindowAdapter.java | 529 -----------
.../window/StreamerBoundedSizeBatchWindow.java | 795 ++++++++++++++++
.../window/StreamerBoundedSizeSortedWindow.java | 210 +++++
.../window/StreamerBoundedSizeWindow.java | 136 +++
.../StreamerBoundedSizeWindowAdapter.java | 349 ++++++++
.../window/StreamerBoundedTimeBatchWindow.java | 897 +++++++++++++++++++
.../window/StreamerBoundedTimeWindow.java | 454 ++++++++++
.../window/StreamerUnboundedWindow.java | 103 +++
.../streamer/window/StreamerWindowAdapter.java | 529 +++++++++++
.../average/spring-streamer-average-base.xml | 2 +-
.../streamer/GridStreamerEvictionSelfTest.java | 2 +-
.../streamer/GridStreamerFailoverSelfTest.java | 2 +-
.../streamer/GridStreamerSelfTest.java | 2 +-
.../marshaller/GridMarshallerAbstractTest.java | 2 +-
.../index/GridStreamerIndexSelfTest.java | 12 +-
.../window/GridStreamerWindowSelfTest.java | 40 +-
.../streamer/GridStreamerIndexLoadTest.java | 2 +-
28 files changed, 3520 insertions(+), 3522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/examples/config/example-streamer.xml
----------------------------------------------------------------------
diff --git a/examples/config/example-streamer.xml b/examples/config/example-streamer.xml
index 868d277..389b143 100644
--- a/examples/config/example-streamer.xml
+++ b/examples/config/example-streamer.xml
@@ -67,7 +67,7 @@
Window size is 500, which means that we will be
computing running average over the last 500 events.
-->
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedSizeWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
<property name="maximumSize" value="500"/>
</bean>
</property>
@@ -110,7 +110,7 @@
For this example we use a running window with upper bound of 10,000 events.
This means that streamer will find most popular numbers among last 10,000 events.
-->
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedSizeWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
<property name="maximumSize" value="10000"/>
<property name="indexes">
@@ -166,7 +166,7 @@
For this example we use a running batch window with upper bound of 1 second.
This means that streamer composes a batch with events within 1 second time interval.
-->
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedTimeBatchWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
<property name="name" value="stage1"/>
<property name="batchTimeInterval" value="1000"/>
<property name="batchSize" value="2147483647"/>
@@ -177,7 +177,7 @@
For this example we use a running batch window with upper bound of 2 seconds.
This means that streamer composes a batch with events within 2 seconds time interval.
-->
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedTimeBatchWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow">
<property name="name" value="stage2"/>
<property name="batchTimeInterval" value="2000"/>
<property name="batchSize" value="2147483647"/>
@@ -215,7 +215,7 @@
This means that only events for last 10 seconds will be present in
these windows (other events are marked for eviction).
-->
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedTimeWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
<property name="name" value="AddToWindowStage"/>
<property name="timeInterval" value="10000"/>
@@ -235,7 +235,7 @@
</list>
</property>
</bean>
- <bean class="org.gridgain.grid.streamer.window.GridStreamerBoundedTimeWindow">
+ <bean class="org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow">
<property name="name" value="DetectPlacesStage"/>
<property name="timeInterval" value="10000"/>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index 10cb8ac..194f081 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -111,8 +111,8 @@ public class GridStreamProcessor extends GridProcessorAdapter {
", window=" + win.name() + ']', e);
}
- if (win instanceof GridStreamerWindowAdapter) {
- GridStreamerIndexProvider[] idxs = ((GridStreamerWindowAdapter)win).indexProviders();
+ if (win instanceof StreamerWindowAdapter) {
+ GridStreamerIndexProvider[] idxs = ((StreamerWindowAdapter)win).indexProviders();
if (idxs != null && idxs.length > 0) {
for (GridStreamerIndexProvider idx : idxs) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
index 4f2b462..1a16ff5 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/StreamerWindow.java
@@ -11,7 +11,6 @@ package org.gridgain.grid.streamer;
import org.gridgain.grid.*;
import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.streamer.window.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -25,12 +24,12 @@ import java.util.*;
* <p>
* GridGain comes with following rolling windows implementations out of the box:
* <ul>
- * <li>{@link GridStreamerUnboundedWindow}</li>
- * <li>{@link GridStreamerBoundedSizeWindow}</li>
- * <li>{@link GridStreamerBoundedSizeBatchWindow}</li>
- * <li>{@link GridStreamerBoundedSizeSortedWindow}</li>
- * <li>{@link GridStreamerBoundedTimeWindow}</li>
- * <li>{@link GridStreamerBoundedTimeBatchWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerUnboundedWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeBatchWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedSizeSortedWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeWindow}</li>
+ * <li>{@link org.gridgain.grid.streamer.window.StreamerBoundedTimeBatchWindow}</li>
* </ul>
* <p>
* Streamer window is configured via {@link StreamerConfiguration#getWindows()} method.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
index 43030fc..ebcd610 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/GridStreamerIndexProvider.java
@@ -10,14 +10,13 @@
package org.gridgain.grid.streamer.index;
import org.gridgain.grid.*;
-import org.gridgain.grid.streamer.window.*;
/**
* Represents an actual instance of an index. Used by a {@link org.gridgain.grid.streamer.StreamerWindow}
* to perform event indexing.
* <p>
* To configure index for a streamer window, use
- * {@link GridStreamerWindowAdapter#setIndexes(GridStreamerIndexProvider[])}.
+ * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(GridStreamerIndexProvider[])}.
*/
public interface GridStreamerIndexProvider<E, K, V> extends GridStreamerIndexProviderMBean {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeBatchWindow.java
deleted file mode 100644
index a030440..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeBatchWindow.java
+++ /dev/null
@@ -1,795 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that is bounded by size and accumulates events to batches.
- */
-public class GridStreamerBoundedSizeBatchWindow<E> extends GridStreamerWindowAdapter<E> {
- /** Max size. */
- private int batchSize;
-
- /** Maximum number of batches. */
- private int maxBatches;
-
- /** Reference for queue and size. */
- private volatile QueueHolder holder;
-
- /** Enqueue lock. */
- private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
- /**
- * Gets maximum number of batches that can be stored in window.
- *
- * @return Maximum number of batches for window.
- */
- public int getMaximumBatches() {
- return maxBatches;
- }
-
- /**
- * Sets maximum number of batches that can be stored in window.
- *
- * @param maxBatches Maximum number of batches for window.
- */
- public void setMaximumBatches(int maxBatches) {
- this.maxBatches = maxBatches;
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Sets batch size.
- *
- * @param batchSize Batch size.
- */
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- /** {@inheritDoc} */
- @Override public void checkConfiguration() throws GridException {
- if (batchSize <= 0)
- throw new GridException("Failed to initialize window (batchSize size must be positive) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize + ']');
-
- if (maxBatches < 0)
- throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize + ']');
- }
-
- /** {@inheritDoc} */
- @Override protected void stop0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void reset0() {
- ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
-
- Batch b = new Batch(batchSize);
-
- ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
-
- b.node(n);
-
- holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger());
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return holder.totalQueueSize().get();
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final QueueHolder win = holder;
-
- final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
- return new GridStreamerWindowIterator<E>() {
- /** Current batch iterator. */
- private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
- /** Next batch iterator. Will be null if no more batches available. */
- private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
- /** Last returned value. */
- private E lastRet;
-
- {
- curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean hasNext() {
- if (curBatchIt != null) {
- if (curBatchIt.hasNext())
- return true;
-
- return nextBatchIt != null && nextBatchIt.hasNext();
- }
- else
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public E next() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (!curBatchIt.hasNext()) {
- if (nextBatchIt != null) {
- curBatchIt = nextBatchIt;
-
- nextBatchIt = null;
-
- lastRet = curBatchIt.next();
- }
- else
- throw new NoSuchElementException();
- }
- else {
- E next = curBatchIt.next();
-
- // Moved to last element in batch - check for next iterator.
- if (!curBatchIt.hasNext())
- advanceBatch();
-
- lastRet = next;
- }
-
- return lastRet;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public E removex() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (curBatchIt.removex()) {
- // Decrement global size if deleted.
- win.totalQueueSize().decrementAndGet();
-
- return lastRet;
- }
- else
- return null;
- }
-
- /**
- * Moves to the next batch.
- */
- private void advanceBatch() {
- if (batchIt.hasNext()) {
- Batch batch = batchIt.next();
-
- nextBatchIt = batch.iterator();
- }
- else
- nextBatchIt = null;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- QueueHolder win = holder;
-
- int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches);
-
- Iterator<Batch> it = win.batchQueue().iterator();
-
- int size = 0;
-
- int idx = 0;
-
- while (it.hasNext()) {
- Batch batch = it.next();
-
- if (idx++ < oversizeCnt)
- size += batch.size();
- }
-
- return size;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean enqueue0(E evt) {
- try {
- return enqueueInternal(evt);
- }
- catch (GridInterruptedException ignored) {
- return false;
- }
- }
-
- /**
- * Enqueue event to window.
- *
- * @param evt Event to add.
- * @return {@code True} if event was added.
- *
- * @throws GridInterruptedException If thread was interrupted.
- */
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- private boolean enqueueInternal(E evt) throws GridInterruptedException {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- Batch last = evts.peekLast();
-
- if (last == null || !last.add(evt)) {
- // This call will ensure that last object is actually added to batch
- // before we add new batch to events queue.
- // If exception is thrown here, window will be left in consistent state.
- if (last != null)
- last.finish();
-
- // Add new batch to queue in write lock.
- if (enqueueLock.writeLock().tryLock()) {
- try {
- Batch first0 = evts.peekLast();
-
- if (first0 == last) {
- Batch batch = new Batch(batchSize);
-
- ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
- batch.node(node);
-
- size.incrementAndGet();
-
- if (batch.removed() && evts.unlinkx(node))
- size.decrementAndGet();
- }
- }
- finally {
- enqueueLock.writeLock().unlock();
- }
- }
- else {
- // Acquire read lock to wait for batch enqueue.
- enqueueLock.readLock().lock();
-
- enqueueLock.readLock().unlock();
- }
- }
- else {
- // Event was added, global size increment.
- tup.totalQueueSize().incrementAndGet();
-
- return true;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxBatches) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- assert first.finished();
-
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- if (!polled.isEmpty())
- res.addAll(polled);
-
- if (first.isEmpty()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
- }
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
- else
- break;
- }
-
- // Removed entries, update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvictedBatch0() {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxBatches) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- Batch polled = evts.poll();
-
- if (polled != null) {
- assert polled.finished();
-
- // Mark batch deleted for consistency.
- polled.markRemoved();
-
- Collection<E> polled0 = polled.shrink();
-
- // Result of shrink is empty, must retry the poll.
- if (!polled0.isEmpty()) {
- // Update global size.
- tup.totalQueueSize().addAndGet(-polled0.size());
-
- return polled0;
- }
- }
- else {
- // Polled was zero, so we must restore counter and return.
- size.incrementAndGet();
-
- return Collections.emptyList();
- }
- }
- }
- else
- return Collections.emptyList();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> dequeue0(int cnt) {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- // We must check for finished before unlink as no elements
- // can be added to batch after it is finished.
- if (first.isEmpty() && first.emptyFinished()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- assert first.isEmpty();
- }
- else if (polled.isEmpty())
- break;
-
- res.addAll(polled);
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /**
- * Consistency check, used for testing.
- */
- void consistencyCheck() {
- QueueHolder win = holder;
-
- Iterator<E> it = iterator();
-
- int cnt = 0;
-
- while (it.hasNext()) {
- it.next();
-
- cnt++;
- }
-
- int cnt0 = 0;
-
- for (Batch batch : win.batchQueue())
- cnt0 += batch.size();
-
- int sz = size();
-
- assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
- assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
- assert win.batchQueue().size() == win.batchQueueSize().get();
- }
-
- /**
- * Window structure.
- */
- private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public QueueHolder() {
- // No-op.
- }
-
- /**
- * @param batchQueue Batch queue.
- * @param batchQueueSize Batch queue size counter.
- * @param globalSize Global size counter.
- */
- private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
- AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
- super(batchQueue, batchQueueSize, globalSize);
-
- assert batchQueue.size() == 1;
- assert batchQueueSize.get() == 1;
- }
-
- /**
- * @return Events queue.
- */
- @SuppressWarnings("ConstantConditions")
- public ConcurrentLinkedDeque8<Batch> batchQueue() {
- return get1();
- }
-
- /**
- * @return Batch queue size.
- */
- @SuppressWarnings("ConstantConditions")
- public AtomicInteger batchQueueSize() {
- return get2();
- }
-
- /**
- * @return Global queue size.
- */
- @SuppressWarnings("ConstantConditions")
- public AtomicInteger totalQueueSize() {
- return get3();
- }
- }
-
- /**
- * Batch.
- */
- private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Batch events. */
- private ConcurrentLinkedDeque8<E> evts;
-
- /** Capacity. */
- private AtomicInteger cap;
-
- /** Finished. */
- private volatile boolean finished;
-
- /** Queue node. */
- @GridToStringExclude
- private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
- /** Node removed flag. */
- private volatile boolean rmvd;
-
- /**
- * @param batchSize Batch size.
- */
- private Batch(int batchSize) {
- cap = new AtomicInteger(batchSize);
-
- evts = new ConcurrentLinkedDeque8<>();
- }
-
- /**
- * @return {@code True} if batch is removed.
- */
- public boolean removed() {
- return rmvd;
- }
-
- /**
- * Marks batch as removed.
- */
- public void markRemoved() {
- rmvd = true;
- }
-
- /**
- * Adds event to batch.
- *
- * @param evt Event to add.
- * @return {@code True} if event was added, {@code false} if batch is full.
- */
- public boolean add(E evt) {
- readLock().lock();
-
- try {
- if (finished)
- return false;
-
- while (true) {
- int size = cap.get();
-
- if (size > 0) {
- if (cap.compareAndSet(size, size - 1)) {
- evts.add(evt);
-
- // Will go through write lock and finish batch.
- if (size == 1)
- finished = true;
-
- return true;
- }
- }
- else
- return false;
- }
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return Queue node.
- */
- public ConcurrentLinkedDeque8.Node<Batch> node() {
- return qNode;
- }
-
- /**
- * @param qNode Queue node.
- */
- public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
- this.qNode = qNode;
- }
-
- /**
- * Acquires the write lock to wait for in-progress additions to complete after last event was added.
- *
- * @throws GridInterruptedException If wait was interrupted.
- */
- public void finish() throws GridInterruptedException {
- writeLock().lock();
-
- try {
- // Safety.
- assert cap.get() == 0;
- assert finished;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is finished and no more events will be added to it.
- */
- public boolean finished() {
- readLock().lock();
-
- try {
- return finished;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int size() {
- readLock().lock();
-
- try {
- return evts == null ? 0 : evts.sizex();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is empty.
- */
- public boolean isEmpty() {
- readLock().lock();
-
- try {
- return evts == null || evts.isEmpty();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
- * be added to batch and it can be safely unlinked from the queue.
- *
- * @return {@code True} if batch is empty and finished.
- */
- public boolean emptyFinished() {
- writeLock().lock();
-
- try {
- return finished && (evts == null || evts.isEmpty());
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
- readLock().lock();
-
- try {
- if (evts != null)
- return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
- return new ConcurrentLinkedDeque8.IteratorEx<E>() {
- @Override public boolean removex() {
- throw new NoSuchElementException();
- }
-
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public E next() {
- throw new NoSuchElementException();
- }
-
- @Override public void remove() {
- throw new NoSuchElementException();
- }
- };
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Polls up to {@code cnt} objects from batch in concurrent fashion.
- *
- * @param cnt Number of objects to poll.
- * @return Collection of polled elements or empty collection if nothing to poll.
- */
- public Collection<E> pollNonBatch(int cnt) {
- readLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evt = evts.poll();
-
- if (evt != null)
- res.add(evt);
- else
- return res;
- }
-
- return res;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Shrinks this batch. No events can be polled from it after this method.
- *
- * @return Collection of events contained in batch before shrink (empty collection in
- * case no events were present).
- */
- public Collection<E> shrink() {
- writeLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- // Since iterator can concurrently delete elements, we must poll here.
- Collection<E> res = new ArrayList<>(evts.sizex());
-
- E o;
-
- while ((o = evts.poll()) != null)
- res.add(o);
-
- // Nothing can be polled after shrink.
- evts = null;
-
- return res;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- ConcurrentLinkedDeque8<E> evts0 = evts;
-
- return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeSortedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeSortedWindow.java
deleted file mode 100644
index 7f02e14..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeSortedWindow.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Size-bounded sorted window. Unlike {@link GridStreamerBoundedSizeWindow}, which limits
- * window only on size, this window also provides events in sorted order.
- */
-public class GridStreamerBoundedSizeSortedWindow<E>
- extends GridStreamerBoundedSizeWindowAdapter<E, GridStreamerBoundedSizeSortedWindow.Holder<E>> {
- /** Comparator. */
- private Comparator<E> comp;
-
- /** Order counter. */
- private AtomicLong orderCnt = new AtomicLong();
-
- /**
- * Gets event comparator.
- *
- * @return Event comparator.
- */
- public Comparator<E> getComparator() {
- return comp;
- }
-
- /**
- * Sets event comparator.
- *
- * @param comp Comparator.
- */
- public void setComparator(Comparator<E> comp) {
- this.comp = comp;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override protected Collection<Holder<E>> newCollection() {
- final Comparator<E> comp0 = comp;
-
- Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
- @Override public int compare(Holder<E> h1, Holder<E> h2) {
- if (h1 == h2)
- return 0;
-
- int diff = comp0 == null ?
- ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val);
-
- if (diff != 0)
- return diff;
- else {
- assert h1.order != h2.order;
-
- return h1.order < h2.order ? -1 : 1;
- }
- }
- });
-
- return (Collection)col;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) {
- if (comp == null) {
- if (!(evt instanceof Comparable))
- throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " +
- "comparator is specified: " + evt);
- }
-
- if (set != null) {
- if (set.add(evt)) {
- col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
- return true;
- }
-
- return false;
- }
- else {
- col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) {
- int cnt = 0;
-
- for (E evt : evts) {
- if (addInternal(evt, col, set))
- cnt++;
- }
-
- return cnt;
- }
-
- /** {@inheritDoc} */
- @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) {
- Holder<E> h = (Holder<E>)((NavigableSet<E>)col).pollLast();
-
- if (set != null && h != null)
- set.remove(h.val);
-
- return h == null ? null : h.val;
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col,
- final Set<E> set, final AtomicInteger size) {
- final Iterator<Holder<E>> it = col.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- private Holder<E> lastRet;
-
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override public E next() {
- lastRet = it.next();
-
- return lastRet.val;
- }
-
- @Override public E removex() {
- if (lastRet == null)
- throw new IllegalStateException();
-
- if (col.remove(lastRet)) {
- if (set != null)
- set.remove(lastRet.val);
-
- size.decrementAndGet();
-
- return lastRet.val;
- }
- else
- return null;
- }
- };
- }
-
- /**
- * Value wrapper.
- */
- @SuppressWarnings("PackageVisibleInnerClass")
- static class Holder<E> {
- /** Value. */
- private E val;
-
- /** Order to distinguish between objects for which comparator returns 0. */
- private long order;
-
- /**
- * @param val Value to hold.
- * @param order Adding order.
- */
- private Holder(E val, long order) {
- this.val = val;
- this.order = order;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return val.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (this == obj)
- return false;
-
- if (!(obj instanceof Holder))
- return false;
-
- Holder h = (Holder)obj;
-
- return F.eq(val, h.val) && order == h.order;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) {
- assert col.size() == size.get();
-
- if (set != null) {
- // Check no duplicates in collection.
-
- Collection<Object> vals = new HashSet<>();
-
- for (Object evt : col)
- assert vals.add(((Holder)evt).val);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindow.java
deleted file mode 100644
index 55776e4..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindow.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Queue window bounded by the number of elements in the queue. After adding elements to this window the caller
- * must check for evicted events.
- * <p>
- * It is guaranteed that the window size will never get less than the maximum size when polling from this window
- * concurrently from different threads.
- */
-public class GridStreamerBoundedSizeWindow<E> extends GridStreamerBoundedSizeWindowAdapter<E, E> {
- /** {@inheritDoc} */
- @Override protected Collection<E> newCollection() {
- return new ConcurrentLinkedDeque8<>();
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The returned iterator delegates to the deque iterator and keeps the uniqueness set and the size counter
- * in sync when an element is removed through it.
- */
- @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set,
- final AtomicInteger size) {
- final ConcurrentLinkedDeque8.IteratorEx<E> dequeIt =
- (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- /** Element returned by the most recent call to next(). */
- private E cur;
-
- @Override public boolean hasNext() {
- return dequeIt.hasNext();
- }
-
- @Override public E next() {
- return cur = dequeIt.next();
- }
-
- @Override public E removex() {
- // Nothing was removed by this call - nothing to account for.
- if (!dequeIt.removex())
- return null;
-
- if (set != null)
- set.remove(cur);
-
- size.decrementAndGet();
-
- return cur;
- }
- };
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * When a uniqueness set is present, the event is appended only if the set accepted it.
- */
- @SuppressWarnings("IfMayBeConditional")
- @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
-
- // In unique mode the set decides whether the event is accepted.
- if (set != null && !set.add(evt))
- return false;
-
- col.add(evt);
-
- return true;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Without a uniqueness set all events are appended and their number is returned; otherwise only the
- * events accepted by the set are appended and counted.
- */
- @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
-
- if (set == null) {
- col.addAll(evts);
-
- return evts.size();
- }
-
- int added = 0;
-
- for (E evt : evts) {
- if (set.add(evt)) {
- col.add(evt);
-
- added++;
- }
- }
-
- return added;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Polls the head of the queue and, if an element was polled, removes it from the uniqueness set as well.
- */
- @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
-
- E head = ((Queue<E>)col).poll();
-
- if (head != null && set != null)
- set.remove(head);
-
- return head;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Asserts that the size counter matches the collection size and, when a uniqueness set is maintained,
- * that the collection holds no duplicates.
- */
- @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) {
- assert col.size() == size.get();
-
- // Duplicates are only checked when a uniqueness set is maintained.
- if (set == null)
- return;
-
- Collection<Object> seen = new HashSet<>();
-
- for (Object evt : col)
- assert seen.add(evt);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindowAdapter.java
deleted file mode 100644
index 15a5c2f..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedSizeWindowAdapter.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Abstract non-public class for size-bound windows. Support reset.
- * <p>
- * The window state (underlying collection, optional uniqueness set and size counter) is kept in a single
- * holder object that is swapped atomically on reset. Subclasses provide the concrete collection and the
- * add / poll / iterate primitives operating on it.
- *
- * @param <E> Type of window events.
- * @param <T> Type of objects kept in the underlying collection created by {@link #newCollection()}.
- */
-abstract class GridStreamerBoundedSizeWindowAdapter<E, T> extends GridStreamerWindowAdapter<E> {
- /** Reference to the current window state; a new holder is installed by {@link #reset0()}. */
- private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
- /** If true, only unique elements will be accepted. */
- private boolean unique;
-
- /** Window maximum size. */
- protected int maxSize;
-
- /**
- * Gets window maximum size.
- *
- * @return Maximum size.
- */
- public int getMaximumSize() {
- return maxSize;
- }
-
- /**
- * Sets window maximum size. Negative values are rejected by {@link #checkConfiguration()}.
- *
- * @param maxSize Maximum size.
- */
- public void setMaximumSize(int maxSize) {
- this.maxSize = maxSize;
- }
-
- /**
- * @return True if only unique elements will be accepted.
- */
- public boolean isUnique() {
- return unique;
- }
-
- /**
- * The flag is read by {@link #reset0()}, which creates the uniqueness set only when it is {@code true}.
- *
- * @param unique If true, only unique elements will be accepted.
- */
- public void setUnique(boolean unique) {
- this.unique = unique;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Fails if the configured maximum size is negative.
- */
- @Override public void checkConfiguration() throws GridException {
- if (maxSize < 0)
- throw new GridException("Failed to initialize window (maximumSize cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maxSize=" + maxSize +
- ", unique=" + unique + ']');
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * This implementation performs no action.
- */
- @Override protected void stop0() {
- // No-op.
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns the value of the size counter; a negative counter value is reported as {@code 0}.
- */
- @Override public int size() {
- int size = ref.get().size().get();
-
- return size > 0 ? size : 0;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns the number of elements by which the current size exceeds the maximum size, or {@code 0}.
- */
- @Override public int evictionQueueSize() {
- int evictSize = size() - maxSize;
-
- return evictSize > 0 ? evictSize : 0;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Always returns {@code true}: the result of the underlying add (which may reject the event, e.g. as a
- * duplicate) is not propagated.
- */
- @Override protected boolean enqueue0(E evt) {
- add(evt);
-
- return true;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Polls evicted elements one by one and stops early as soon as no more evicted elements are available.
- */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evicted = pollEvictedInternal();
-
- if (evicted == null)
- return res;
-
- res.add(evicted);
- }
-
- return res;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * For this window a batch consists of at most one element.
- */
- @Override protected Collection<E> pollEvictedBatch0() {
- E res = pollEvictedInternal();
-
- if (res == null)
- return Collections.emptyList();
-
- return Collections.singleton(res);
- }
-
- /**
- * Poll evicted internal implementation.
- * <p>
- * An element is polled only while the size counter exceeds the maximum size. The counter is decremented
- * with compare-and-set before polling; if the poll then yields nothing, the counter is restored.
- *
- * @return Evicted element or {@code null} if the window is not over its maximum size or nothing could be
- * polled.
- */
- @Nullable private E pollEvictedInternal() {
- WindowHolder tup = ref.get();
-
- AtomicInteger size = tup.size();
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxSize) {
- if (size.compareAndSet(curSize, curSize - 1)) { // On CAS failure the loop re-reads the counter.
- E evt = pollInternal(tup.collection(), tup.set());
-
- if (evt != null)
- return evt;
- else {
- // No actual events in queue, it means that other thread is just adding event.
- // return null as it is a concurrent add call.
- size.incrementAndGet();
-
- return null;
- }
- }
- }
- else
- return null;
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Polls up to {@code cnt} elements regardless of the maximum size, using the same decrement-then-poll
- * protocol as eviction. Returns what was collected so far when the counter reaches zero or a poll yields
- * nothing.
- */
- @Override protected Collection<E> dequeue0(int cnt) {
- WindowHolder tup = ref.get();
-
- AtomicInteger size = tup.size();
- Collection<T> evts = tup.collection();
-
- Collection<E> resCol = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > 0) {
- if (size.compareAndSet(curSize, curSize - 1)) { // On CAS failure the loop re-reads the counter.
- E res = pollInternal(evts, tup.set());
-
- if (res != null) {
- resCol.add(res);
-
- if (resCol.size() >= cnt)
- return resCol;
- }
- else {
- size.incrementAndGet(); // Nothing was polled - restore the counter.
-
- return resCol;
- }
- }
- }
- else
- return resCol;
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The iterator is created over the window state that is current at the time of the call.
- */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- WindowHolder win = ref.get();
-
- return iteratorInternal(win.collection(), win.set(), win.size());
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Installs a fresh window state: a new collection, a new uniqueness set (only in unique mode) and a zero
- * size counter.
- */
- @Override protected void reset0() {
- ref.set(new WindowHolder(newCollection(),
- unique ? new GridConcurrentHashSet<E>() : null,
- new AtomicInteger()));
- }
-
- /**
- * Adds the event to the underlying collection and increments the size counter if the event was accepted.
- *
- * @param evt Event to add.
- */
- private void add(E evt) {
- WindowHolder tup = ref.get();
-
- if (addInternal(evt, tup.collection(), tup.set()))
- tup.size().incrementAndGet();
- }
-
- /**
- * Adds the events to the underlying collection and increases the size counter by the number of accepted
- * events. NOTE(review): no caller of this private method is visible in this class.
- *
- * @param evts Events to add.
- */
- private void addAll(Collection<E> evts) {
- WindowHolder tup = ref.get();
-
- int cnt = addAllInternal(evts, tup.collection(), tup.set());
-
- tup.size().addAndGet(cnt);
- }
-
- /**
- * Checks window consistency. Used for testing.
- */
- void consistencyCheck() {
- WindowHolder win = ref.get();
-
- consistencyCheck(win.collection(), win.set(), win.size());
- }
-
- /**
- * Get underlying collection of the current window state.
- *
- * @return Collection.
- */
- @SuppressWarnings("ConstantConditions")
- protected Collection<T> collection() {
- return ref.get().get1();
- }
-
- /**
- * Creates new collection specific for window implementation. This collection will be subsequently passed
- * to addInternal(...) and pollInternal() methods.
- *
- * @return Collection - holder.
- */
- protected abstract Collection<T> newCollection();
-
- /**
- * Adds event to queue implementation.
- *
- * @param evt Event to add.
- * @param col Collection to add to.
- * @param set Set to check ({@code null} when uniqueness is not enforced).
- * @return {@code True} if event was added.
- */
- protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Adds all events to queue implementation.
- *
- * @param evts Events to add.
- * @param col Collection to add to.
- * @param set Set to check ({@code null} when uniqueness is not enforced).
- * @return Added events number.
- */
- protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Polls one element from the queue implementation.
- *
- * @param col Collection to poll from.
- * @param set Set to check ({@code null} when uniqueness is not enforced).
- * @return Polled object or {@code null} if nothing could be polled.
- */
- @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Creates iterator based on implementation collection type.
- *
- * @param col Collection.
- * @param set Set to check ({@code null} when uniqueness is not enforced).
- * @param size Size counter of the window state being iterated.
- * @return Iterator.
- */
- protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
- AtomicInteger size);
-
- /**
- * Checks consistency. Used in tests.
- *
- * @param col Collection.
- * @param set Set if unique.
- * @param size Size holder.
- */
- protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);
-
- /**
- * Window holder: a tuple of the underlying collection, the optional uniqueness set and the size counter.
- */
- @SuppressWarnings("ConstantConditions")
- private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public WindowHolder() {
- // No-op.
- }
-
- /**
- * @param col Collection.
- * @param set Set if unique.
- * @param size Window size counter.
- */
- WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
- super(col, set, size);
- }
-
- /**
- * @return Collection (first tuple element).
- */
- public Collection<T> collection() {
- return get1();
- }
-
- /**
- * @return Set (second tuple element).
- */
- public Set<E> set() {
- return get2();
- }
-
- /**
- * @return Size (third tuple element).
- */
- public AtomicInteger size() {
- return get3();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/5fe8de26/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedTimeBatchWindow.java
deleted file mode 100644
index ed52bbb..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/GridStreamerBoundedTimeBatchWindow.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
- */
-public class GridStreamerBoundedTimeBatchWindow<E> extends GridStreamerWindowAdapter<E> {
- /** Batch size. */
- private int batchSize;
-
- /** Maximum batches. */
- private int maxBatches;
-
- /** */
- private long batchTimeInterval;
-
- /** Atomic reference for queue and size. */
- private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
- /** Enqueue lock. */
- private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
- /**
- * Gets the maximum number of batches that can be stored in the window.
- * <p>
- * The batch-count bound is applied by this class only when the value is positive.
- *
- * @return Maximum number of batches for window.
- */
- public int getMaximumBatches() {
- return maxBatches;
- }
-
- /**
- * Sets the maximum number of batches that can be stored in the window.
- * <p>
- * The batch-count bound is applied by this class only when the value is positive; negative values are
- * rejected by {@link #checkConfiguration()}.
- *
- * @param maxBatches Maximum number of batches for window.
- */
- public void setMaximumBatches(int maxBatches) {
- this.maxBatches = maxBatches;
- }
-
- /**
- * Gets batch size, i.e. the capacity with which new batches are created.
- * <p>
- * Note that {@link #checkConfiguration()} replaces a configured value of {@code 0} with
- * {@link Integer#MAX_VALUE}.
- *
- * @return Batch size.
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Sets batch size, i.e. the capacity with which new batches are created.
- * <p>
- * Negative values are rejected by {@link #checkConfiguration()}; {@code 0} is replaced there with
- * {@link Integer#MAX_VALUE}.
- *
- * @param batchSize Batch size.
- */
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- /**
- * Gets batch time interval. The interval is added to a timestamp to compute the end timestamp of a new
- * batch (presumably in milliseconds, as it is combined with {@code U.currentTimeMillis()} - confirm).
- *
- * @return Batch time interval.
- */
- public long getBatchTimeInterval() {
- return batchTimeInterval;
- }
-
- /**
- * Sets batch time interval. The interval is added to a timestamp to compute the end timestamp of a new
- * batch (presumably in milliseconds, as it is combined with {@code U.currentTimeMillis()} - confirm).
- * <p>
- * The value must be positive, otherwise {@link #checkConfiguration()} fails.
- *
- * @param batchTimeInterval Batch time interval.
- */
- public void setBatchTimeInterval(long batchTimeInterval) {
- this.batchTimeInterval = batchTimeInterval;
- }
-
- /** {@inheritDoc} */
- @Override public void checkConfiguration() throws GridException {
- if (maxBatches < 0)
- throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
-
- if (batchSize < 0)
- throw new GridException("Failed to initialize window (batchSize cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
- else if (batchSize == 0)
- batchSize = Integer.MAX_VALUE;
-
- if (batchTimeInterval <= 0)
- throw new GridException("Failed to initialize window (batchTimeInterval must be positive) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * This implementation performs no action.
- */
- @Override protected void stop0() {
- // No-op.
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Installs a fresh window state that consists of a batch queue holding a single empty batch, a batch
- * counter set to one and a zero total size counter.
- */
- @Override protected void reset0() {
- ConcurrentLinkedDeque8<Batch> batches = new ConcurrentLinkedDeque8<>();
-
- // The initial batch ends one batch interval from now.
- Batch initBatch = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);
-
- // Batch remembers its queue node so that it can be unlinked later.
- initBatch.node(batches.offerLastx(initBatch));
-
- AtomicInteger batchCnt = new AtomicInteger(1);
- AtomicInteger totalCnt = new AtomicInteger();
-
- ref.set(new WindowHolder(batches, batchCnt, totalCnt));
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return ref.get().totalQueueSize().get();
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The returned iterator walks the batches of the window state that is current at the time of the call, in
- * queue order, and returns the events of each batch. Empty batches are skipped. Whenever the current batch
- * runs out of events, the iterator looks ahead for the next non-empty batch, so that {@code hasNext()}
- * can answer without switching away from the batch the last returned element belongs to (which is the
- * batch {@code removex()} has to operate on).
- */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final WindowHolder win = ref.get();
-
- final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
- return new GridStreamerWindowIterator<E>() {
- /** Current batch iterator. The last returned element always belongs to this iterator. */
- private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
- /** Iterator of the next non-empty batch. Will be null if no more batches are available. */
- private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
- /** Last returned value. */
- private E lastRet;
-
- {
- curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
-
- // The first batch may be empty while subsequent batches are not.
- if (curBatchIt != null && !curBatchIt.hasNext())
- advanceBatch();
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- if (curBatchIt == null)
- return false;
-
- return curBatchIt.hasNext() || (nextBatchIt != null && nextBatchIt.hasNext());
- }
-
- /** {@inheritDoc} */
- @Override public E next() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (!curBatchIt.hasNext()) {
- if (nextBatchIt == null)
- throw new NoSuchElementException();
-
- // Current batch is exhausted - switch to the batch found by look-ahead.
- curBatchIt = nextBatchIt;
-
- nextBatchIt = null;
- }
-
- E next = curBatchIt.next();
-
- // Moved to last element in batch - check for next iterator. This must be done after
- // every move, including the one that follows a batch switch, otherwise iteration
- // would stop at a batch that holds a single element.
- if (!curBatchIt.hasNext())
- advanceBatch();
-
- lastRet = next;
-
- return lastRet;
- }
-
- /** {@inheritDoc} */
- @Override public E removex() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (curBatchIt.removex()) {
- // Decrement global size if deleted.
- win.totalQueueSize().decrementAndGet();
-
- return lastRet;
- }
- else
- return null;
- }
-
- /**
- * Looks ahead for the next batch that has events, skipping empty batches. Leaves the
- * look-ahead iterator {@code null} if there is no such batch.
- */
- private void advanceBatch() {
- nextBatchIt = null;
-
- while (batchIt.hasNext()) {
- ConcurrentLinkedDeque8.IteratorEx<E> it = batchIt.next().iterator();
-
- if (it.hasNext()) {
- nextBatchIt = it;
-
- return;
- }
- }
- }
- };
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Sums the sizes of all batches that either exceed a positive maximum number of batches (counting from
- * the head of the queue) or whose end timestamp is already in the past.
- */
- @Override public int evictionQueueSize() {
- WindowHolder win = ref.get();
-
- // Number of head batches exceeding the batch-count bound (the bound is applied only when positive).
- int extraBatches = 0;
-
- if (maxBatches > 0)
- extraBatches = Math.max(0, win.batchQueueSize().get() - maxBatches);
-
- long now = U.currentTimeMillis();
-
- int evictable = 0;
-
- int pos = 0;
-
- for (Batch batch : win.batchQueue()) {
- boolean overLimit = pos < extraBatches;
-
- pos++;
-
- if (overLimit || batch.batchEndTs < now)
- evictable += batch.size();
- }
-
- return evictable;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * The event is enqueued with the current time as its timestamp. If enqueueing is interrupted, the event is
- * reported as not added.
- */
- @Override protected boolean enqueue0(E evt) {
- boolean added;
-
- try {
- added = enqueue0(evt, U.currentTimeMillis());
- }
- catch (GridInterruptedException ignored) {
- added = false;
- }
-
- return added;
- }
-
- /**
- * Enqueue event to window.
- * <p>
- * Tries to add the event to the last batch of the queue. If there is no batch or the last batch rejects
- * the event, a new batch is appended under the enqueue write lock (or, if another thread holds that lock,
- * this thread waits on the read lock) and the attempt is repeated. The loop is left only after the event
- * has been added.
- *
- * @param evt Event to add.
- * @param ts Event timestamp.
- * @return {@code True} if event was added.
- *
- * @throws GridInterruptedException If thread was interrupted.
- */
- private boolean enqueue0(E evt, long ts) throws GridInterruptedException {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- Batch last = evts.peekLast();
-
- if (last == null || !last.add(evt, ts)) {
- // This call will ensure that last object is actually added to batch
- // before we add new batch to events queue.
- // If exception is thrown here, window will be left in consistent state.
- if (last != null)
- last.finish();
-
- // Add new batch to queue in write lock.
- if (enqueueLock.writeLock().tryLock()) {
- try {
- Batch first0 = evts.peekLast(); // Despite the name, this is the current last batch.
-
- if (first0 == last) { // No other thread has appended a batch in the meantime.
- Batch batch = new Batch(batchSize, ts + batchTimeInterval);
-
- ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
- batch.node(node);
-
- size.incrementAndGet();
-
- // If batch was removed in other thread.
- if (batch.removed() && evts.unlinkx(node))
- size.decrementAndGet();
- }
- }
- finally {
- enqueueLock.writeLock().unlock();
- }
- }
- else {
- // Acquire read lock to wait for batch enqueue.
- enqueueLock.readLock().lock();
-
- try {
- evts.peekLast(); // Result is ignored; the loop re-reads the last batch.
- }
- finally {
- enqueueLock.readLock().unlock();
- }
- }
- }
- else {
- // Event was added, global size increment.
- tup.totalQueueSize().incrementAndGet();
-
- return true;
- }
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Polls up to {@code cnt} events from the head of the batch queue while the head batch is evictable, i.e.
- * while the number of batches exceeds a positive maximum or the head batch has expired. Batches that
- * become empty are marked as removed and unlinked from the queue. The total size counter is decreased by
- * the number of polled events before returning.
- */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
- assert first.finished();
-
- Collection<E> polled = first.pollNonBatch(cnt - res.size()); // Poll only what is still missing.
-
- if (!polled.isEmpty())
- res.addAll(polled);
-
- if (first.isEmpty()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node)) // Batch counter is decremented only by the thread that unlinked.
- size.decrementAndGet();
- }
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Removed entries, update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * If the number of batches exceeds a positive maximum, the head batch is polled from the queue and its
- * events are returned; empty batches are skipped by retrying. Otherwise expired head batches are unlinked
- * one by one until a non-empty one is found. An empty list is returned when nothing can be evicted.
- */
- @Override protected Collection<E> pollEvictedBatch0() {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- int curSize = size.get();
-
- if (maxBatches > 0 && curSize > maxBatches) {
- if (size.compareAndSet(curSize, curSize - 1)) { // On CAS failure the outer loop re-reads the counter.
- Batch polled = evts.poll();
-
- if (polled != null) {
- assert polled.finished();
-
- // Mark batch removed for consistency.
- polled.markRemoved();
-
- Collection<E> polled0 = polled.shrink();
-
- // Return only a non-empty result; if the result of shrink is empty, the outer loop retries the poll.
- if (!polled0.isEmpty()) {
- // Update global size.
- tup.totalQueueSize().addAndGet(-polled0.size());
-
- return polled0;
- }
- }
- else {
- // Polled was zero, so we must restore counter and return.
- size.incrementAndGet();
-
- return Collections.emptyList();
- }
- }
- }
- else {
- while (true) {
- Batch batch = evts.peekFirst();
-
- // This call will finish batch and return true if batch is expired.
- if (batch != null && batch.checkExpired()) {
- assert batch.finished();
-
- ConcurrentLinkedDeque8.Node<Batch> node = batch.node();
-
- batch.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- Collection<E> col = batch.shrink();
-
- tup.totalQueueSize().addAndGet(-col.size());
-
- if (!col.isEmpty()) // An empty expired batch is skipped and the next head batch is examined.
- return col;
- }
- else
- return Collections.emptyList();
- }
- }
- }
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Polls up to {@code cnt} events from the head of the batch queue regardless of eviction conditions.
- * A head batch that is both empty and finished is marked as removed and unlinked; polling stops when the
- * head batch yields nothing but cannot be unlinked yet, when the queue has no batches, or when
- * {@code cnt} events were collected. The total size counter is decreased by the number of polled events
- * before returning.
- */
- @Override protected Collection<E> dequeue0(int cnt) {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- Collection<E> polled = first.pollNonBatch(cnt - res.size()); // Poll only what is still missing.
-
- // We must check for finished before unlink as no elements
- // can be added to batch after it is finished.
- if (first.isEmpty() && first.emptyFinished()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- assert first.isEmpty();
- }
- else if (polled.isEmpty()) // Head batch is still open for adds and currently has nothing to poll.
- break;
-
- res.addAll(polled);
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /**
- * Consistency check, used for testing.
- * <p>
- * Asserts that the number of iterated events and the sum of batch sizes both match the reported window
- * size, and that the batch counter matches the actual number of batches in the queue.
- */
- void consistencyCheck() {
- WindowHolder win = ref.get();
-
- int iterCnt = 0;
-
- // Count events visible through the window iterator.
- for (Iterator<E> it = iterator(); it.hasNext(); iterCnt++)
- it.next();
-
- int batchCnt = 0;
-
- // Count events by summing up batch sizes.
- for (Batch batch : win.batchQueue())
- batchCnt += batch.size();
-
- int total = size();
-
- assert batchCnt == total : "Batch size comparison failed [batchCnt=" + batchCnt + ", size=" + total + ']';
- assert iterCnt == total : "Queue size comparison failed [iterCnt=" + iterCnt + ", size=" + total + ']';
- assert win.batchQueue().size() == win.batchQueueSize().get();
- }
-
- /**
- * Window structure: a tuple of the batch queue, the counter of batches in the queue and the counter of
- * events in all batches.
- */
- @SuppressWarnings("ConstantConditions")
- private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public WindowHolder() {
- // No-op.
- }
-
- /**
- * Creates the holder. With assertions enabled, the queue and the batch counter are required to account
- * for exactly one batch.
- *
- * @param batchQueue Batch queue.
- * @param batchQueueSize Batch queue size counter.
- * @param globalSize Global size counter.
- */
- private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
- AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
- super(batchQueue, batchQueueSize, globalSize);
-
- assert batchQueue.size() == 1;
- assert batchQueueSize.get() == 1;
- }
-
- /**
- * @return Events queue (first tuple element).
- */
- public ConcurrentLinkedDeque8<Batch> batchQueue() {
- return get1();
- }
-
- /**
- * @return Batch queue size (second tuple element).
- */
- public AtomicInteger batchQueueSize() {
- return get2();
- }
-
- /**
- * @return Global queue size (third tuple element).
- */
- public AtomicInteger totalQueueSize() {
- return get3();
- }
- }
-
- /**
- * Batch of events bounded by a capacity and an end timestamp.
- * <p>
- * The batch itself is a read-write lock: events are added and polled under the read lock, while marking
- * the batch finished on expiration and shrinking it are done under the write lock.
- */
- private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Batch events. Set to {@code null} by {@link #shrink()}. */
- private ConcurrentLinkedDeque8<E> evts;
-
- /** Remaining capacity; decremented for every added event. */
- private AtomicInteger cap;
-
- /** Batch end timestamp. */
- private final long batchEndTs;
-
- /** Finished flag. */
- private boolean finished;
-
- /** Queue node. */
- @GridToStringExclude
- private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
- /** Removed flag. */
- private volatile boolean rmvd;
-
- /**
- * @param batchSize Batch size.
- * @param batchEndTs Batch end timestamp.
- */
- private Batch(int batchSize, long batchEndTs) {
- cap = new AtomicInteger(batchSize);
- this.batchEndTs = batchEndTs;
-
- evts = new ConcurrentLinkedDeque8<>();
- }
-
- /**
- * @return {@code True} if removed.
- */
- public boolean removed() {
- return rmvd;
- }
-
- /**
- * Marks batch as removed.
- */
- public void markRemoved() {
- rmvd = true;
- }
-
- /**
- * Adds event to batch.
- * <p>
- * An event whose timestamp is past the batch end timestamp is rejected and the batch is marked finished
- * under the write lock. Otherwise the event is added under the read lock if the batch is not finished
- * and a unit of capacity could be reserved.
- *
- * @param evt Event to add.
- * @param ts Event timestamp.
- * @return {@code True} if event was added, {@code false} if batch is full, finished or the timestamp is
- * past the batch end.
- */
- public boolean add(E evt, long ts) {
- if (ts <= batchEndTs) {
- readLock().lock();
-
- try {
- if (finished)
- // Finished was set inside write lock.
- return false;
-
- while (true) {
- int size = cap.get(); // Remaining capacity, despite the name.
-
- if (size > 0) {
- if (cap.compareAndSet(size, size - 1)) {
- evts.add(evt);
-
- // Will go through write lock and finish batch.
- if (size == 1)
- finished = true;
-
- return true;
- }
- }
- else
- return false;
- }
- }
- finally {
- readLock().unlock();
- }
- }
- else {
- writeLock().lock();
-
- try {
- // No events could be added to this batch.
- finished = true;
-
- return false;
- }
- finally {
- writeLock().unlock();
- }
- }
- }
-
- /**
- * @return Queue node.
- */
- public ConcurrentLinkedDeque8.Node<Batch> node() {
- return qNode;
- }
-
- /**
- * @param qNode Queue node.
- */
- public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
- this.qNode = qNode;
- }
-
- /**
- * Acquires and immediately releases the write lock, which makes the caller wait for all threads that
- * currently hold the read lock of this batch. With assertions enabled, also checks that the batch has no
- * remaining capacity or is finished.
- *
- * @throws GridInterruptedException If wait was interrupted.
- */
- public void finish() throws GridInterruptedException {
- writeLock().lock();
-
- try {
- // Safety.
- assert cap.get() == 0 || finished;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is finished and no more events will be added to it.
- */
- public boolean finished() {
- readLock().lock();
-
- try {
- return finished;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size ({@code 0} after the batch was shrunk).
- */
- public int size() {
- readLock().lock();
-
- try {
- return evts == null ? 0 : evts.sizex();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is empty (a shrunk batch is always empty).
- */
- public boolean isEmpty() {
- readLock().lock();
-
- try {
- return evts == null || evts.isEmpty();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
- * be added to batch and it can be safely unlinked from the queue.
- *
- * @return {@code True} if batch is empty and finished.
- */
- public boolean emptyFinished() {
- writeLock().lock();
-
- try {
- return finished && (evts == null || evts.isEmpty());
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * Checks if the batch has expired, i.e. the current time is past the batch end timestamp. An expired
- * batch is marked finished under the write lock.
- *
- * @return {@code True} if the batch has expired, {@code false} otherwise.
- */
- public boolean checkExpired() {
- if (U.currentTimeMillis() > batchEndTs) {
- writeLock().lock();
-
- try {
- finished = true;
-
- return true;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- return false;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns the iterator of the underlying event queue or, if the batch was shrunk, an empty iterator whose
- * {@code next()}, {@code remove()} and {@code removex()} throw {@link NoSuchElementException}.
- */
- @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
- readLock().lock();
-
- try {
- if (evts != null)
- return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
- return new ConcurrentLinkedDeque8.IteratorEx<E>() {
- @Override public boolean removex() {
- throw new NoSuchElementException();
- }
-
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public E next() {
- throw new NoSuchElementException();
- }
-
- @Override public void remove() {
- throw new NoSuchElementException();
- }
- };
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Polls up to {@code cnt} objects from batch in concurrent fashion.
- *
- * @param cnt Number of objects to poll.
- * @return Collection of polled elements (empty collection in case no events were
- * present).
- */
- public Collection<E> pollNonBatch(int cnt) {
- readLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evt = evts.poll();
-
- if (evt != null)
- res.add(evt);
- else
- return res;
- }
-
- return res;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Shrinks this batch. No events can be polled from it after this method.
- *
- * @return Collection of events contained in batch before shrink (empty collection in
- * case no events were present).
- */
- public Collection<E> shrink() {
- writeLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- // Since iterator can concurrently delete elements, we must poll here.
- Collection<E> res = new ArrayList<>(evts.sizex());
-
- E o;
-
- while ((o = evts.poll()) != null)
- res.add(o);
-
- // Nothing can be polled after shrink.
- evts = null;
-
- return res;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- ConcurrentLinkedDeque8<E> evts0 = evts; // Local copy: the field may be nulled by a concurrent shrink.
-
- return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
- }
- }
-}