You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:24 UTC
[23/32] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeBatchWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeBatchWindow.java
deleted file mode 100644
index 09e8352..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeBatchWindow.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that accumulates events in batches, and is bounded by time and maximum number of batches.
- */
-public class StreamerBoundedTimeBatchWindow<E> extends StreamerWindowAdapter<E> {
- /** Batch size. */
- private int batchSize;
-
- /** Maximum batches. */
- private int maxBatches;
-
- /** Batch time interval; added to a {@code U.currentTimeMillis()} value to compute a batch end timestamp. */
- private long batchTimeInterval;
-
- /** Atomic reference for queue and size. */
- private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
- /** Enqueue lock. */
- private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
- /**
- * Gets maximum number of batches that can be stored in window.
- *
- * @return Maximum number of batches for window ({@code 0} turns the batch-count bound off).
- */
- public int getMaximumBatches() {
- return maxBatches;
- }
-
- /**
- * Sets maximum number of batches that can be stored in window.
- *
- * @param maxBatches Maximum number of batches for window; negative values fail {@code checkConfiguration()}.
- */
- public void setMaximumBatches(int maxBatches) {
- this.maxBatches = maxBatches;
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Sets batch size.
- *
- * @param batchSize Batch size; {@code 0} is replaced with {@code Integer.MAX_VALUE} by {@code checkConfiguration()}.
- */
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- /**
- * Gets batch time interval.
- *
- * @return Batch time interval.
- */
- public long getBatchTimeInterval() {
- return batchTimeInterval;
- }
-
- /**
- * Sets batch time interval.
- *
- * @param batchTimeInterval Batch time interval.
- */
- public void setBatchTimeInterval(long batchTimeInterval) {
- this.batchTimeInterval = batchTimeInterval;
- }
-
- /** {@inheritDoc} Side effect: a {@code batchSize} of {@code 0} is replaced with {@code Integer.MAX_VALUE}. */
- @Override public void checkConfiguration() throws GridException {
- if (maxBatches < 0)
- throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
-
- if (batchSize < 0)
- throw new GridException("Failed to initialize window (batchSize cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
- else if (batchSize == 0)
- batchSize = Integer.MAX_VALUE;
-
- if (batchTimeInterval <= 0)
- throw new GridException("Failed to initialize window (batchTimeInterval must be positive) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize +
- ", batchTimeInterval=" + batchTimeInterval + ']');
- }
-
- /** {@inheritDoc} */
- @Override protected void stop0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void reset0() {
- ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
-
- Batch b = new Batch(batchSize, U.currentTimeMillis() + batchTimeInterval);
-
- ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
-
- b.node(n);
-
- ref.set(new WindowHolder(first, new AtomicInteger(1), new AtomicInteger()));
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return ref.get().totalQueueSize().get();
- }
-
- /** {@inheritDoc} Iterates events batch by batch over the batch queue of the holder read from {@code ref} at call time. */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final WindowHolder win = ref.get();
-
- final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
- return new GridStreamerWindowIterator<E>() {
- /** Current batch iterator; {@code null} only when the batch queue had no batches at creation time. */
- private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
- /** Next batch iterator. Set by {@code advanceBatch()}; null if no more batches available or not advanced yet. */
- private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
- /** Last returned value. */
- private E lastRet;
-
- {
- curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
- }
-
- /** {@inheritDoc} NOTE(review): looks at most one batch ahead, so an empty batch ends iteration even if later batches hold events. */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean hasNext() {
- if (curBatchIt != null) {
- if (curBatchIt.hasNext())
- return true;
-
- return nextBatchIt != null && nextBatchIt.hasNext();
- }
- else
- return false;
- }
-
- /** {@inheritDoc} NOTE(review): the batch-switch branch never calls advanceBatch(), so iteration stops after a one-element batch. */
- @Override public E next() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (!curBatchIt.hasNext()) {
- if (nextBatchIt != null) {
- curBatchIt = nextBatchIt;
-
- nextBatchIt = null;
-
- lastRet = curBatchIt.next();
- }
- else
- throw new NoSuchElementException();
- }
- else {
- E next = curBatchIt.next();
-
- // Moved to last element in batch - check for next iterator.
- if (!curBatchIt.hasNext())
- advanceBatch();
-
- lastRet = next;
- }
-
- return lastRet;
- }
-
- /** {@inheritDoc} Returns the last returned event if the current batch iterator removed it, {@code null} otherwise. */
- @Override public E removex() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (curBatchIt.removex()) {
- // Decrement global size if deleted.
- win.totalQueueSize().decrementAndGet();
-
- return lastRet;
- }
- else
- return null;
- }
-
- /**
- * Moves to the next batch: stores its iterator in {@code nextBatchIt}, or null if the queue has no more batches.
- */
- private void advanceBatch() {
- if (batchIt.hasNext()) {
- Batch batch = batchIt.next();
-
- nextBatchIt = batch.iterator();
- }
- else
- nextBatchIt = null;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- WindowHolder win = ref.get();
-
- int oversizeCnt = maxBatches > 0 ? Math.max(0, win.batchQueueSize().get() - maxBatches) : 0;
-
- long now = U.currentTimeMillis();
-
- Iterator<Batch> it = win.batchQueue().iterator();
-
- int size = 0;
-
- int idx = 0;
-
- while (it.hasNext()) {
- Batch batch = it.next();
-
- if (idx++ < oversizeCnt || batch.batchEndTs < now)
- size += batch.size();
- }
-
- return size;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean enqueue0(E evt) {
- try {
- return enqueue0(evt, U.currentTimeMillis());
- }
- catch (GridInterruptedException ignored) {
- return false;
- }
- }
-
- /**
- * Enqueue event to window.
- *
- * @param evt Event to add.
- * @param ts Event timestamp.
- * @return {@code True} if event was added.
- *
- * @throws GridInterruptedException If thread was interrupted.
- */
- private boolean enqueue0(E evt, long ts) throws GridInterruptedException {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- Batch last = evts.peekLast();
-
- if (last == null || !last.add(evt, ts)) {
- // This call will ensure that last object is actually added to batch
- // before we add new batch to events queue.
- // If exception is thrown here, window will be left in consistent state.
- if (last != null)
- last.finish();
-
- // Add new batch to queue in write lock.
- if (enqueueLock.writeLock().tryLock()) {
- try {
- Batch first0 = evts.peekLast();
-
- if (first0 == last) {
- Batch batch = new Batch(batchSize, ts + batchTimeInterval);
-
- ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
- batch.node(node);
-
- size.incrementAndGet();
-
- // If batch was removed in other thread.
- if (batch.removed() && evts.unlinkx(node))
- size.decrementAndGet();
- }
- }
- finally {
- enqueueLock.writeLock().unlock();
- }
- }
- else {
- // Acquire read lock to wait for batch enqueue.
- enqueueLock.readLock().lock();
-
- try {
- evts.peekLast();
- }
- finally {
- enqueueLock.readLock().unlock();
- }
- }
- }
- else {
- // Event was added, global size increment.
- tup.totalQueueSize().incrementAndGet();
-
- return true;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null && ((maxBatches > 0 && curSize > maxBatches) || first.checkExpired())) {
- assert first.finished();
-
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- if (!polled.isEmpty())
- res.addAll(polled);
-
- if (first.isEmpty()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
- }
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Removed entries, update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /** {@inheritDoc} Polls one whole batch: the head batch if the queue is over {@code maxBatches}, else the first expired batch. */
- @Override protected Collection<E> pollEvictedBatch0() {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- int curSize = size.get();
-
- if (maxBatches > 0 && curSize > maxBatches) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- Batch polled = evts.poll();
-
- if (polled != null) {
- assert polled.finished();
-
- // Mark batch removed for consistency.
- polled.markRemoved();
-
- Collection<E> polled0 = polled.shrink();
-
- // Return the events; if result of shrink is empty, fall through and retry the poll.
- if (!polled0.isEmpty()) {
- // Update global size.
- tup.totalQueueSize().addAndGet(-polled0.size());
-
- return polled0;
- }
- }
- else {
- // Nothing was polled (null), so we must restore counter and return.
- size.incrementAndGet();
-
- return Collections.emptyList();
- }
- }
- }
- else {
- while (true) {
- Batch batch = evts.peekFirst();
-
- // This call will finish batch and return true if batch is expired.
- if (batch != null && batch.checkExpired()) {
- assert batch.finished();
-
- ConcurrentLinkedDeque8.Node<Batch> node = batch.node();
-
- batch.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- Collection<E> col = batch.shrink();
-
- tup.totalQueueSize().addAndGet(-col.size());
-
- if (!col.isEmpty())
- return col;
- }
- else
- return Collections.emptyList();
- }
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> dequeue0(int cnt) {
- WindowHolder tup = ref.get();
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- // We must check for finished before unlink as no elements
- // can be added to batch after it is finished.
- if (first.isEmpty() && first.emptyFinished()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- assert first.isEmpty();
- }
- else if (polled.isEmpty())
- break;
-
- res.addAll(polled);
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /**
- * Consistency check, used for testing.
- */
- void consistencyCheck() {
- WindowHolder win = ref.get();
-
- Iterator<E> it = iterator();
-
- int cnt = 0;
-
- while (it.hasNext()) {
- it.next();
-
- cnt++;
- }
-
- int cnt0 = 0;
-
- for (Batch batch : win.batchQueue())
- cnt0 += batch.size();
-
- int sz = size();
-
- assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
- assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
- assert win.batchQueue().size() == win.batchQueueSize().get();
- }
-
- /**
- * Window structure.
- */
- @SuppressWarnings("ConstantConditions")
- private class WindowHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public WindowHolder() {
- // No-op.
- }
-
- /**
- * @param batchQueue Batch queue.
- * @param batchQueueSize Batch queue size counter.
- * @param globalSize Global size counter.
- */
- private WindowHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
- AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
- super(batchQueue, batchQueueSize, globalSize);
-
- assert batchQueue.size() == 1;
- assert batchQueueSize.get() == 1;
- }
-
- /**
- * @return Events queue.
- */
- public ConcurrentLinkedDeque8<Batch> batchQueue() {
- return get1();
- }
-
- /**
- * @return Batch queue size.
- */
- public AtomicInteger batchQueueSize() {
- return get2();
- }
-
- /**
- * @return Global queue size.
- */
- public AtomicInteger totalQueueSize() {
- return get3();
- }
- }
-
- /**
- * Batch.
- */
- private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Batch events. */
- private ConcurrentLinkedDeque8<E> evts;
-
- /** Capacity. */
- private AtomicInteger cap;
-
- /** Batch end timestamp. */
- private final long batchEndTs;
-
- /** Finished flag. */
- private boolean finished;
-
- /** Queue node. */
- @GridToStringExclude
- private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
- /** Removed flag. */
- private volatile boolean rmvd;
-
- /**
- * @param batchSize Batch size.
- * @param batchEndTs Batch end timestamp.
- */
- private Batch(int batchSize, long batchEndTs) {
- cap = new AtomicInteger(batchSize);
- this.batchEndTs = batchEndTs;
-
- evts = new ConcurrentLinkedDeque8<>();
- }
-
- /**
- * @return {@code True} if removed.
- */
- public boolean removed() {
- return rmvd;
- }
-
- /**
- * Marks batch as removed.
- */
- public void markRemoved() {
- rmvd = true;
- }
-
- /**
- * Adds event to batch if the timestamp is within the batch and a capacity slot can be claimed.
- *
- * @param evt Event to add.
- * @param ts Event timestamp; if it is greater than the batch end timestamp the batch is marked finished.
- * @return {@code True} if event was added, {@code false} if batch is full, finished, or {@code ts} is past batch end.
- */
- public boolean add(E evt, long ts) {
- if (ts <= batchEndTs) {
- readLock().lock();
-
- try {
- if (finished)
- // Finished was set inside write lock.
- return false;
-
- while (true) {
- int size = cap.get();
-
- if (size > 0) {
- if (cap.compareAndSet(size, size - 1)) {
- evts.add(evt);
-
- // Last free slot was taken - mark batch finished (set here while holding the read lock).
- if (size == 1)
- finished = true;
-
- return true;
- }
- }
- else
- return false;
- }
- }
- finally {
- readLock().unlock();
- }
- }
- else {
- writeLock().lock();
-
- try {
- // No events could be added to this batch.
- finished = true;
-
- return false;
- }
- finally {
- writeLock().unlock();
- }
- }
- }
-
- /**
- * @return Queue node.
- */
- public ConcurrentLinkedDeque8.Node<Batch> node() {
- return qNode;
- }
-
- /**
- * @param qNode Queue node.
- */
- public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
- this.qNode = qNode;
- }
-
- /**
- * Acquires and releases the write lock, thereby waiting for {@code add()} calls that still hold the read lock.
- *
- * @throws GridInterruptedException Declared, but nothing visible in the body throws it - TODO confirm.
- */
- public void finish() throws GridInterruptedException {
- writeLock().lock();
-
- try {
- // Safety: batch must be either out of capacity or marked finished at this point.
- assert cap.get() == 0 || finished;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is finished and no more events will be added to it.
- */
- public boolean finished() {
- readLock().lock();
-
- try {
- return finished;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int size() {
- readLock().lock();
-
- try {
- return evts == null ? 0 : evts.sizex();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is empty.
- */
- public boolean isEmpty() {
- readLock().lock();
-
- try {
- return evts == null || evts.isEmpty();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
- * be added to batch and it can be safely unlinked from the queue.
- *
- * @return {@code True} if batch is empty and finished.
- */
- public boolean emptyFinished() {
- writeLock().lock();
-
- try {
- return finished && (evts == null || evts.isEmpty());
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * Checks if the batch has expired.
- *
- * @return {@code True} if the batch has expired, {@code false} otherwise.
- */
- public boolean checkExpired() {
- if (U.currentTimeMillis() > batchEndTs) {
- writeLock().lock();
-
- try {
- finished = true;
-
- return true;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
- readLock().lock();
-
- try {
- if (evts != null)
- return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
- return new ConcurrentLinkedDeque8.IteratorEx<E>() {
- @Override public boolean removex() {
- throw new NoSuchElementException();
- }
-
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public E next() {
- throw new NoSuchElementException();
- }
-
- @Override public void remove() {
- throw new NoSuchElementException();
- }
- };
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Polls up to {@code cnt} objects from batch in concurrent fashion.
- *
- * @param cnt Number of objects to poll.
- * @return Collection of polled elements (empty collection in case no events were
- * present).
- */
- public Collection<E> pollNonBatch(int cnt) {
- readLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evt = evts.poll();
-
- if (evt != null)
- res.add(evt);
- else
- return res;
- }
-
- return res;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Shrinks this batch under the write lock. No events can be polled from it after this method.
- *
- * @return Collection of events contained in batch before shrink (empty collection in
- * case no events were present or the batch was already shrunk).
- */
- public Collection<E> shrink() {
- writeLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- // Since iterator can concurrently delete elements, we must poll here.
- Collection<E> res = new ArrayList<>(evts.sizex());
-
- E o;
-
- while ((o = evts.poll()) != null)
- res.add(o);
-
- // Nothing can be polled after shrink.
- evts = null;
-
- return res;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- ConcurrentLinkedDeque8<E> evts0 = evts;
-
- return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeWindow.java
deleted file mode 100644
index a3ff42c..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedTimeWindow.java
+++ /dev/null
@@ -1,454 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Window which is bounded by size and time interval.
- */
-public class StreamerBoundedTimeWindow<E> extends StreamerWindowAdapter<E> {
- /** Window structures holder. */
- private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
- /** Time interval. */
- private long timeInterval;
-
- /** Window maximum size. */
- private int maxSize;
-
- /** Unique flag. */
- private boolean unique;
-
- /** Event order counter. */
- private AtomicLong orderCnt = new AtomicLong();
-
- /**
- * Gets window maximum size.
- *
- * @return Maximum size.
- */
- public int getMaximumSize() {
- return maxSize;
- }
-
- /**
- * Sets window maximum size.
- *
- * @param maxSize Max size.
- */
- public void setMaximumSize(int maxSize) {
- this.maxSize = maxSize;
- }
-
- /**
- * Gets window time interval.
- *
- * @return Time interval.
- */
- public long getTimeInterval() {
- return timeInterval;
- }
-
- /**
- * Sets window time interval.
- *
- * @param timeInterval Time interval.
- */
- public void setTimeInterval(long timeInterval) {
- this.timeInterval = timeInterval;
- }
-
- /**
- * Gets window unique flag.
- *
- * @return {@code True} if only unique events should be added to window.
- */
- public boolean isUnique() {
- return unique;
- }
-
- /**
- * Sets window unique flag.
- *
- * @param unique {@code True} if only unique events should be added to window.
- */
- public void setUnique(boolean unique) {
- this.unique = unique;
- }
-
- /** {@inheritDoc} */
- @Override public void checkConfiguration() throws GridException {
- if (timeInterval <= 0)
- throw new GridException("Failed to initialize window (timeInterval must be positive): [windowClass=" +
- getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
- unique + ']');
-
- if (maxSize < 0)
- throw new GridException("Failed to initialize window (maximumSize cannot be negative): [windowClass=" +
- getClass().getSimpleName() + ", maxSize=" + maxSize + ", timeInterval=" + timeInterval + ", unique=" +
- unique + ']');
- }
-
- /** {@inheritDoc} */
- @Override protected void stop0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("RedundantCast")
- @Override protected void reset0() {
- ref.set(new WindowHolder(newQueue(), unique ? (Set<Object>)new GridConcurrentHashSet<>() : null,
- new AtomicInteger()));
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return ref.get().size().get();
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- // Get estimate for eviction queue size.
- WindowHolder tup = ref.get();
-
- GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
- boolean sizeCheck = maxSize != 0;
-
- int overflow = tup.size().get() - maxSize;
-
- long timeBound = U.currentTimeMillis() - timeInterval;
-
- int idx = 0;
- int cnt = 0;
-
- for (Holder holder : evtsQueue) {
- if ((idx < overflow && sizeCheck) || holder.ts < timeBound)
- cnt++;
- else if ((idx >= overflow && sizeCheck) && holder.ts >= timeBound)
- break;
- else if (!sizeCheck && holder.ts >= timeBound)
- break;
-
- idx++;
- }
-
- return cnt;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean enqueue0(E evt) {
- add(evt, U.currentTimeMillis());
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evicted = pollEvictedInternal();
-
- if (evicted == null)
- return res;
-
- res.add(evicted);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvictedBatch0() {
- E res = pollEvictedInternal();
-
- if (res == null)
- return Collections.emptyList();
-
- return Collections.singleton(res);
- }
-
- /** Polls one event that is over the size bound or older than the time interval; {@code null} if nothing was evicted. */
- @Nullable private <T> T pollEvictedInternal() {
- WindowHolder tup = ref.get();
-
- AtomicInteger size = tup.size();
-
- GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
- long now = U.currentTimeMillis();
-
- while (true) {
- int curSize = size.get();
-
- if (maxSize > 0 && curSize > maxSize) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- Holder hldr = evtsQueue.pollFirst();
-
- if (hldr != null) {
- if (unique)
- tup.set().remove(hldr.val);
-
- return (T)hldr.val;
- }
- else {
- // No actual events in queue, it means that other thread is just adding event.
- // Restore the counter and return null as it is a concurrent add call.
- size.incrementAndGet();
-
- return null;
- }
- }
- }
- else {
- // Check if first entry qualifies for eviction.
- Holder first = evtsQueue.firstx();
-
- if (first != null && first.ts < now - timeInterval) {
- if (evtsQueue.remove(first)) {
- if (unique)
- tup.set().remove(first.val);
-
- size.decrementAndGet();
-
- return (T)first.val;
- }
- }
- else
- return null;
- }
- }
- }
-
- /** {@inheritDoc} NOTE(review): takes events via {@code pollLast()}, i.e. from the end of the ts/order-sorted set - confirm intended. */
- @Override protected Collection<E> dequeue0(int cnt) {
- WindowHolder tup = ref.get();
-
- AtomicInteger size = tup.size();
- GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
-
- Collection<E> resCol = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > 0) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- Holder<E> h = evtsQueue.pollLast();
-
- if (h != null) {
- resCol.add(h.val);
-
- if (unique)
- tup.set().remove(h.val);
-
- if (resCol.size() >= cnt)
- return resCol;
- }
- else {
- size.incrementAndGet();
-
- return resCol;
- }
- }
- }
- else
- return resCol;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final WindowHolder win = ref.get();
-
- final GridConcurrentSkipListSet<Holder<E>> col = win.collection();
- final Set<Object> set = win.set();
-
- final Iterator<Holder<E>> it = col.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- private Holder<E> lastRet;
-
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override public E next() {
- lastRet = it.next();
-
- return lastRet.val;
- }
-
- @Override public E removex() {
- if (lastRet == null)
- throw new IllegalStateException();
-
- if (col.remove(lastRet)) {
- if (set != null)
- set.remove(lastRet.val);
-
- win.size().decrementAndGet();
-
- return lastRet.val;
- }
- else
- return null;
- }
- };
- }
-
- /**
- * Checks queue consistency. Used in tests.
- */
- void consistencyCheck() {
- WindowHolder win = ref.get();
-
- assert win.collection().size() == win.size().get();
-
- if (win.set() != null) {
- // Check no duplicates in collection.
-
- Collection<Object> vals = new HashSet<>();
-
- for (Object evt : win.collection())
- assert vals.add(((Holder)evt).val);
- }
- }
-
- /**
- * @return New queue.
- */
- private GridConcurrentSkipListSet<Holder<E>> newQueue() {
- return new GridConcurrentSkipListSet<>(new Comparator<Holder>() {
- @Override public int compare(Holder h1, Holder h2) {
- if (h1 == h2)
- return 0;
-
- if (h1.ts != h2.ts)
- return h1.ts < h2.ts ? -1 : 1;
-
- return h1.order < h2.order ? -1 : 1;
- }
- });
- }
-
- /**
- * @param evt Event to add.
- * @param ts Event timestamp.
- */
- private void add(E evt, long ts) {
- WindowHolder tup = ref.get();
-
- if (!unique) {
- tup.collection().add(new Holder<>(evt, ts, orderCnt.incrementAndGet()));
-
- tup.size().incrementAndGet();
- }
- else {
- if (tup.set().add(evt)) {
- tup.collection().add(new Holder<>(evt, ts, orderCnt.incrementAndGet()));
-
- tup.size().incrementAndGet();
- }
- }
- }
-
- /**
- * @param evts Events to add.
- * @param ts Timestamp for added events.
- */
- private void addAll(Iterable<E> evts, long ts) {
- for (E evt : evts)
- add(evt, ts);
- }
-
- /**
- * Holder.
- */
- private static class Holder<E> {
- /** Value. */
- private E val;
-
- /** Event timestamp. */
- private long ts;
-
- /** Event order. */
- private long order;
-
- /**
- * @param val Event.
- * @param ts Timestamp.
- * @param order Order.
- */
- private Holder(E val, long ts, long order) {
- this.val = val;
- this.ts = ts;
- this.order = order;
- }
- }
-
- /**
- * Window holder.
- */
- @SuppressWarnings("ConstantConditions")
- private class WindowHolder extends GridTuple3<GridConcurrentSkipListSet<Holder<E>>, Set<Object>, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public WindowHolder() {
- // No-op.
- }
-
- /**
- * @param col Collection.
- * @param set Set if unique.
- * @param size Size.
- */
- private WindowHolder(@Nullable GridConcurrentSkipListSet<Holder<E>> col,
- @Nullable Set<Object> set, @Nullable AtomicInteger size) {
- super(col, set, size);
- }
-
- /**
- * @return Holders collection.
- */
- public GridConcurrentSkipListSet<Holder<E>> collection() {
- return get1();
- }
-
- /**
- * @return Uniqueness set.
- */
- public Set<Object> set() {
- return get2();
- }
-
- /**
- * @return Size counter.
- */
- public AtomicInteger size() {
- return get3();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerUnboundedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerUnboundedWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerUnboundedWindow.java
deleted file mode 100644
index 63434d5..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerUnboundedWindow.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.jdk8.backport.*;
-
-import java.util.*;
-
-/**
- * Unbounded window which holds all events. Events can be evicted manually from window
- * via any of the {@code dequeue(...)} methods.
- */
-public class StreamerUnboundedWindow<E> extends StreamerWindowAdapter<E> {
- /** Events. */
- private ConcurrentLinkedDeque8<E> evts = new ConcurrentLinkedDeque8<>();
-
- /** {@inheritDoc} */
- @Override protected void stop0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void checkConfiguration() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void reset0() {
- evts.clear();
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return evts.sizex();
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final ConcurrentLinkedDeque8.IteratorEx<E> it = (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- private E lastRet;
-
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override public E next() {
- lastRet = it.next();
-
- return lastRet;
- }
-
- @Override public E removex() {
- return (it.removex()) ? lastRet : null;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean enqueue0(E evt) {
- return evts.add(evt);
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> dequeue0(int cnt) {
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evicted = evts.pollLast();
-
- if (evicted != null)
- res.add(evicted);
- else
- break;
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- return Collections.emptyList();
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvictedBatch0() {
- return Collections.emptyList();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
deleted file mode 100644
index b8b8f26..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerWindowAdapter.java
+++ /dev/null
@@ -1,529 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.apache.ignite.lang.*;
-import org.apache.ignite.lifecycle.*;
-import org.apache.ignite.streamer.*;
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.util.typedef.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Streamer window adapter.
- */
-public abstract class StreamerWindowAdapter<E> implements LifecycleAware, StreamerWindow<E>,
- StreamerWindowMBean {
- /** Default window name. */
- private String name = getClass().getSimpleName();
-
- /** Filter predicate. */
- private IgnitePredicate<Object> filter;
-
- /** Indexes. */
- private Map<String, StreamerIndexProvider<E, ?, ?>> idxsAsMap;
-
- /** Index providers, in the order they were passed to {@code setIndexes(...)}. */
- private StreamerIndexProvider<E, ?, ?>[] idxs;
-
- /** Lock for updates and snapshot. */
- private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
-
- /** {@inheritDoc} Returns this window's class name passed through {@code U.compact(...)}. */
- @Override public String getClassName() {
- return U.compact(getClass().getName());
- }
-
- /** {@inheritDoc} Returns the same value as {@code size()}. */
- @Override public int getSize() {
- return size();
- }
-
- /** {@inheritDoc} Returns the same value as {@code evictionQueueSize()}. */
- @Override public int getEvictionQueueSize() {
- return evictionQueueSize();
- }
-
- /** {@inheritDoc} Defaults to the simple class name unless changed via {@link #setName(String)}. */
- @Override public String name() {
- return name;
- }
-
- /** {@inheritDoc} Returns the same value as {@link #name()}. */
- @Override public String getName() {
- return name;
- }
-
- /** {@inheritDoc} Wraps {@link #iterator0()} so that at most {@code size()} elements (read at creation) are returned. */
- @Override public Iterator<E> iterator() {
- return new BoundedIterator(iterator0());
- }
-
- /**
- * Returns an iterator over a set of elements of type {@code E} without any iteration limit. That is,
- * if a concurrent thread keeps adding new elements to the window, the iteration may never end.
- *
- * @return Iterator.
- */
- protected abstract GridStreamerWindowIterator<E> iterator0();
-
- /**
- * {@inheritDoc}
- * <p>
- * The result reflects only the filter decision: {@code false} if the filter rejected the event,
- * {@code true} otherwise, even if {@link #enqueue0(Object)} did not add it.
- */
- @Override public boolean enqueue(E evt) throws GridException {
- lock.readLock();
-
- try {
- // Filtered-out events touch neither the indexes nor the window.
- if (filter != null && !filter.apply(evt))
- return false;
-
- updateIndexes(evt, false);
-
- boolean added = enqueue0(evt);
-
- // Window did not take the event: undo the index update made above.
- if (!added)
- updateIndexes(evt, true);
-
- return true;
- }
- finally {
- lock.readUnlock();
- }
- }
-
- /** {@inheritDoc} Delegates to {@link #enqueueAll(Collection)} with the array viewed as a list. */
- @Override public boolean enqueue(E... evts) throws GridException {
- return enqueueAll(Arrays.asList(evts));
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Events rejected by the filter are skipped and do not affect the result. The result is {@code true}
- * only if {@link #enqueue0(Object)} accepted every event that passed the filter.
- */
- @Override public boolean enqueueAll(Collection<E> evts) throws GridException {
- lock.readLock();
-
- try {
- boolean noFilter = filter == null || F.isAlwaysTrue(filter);
-
- boolean allAdded = true;
-
- for (E evt : evts) {
- if (!noFilter && !filter.apply(evt))
- continue;
-
- updateIndexes(evt, false);
-
- if (enqueue0(evt))
- continue;
-
- // Window did not take the event: undo the index update and remember the failure.
- updateIndexes(evt, true);
-
- allAdded = false;
- }
-
- return allAdded;
- }
- finally {
- lock.readUnlock();
- }
- }
-
- /**
- * Adds event to window.
- *
- * @param evt Event.
- * @return {@code True} if event added.
- */
- protected abstract boolean enqueue0(E evt);
-
- /** {@inheritDoc} Returns {@code F.first(...)} of the collection produced by {@code dequeue(1)}. */
- @Override public E dequeue() throws GridException {
- return F.first(dequeue(1));
- }
-
- /** {@inheritDoc} Requests as many elements as {@code size()} reports at the time of the call. */
- @Override public Collection<E> dequeueAll() throws GridException {
- return dequeue(size());
- }
-
- /** {@inheritDoc} Dequeued events are also removed from the configured indexes. */
- @Override public Collection<E> dequeue(int cnt) throws GridException {
- lock.readLock();
-
- try {
- Collection<E> dequeued = dequeue0(cnt);
-
- // Nothing to remove from indexes if no events came back or no indexes are set.
- if (dequeued.isEmpty() || idxs == null)
- return dequeued;
-
- for (E evt : dequeued)
- updateIndexes(evt, true);
-
- return dequeued;
- }
- finally {
- lock.readUnlock();
- }
- }
-
- /**
- * Dequeues up to cnt elements from window. If current window size is less than cnt, will dequeue all elements
- * from window.
- *
- * @param cnt Count.
- * @return Dequeued elements.
- */
- protected abstract Collection<E> dequeue0(int cnt);
-
- /** {@inheritDoc} Returns {@code F.first(...)} of the collection produced by {@code pollEvicted(1)}. */
- @Override public E pollEvicted() throws GridException {
- return F.first(pollEvicted(1));
- }
-
- /** {@inheritDoc} Requests as many elements as {@code evictionQueueSize()} reports at the time of the call. */
- @Override public Collection<E> pollEvictedAll() throws GridException {
- return pollEvicted(evictionQueueSize());
- }
-
- /** {@inheritDoc} Polled events are also removed from the configured indexes. */
- @Override public Collection<E> pollEvicted(int cnt) throws GridException {
- lock.readLock();
-
- try {
- Collection<E> evicted = pollEvicted0(cnt);
-
- // Nothing to remove from indexes if no events came back or no indexes are set.
- if (evicted.isEmpty() || idxs == null)
- return evicted;
-
- for (E evt : evicted)
- updateIndexes(evt, true);
-
- return evicted;
- }
- finally {
- lock.readUnlock();
- }
- }
-
- /**
- * If window supports eviction, this method will return up to cnt evicted elements.
- *
- * @param cnt Count.
- * @return Evicted elements.
- */
- protected abstract Collection<E> pollEvicted0(int cnt);
-
- /** {@inheritDoc} Events of the polled batch are also removed from the configured indexes. */
- @Override public Collection<E> pollEvictedBatch() throws GridException {
- lock.readLock();
-
- try {
- Collection<E> batch = pollEvictedBatch0();
-
- // Nothing to remove from indexes if the batch is empty or no indexes are set.
- if (batch.isEmpty() || idxs == null)
- return batch;
-
- for (E evt : batch)
- updateIndexes(evt, true);
-
- return batch;
- }
- finally {
- lock.readUnlock();
- }
- }
-
- /**
- * If window supports batch eviction, this method will poll next evicted batch from window. If window does not
- * support batch eviction but supports eviction, will return collection of single last evicted element. If window
- * does not support eviction, will return empty collection.
- *
- * @return Elements from evicted batch.
- */
- protected abstract Collection<E> pollEvictedBatch0();
-
- /** {@inheritDoc} Checks configuration, initializes every configured index provider, then resets the window. */
- @Override public final void start() throws GridException {
- checkConfiguration();
-
- StreamerIndexProvider<E, ?, ?>[] providers = idxs;
-
- if (providers != null) {
- for (int i = 0; i < providers.length; i++)
- providers[i].initialize();
- }
-
- reset();
- }
-
- /** {@inheritDoc} Resets every configured index provider and then the window itself, all under the write lock. */
- @Override public final void reset() {
- lock.writeLock();
-
- try {
- StreamerIndexProvider<E, ?, ?>[] providers = idxs;
-
- if (providers != null) {
- for (int i = 0; i < providers.length; i++)
- providers[i].reset();
- }
-
- reset0();
- }
- finally {
- lock.writeUnlock();
- }
- }
-
- /**
- * Check window configuration.
- *
- * @throws GridException If failed.
- */
- protected abstract void checkConfiguration() throws GridException;
-
- /**
- * Reset routine.
- */
- protected abstract void reset0();
-
- /** {@inheritDoc} Runs {@link #stop0()} while holding the write lock. */
- @Override public void stop() {
- lock.writeLock();
-
- try {
- stop0();
- }
- finally {
- lock.writeUnlock();
- }
- }
-
- /**
- * Dispose window.
- */
- protected abstract void stop0();
-
- /**
- * {@inheritDoc}
- * <p>
- * Built under the write lock. When evicted events are excluded, the first {@code evictionQueueSize()}
- * elements of the iteration are left out (NOTE(review): assumes evicted events are iterated first - confirm).
- */
- @Override public Collection<E> snapshot(boolean includeEvicted) {
- lock.writeLock();
-
- try {
- // Number of leading iteration elements to leave out of the result.
- int skip = 0;
-
- if (!includeEvicted)
- skip = evictionQueueSize();
-
- List<E> res = new ArrayList<>(size() - skip);
-
- int pos = 0;
-
- for (Iterator<E> it = iterator(); it.hasNext(); pos++) {
- E evt = it.next();
-
- if (pos >= skip)
- res.add(evt);
- }
-
- return Collections.unmodifiableList(res);
- }
- finally {
- lock.writeUnlock();
- }
- }
-
- /**
- * Sets window name, replacing the default (simple class name).
- *
- * @param name Window name.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets optional event filter.
- *
- * @return Event filter, or {@code null} if none is set.
- */
- @Nullable public IgnitePredicate<Object> getFilter() {
- return filter;
- }
-
- /**
- * Sets event filter consulted by {@code enqueue(...)} and {@code enqueueAll(...)}.
- *
- * @param filter Event filter, {@code null} means every event is accepted.
- */
- public void setFilter(@Nullable IgnitePredicate<Object> filter) {
- this.filter = filter;
- }
-
- /** {@inheritDoc} Same as {@code index(null)}. */
- @Override public <K, V> StreamerIndex<E, K, V> index() {
- return index(null);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws IllegalArgumentException If no indexes are set or no index is registered under the given name.
- */
- @Override public <K, V> StreamerIndex<E, K, V> index(@Nullable String name) {
- StreamerIndexProvider<E, K, V> provider = null;
-
- if (idxsAsMap != null)
- provider = (StreamerIndexProvider<E, K, V>)idxsAsMap.get(name);
-
- // Missing index map and unknown name are reported in the same way.
- if (provider == null)
- throw new IllegalArgumentException("Streamer index is not configured: " + name);
-
- return provider.index();
- }
-
- /** {@inheritDoc} */
- @Override public Collection<StreamerIndex<E, ?, ?>> indexes() {
- if (idxs != null) {
- Collection<StreamerIndex<E, ?, ?>> res = new ArrayList<>(idxs.length);
-
- for (StreamerIndexProvider<E, ?, ?> idx : idxs)
- res.add(idx.index());
-
- return res;
- }
- else
- return Collections.emptyList();
- }
-
- /**
- * Get array of index providers. The internal array is returned as is, not a copy.
- *
- * @return Index providers, or {@code null} if indexes were never set.
- */
- public StreamerIndexProvider<E, ?, ?>[] indexProviders() {
- return idxs;
- }
-
- /**
- * Set indexes. Names are validated first; fields change only on success, so a failed call keeps old indexes.
- *
- * @param idxs Indexes.
- * @throws IllegalArgumentException If some index names are not unique.
- */
- @SuppressWarnings("unchecked")
- public void setIndexes(StreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException {
- A.ensure(!F.isEmpty(idxs), "!F.isEmpty(idxs)");
-
- // Build the name lookup in a local map so that a duplicate name cannot leave partial state behind.
- Map<String, StreamerIndexProvider<E, ?, ?>> byName = new HashMap<>(idxs.length, 1.0f);
-
- for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
- StreamerIndexProvider<E, ?, ?> old = byName.put(idx.getName(), idx);
-
- if (old != null)
- throw new IllegalArgumentException("Index name is not unique [idx1=" + old + ", idx2=" + idx + ']');
- }
-
- // Publish only after validation; keep a private copy of the caller's array.
- idxsAsMap = byName;
- this.idxs = idxs.clone();
- }
-
- /** {@inheritDoc} Calls {@link #pollEvictedAll()} and discards the returned events. */
- @Override public void clearEvicted() throws GridException {
- pollEvictedAll();
- }
-
- /**
- * Adds the event to, or removes it from, every configured index. Does nothing when no indexes are set.
- * <p>
- * After the add/remove pass, whether it completed or threw, every provider receives an
- * {@code endUpdate(...)} call whose rollback flag is {@code true} if the pass did not complete;
- * then {@code sync.finish(1)} is called.
- *
- * @param evt Event.
- * @param rmv Remove flag.
- * @throws GridException If index update failed.
- */
- protected void updateIndexes(E evt, boolean rmv) throws GridException {
- if (idxs == null)
- return;
-
- StreamerIndexUpdateSync sync = new StreamerIndexUpdateSync();
-
- // Becomes true only after every provider has finished its add/remove call.
- boolean done = false;
-
- try {
- for (StreamerIndexProvider<E, ?, ?> provider : idxs) {
- if (rmv)
- provider.remove(sync, evt);
- else
- provider.add(sync, evt);
- }
-
- done = true;
- }
- finally {
- for (StreamerIndexProvider<E, ?, ?> provider : idxs)
- provider.endUpdate(sync, evt, !done, rmv);
-
- sync.finish(1);
- }
- }
-
- /**
- * Window iterator wrapper which prevents returning more elements than existed in the underlying collection
- * at the time the iterator was created.
- */
- private class BoundedIterator implements Iterator<E> {
- /** Iterator. */
- private final GridStreamerWindowIterator<E> iter;
-
- /** How many elements to return left (at most). */
- private int left;
-
- /**
- * Constructor. The element budget is the window {@code size()} read at this moment.
- *
- * @param iter Iterator.
- */
- private BoundedIterator(GridStreamerWindowIterator<E> iter) {
- assert iter != null;
- assert lock != null;
-
- this.iter = iter;
-
- left = size();
- }
-
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- return left > 0 && iter.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public E next() {
- // The budget is decremented before the check, so it goes negative once it is exceeded.
- left--;
-
- if (left < 0)
- throw new NoSuchElementException();
-
- return iter.next();
- }
-
- /** {@inheritDoc} */
- @Override public void remove() {
- // Negative budget means next() was called past the limit; there is no valid element to remove.
- if (left < 0)
- throw new IllegalStateException();
-
- lock.readLock();
-
- try {
- E evt = iter.removex();
-
- // A non-null result is treated as a successful removal and is mirrored in the indexes.
- if (evt != null) {
- try {
- updateIndexes(evt, true);
- }
- catch (GridException e) {
- throw new GridRuntimeException("Failed to remove event: " + evt, e);
- }
- }
- }
- finally {
- lock.readUnlock();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/package.html b/modules/core/src/main/java/org/gridgain/grid/streamer/window/package.html
deleted file mode 100644
index 859a458..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/package.html
+++ /dev/null
@@ -1,14 +0,0 @@
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<!--
- @html.file.header
- _________ _____ __________________ _____
- __ ____/___________(_)______ /__ ____/______ ____(_)_______
- _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
--->
-<html>
-<body>
- Contains streamer window implementations.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
----------------------------------------------------------------------
diff --git a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
index 64363c5..fbc93ea 100644
--- a/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
+++ b/modules/core/src/test/config/streamer/average/spring-streamer-average-base.xml
@@ -35,7 +35,7 @@
<list>
<bean class="org.apache.ignite.streamer.StreamerConfiguration">
<property name="windows">
- <bean class="org.gridgain.grid.streamer.window.StreamerBoundedSizeWindow">
+ <bean class="org.apache.ignite.streamer.window.StreamerBoundedSizeWindow">
<property name="maximumSize" value="500"/>
<property name="indexes">
<list>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
index 37c6585..3c7a0a9 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerEvictionSelfTest.java
@@ -12,11 +12,11 @@ package org.gridgain.grid.kernal.processors.streamer;
import org.apache.ignite.configuration.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.junits.common.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
index a6f4891..0d89ed3 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerFailoverSelfTest.java
@@ -14,10 +14,10 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.junits.common.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
index 00faa98..763c060 100644
--- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerSelfTest.java
@@ -16,12 +16,12 @@ import org.apache.ignite.lang.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.resources.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.spi.discovery.tcp.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.*;
import org.gridgain.grid.spi.discovery.tcp.ipfinder.vm.*;
import org.gridgain.grid.streamer.router.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
index 3b8c47e..e40e9cf 100644
--- a/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/marshaller/GridMarshallerAbstractTest.java
@@ -19,6 +19,7 @@ import org.apache.ignite.marshaller.*;
import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.product.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.cache.*;
import org.gridgain.grid.cache.affinity.*;
@@ -31,7 +32,6 @@ import org.gridgain.grid.kernal.processors.cache.datastructures.*;
import org.gridgain.grid.kernal.processors.service.*;
import org.gridgain.grid.kernal.processors.streamer.*;
import org.gridgain.grid.p2p.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.testframework.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
index 495a576..60aba6a 100644
--- a/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/streamer/index/GridStreamerIndexSelfTest.java
@@ -10,10 +10,10 @@
package org.gridgain.grid.streamer.index;
import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.streamer.index.hash.*;
import org.gridgain.grid.streamer.index.tree.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.gridgain.testframework.junits.common.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
index f86ad71..4432255 100644
--- a/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
+++ b/modules/core/src/test/java/org/gridgain/grid/streamer/window/GridStreamerWindowSelfTest.java
@@ -11,6 +11,7 @@ package org.gridgain.grid.streamer.window;
import org.apache.ignite.lang.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.util.*;
import org.gridgain.grid.util.typedef.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/test/java/org/gridgain/loadtests/streamer/GridStreamerIndexLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/gridgain/loadtests/streamer/GridStreamerIndexLoadTest.java b/modules/core/src/test/java/org/gridgain/loadtests/streamer/GridStreamerIndexLoadTest.java
index 0d01c03..c118041 100644
--- a/modules/core/src/test/java/org/gridgain/loadtests/streamer/GridStreamerIndexLoadTest.java
+++ b/modules/core/src/test/java/org/gridgain/loadtests/streamer/GridStreamerIndexLoadTest.java
@@ -10,11 +10,11 @@
package org.gridgain.loadtests.streamer;
import org.apache.ignite.lang.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.streamer.index.*;
import org.gridgain.grid.streamer.index.hash.*;
import org.gridgain.grid.streamer.index.tree.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;