You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2014/12/05 11:03:25 UTC
[24/32] incubator-ignite git commit: # Renaming
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
new file mode 100644
index 0000000..140e057
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerBoundedTimeWindow.java
@@ -0,0 +1,454 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.util.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Window which is bounded by size and time interval.
+ */
+public class StreamerBoundedTimeWindow<E> extends StreamerWindowAdapter<E> {
+ /** Window structures holder. */
+ private AtomicReference<WindowHolder> ref = new AtomicReference<>();
+
+ /** Time interval. */
+ private long timeInterval;
+
+ /** Window maximum size. */
+ private int maxSize;
+
+ /** Unique flag. */
+ private boolean unique;
+
+ /** Event order counter. */
+ private AtomicLong orderCnt = new AtomicLong();
+
+ /**
+ * Gets window maximum size.
+ * <p>
+ * A value of {@code 0} switches the size bound off: the eviction logic of this class
+ * applies the size check only when the maximum size is non-zero, leaving the time
+ * interval as the only eviction criterion.
+ *
+ * @return Maximum size.
+ */
+ public int getMaximumSize() {
+ return maxSize;
+ }
+
+ /**
+ * Sets window maximum size.
+ *
+ * @param maxSize Max size.
+ */
+ public void setMaximumSize(int maxSize) {
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Gets window time interval.
+ *
+ * @return Time interval.
+ */
+ public long getTimeInterval() {
+ return timeInterval;
+ }
+
+ /**
+ * Sets window time interval.
+ *
+ * @param timeInterval Time interval.
+ */
+ public void setTimeInterval(long timeInterval) {
+ this.timeInterval = timeInterval;
+ }
+
+ /**
+ * Gets window unique flag.
+ *
+ * @return {@code True} if only unique events should be added to window.
+ */
+ public boolean isUnique() {
+ return unique;
+ }
+
+ /**
+ * Sets window unique flag.
+ *
+ * @param unique {@code True} if only unique events should be added to window.
+ */
+ public void setUnique(boolean unique) {
+ this.unique = unique;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkConfiguration() throws GridException {
+ // Both failure messages end with the same dump of the window parameters.
+ String params = "[windowClass=" + getClass().getSimpleName() + ", maxSize=" + maxSize +
+ ", timeInterval=" + timeInterval + ", unique=" + unique + ']';
+
+ // The time interval is validated first, so it wins when both settings are invalid.
+ if (timeInterval <= 0)
+ throw new GridException("Failed to initialize window (timeInterval must be positive): " + params);
+
+ // Zero is accepted for the maximum size; only negative values are rejected.
+ if (maxSize < 0)
+ throw new GridException("Failed to initialize window (maximumSize cannot be negative): " + params);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void stop0() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("RedundantCast")
+ @Override protected void reset0() {
+ ref.set(new WindowHolder(newQueue(), unique ? (Set<Object>)new GridConcurrentHashSet<>() : null,
+ new AtomicInteger()));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return ref.get().size().get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int evictionQueueSize() {
+ // The returned value is an estimate of the eviction queue size.
+ WindowHolder win = ref.get();
+
+ GridConcurrentSkipListSet<Holder<E>> queue = win.collection();
+
+ // A zero maximum size means that only the time bound is applied.
+ boolean sizeBounded = maxSize != 0;
+
+ int excess = win.size().get() - maxSize;
+
+ long oldestAllowed = U.currentTimeMillis() - timeInterval;
+
+ int evictable = 0;
+
+ // The queue is ordered by timestamp, so counting stops at the first event that
+ // is neither above the size bound nor older than the time bound.
+ for (Holder<E> h : queue) {
+ boolean overSize = sizeBounded && evictable < excess;
+
+ if (!overSize && h.ts >= oldestAllowed)
+ break;
+
+ evictable++;
+ }
+
+ return evictable;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean enqueue0(E evt) {
+ add(evt, U.currentTimeMillis());
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvicted0(int cnt) {
+ Collection<E> evicted = new ArrayList<>(cnt);
+
+ // Collect at most cnt events, giving up as soon as nothing else qualifies for eviction.
+ while (evicted.size() < cnt) {
+ E evt = pollEvictedInternal();
+
+ if (evt == null)
+ break;
+
+ evicted.add(evt);
+ }
+
+ return evicted;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvictedBatch0() {
+ // This window has no batches: a "batch" is at most one evicted event.
+ E evicted = pollEvictedInternal();
+
+ if (evicted != null)
+ return Collections.singleton(evicted);
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Polls a single event that qualifies for eviction, if there is one.
+ * <p>
+ * While {@code maxSize} is positive and the size counter exceeds it, the first event of the
+ * queue is evicted. Otherwise the first event is evicted only if its timestamp is older than
+ * the current time minus {@code timeInterval}.
+ *
+ * @return Evicted event, or {@code null} if no event qualifies for eviction or the queue
+ * turned out to be empty because of a concurrent update.
+ */
+ @Nullable private E pollEvictedInternal() {
+ WindowHolder tup = ref.get();
+
+ AtomicInteger size = tup.size();
+
+ GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
+
+ long now = U.currentTimeMillis();
+
+ while (true) {
+ int curSize = size.get();
+
+ if (maxSize > 0 && curSize > maxSize) {
+ // Reserve the eviction by decrementing the counter first. If the CAS fails,
+ // the loop re-reads the size and tries again.
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ Holder<E> hldr = evtsQueue.pollFirst();
+
+ if (hldr != null) {
+ if (unique)
+ tup.set().remove(hldr.val);
+
+ return hldr.val;
+ }
+ else {
+ // No actual events in queue, it means that other thread is just adding event.
+ // return null as it is a concurrent add call.
+ size.incrementAndGet();
+
+ return null;
+ }
+ }
+ }
+ else {
+ // Check if first entry qualifies for eviction.
+ Holder<E> first = evtsQueue.firstx();
+
+ if (first != null && first.ts < now - timeInterval) {
+ // If removal fails, another thread has taken this entry; loop and look at the new head.
+ if (evtsQueue.remove(first)) {
+ if (unique)
+ tup.set().remove(first.val);
+
+ size.decrementAndGet();
+
+ return first.val;
+ }
+ }
+ else
+ return null;
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Events are taken from the tail of the queue. A request for zero events returns an empty
+ * collection without touching the window.
+ */
+ @Override protected Collection<E> dequeue0(int cnt) {
+ WindowHolder tup = ref.get();
+
+ AtomicInteger size = tup.size();
+ GridConcurrentSkipListSet<Holder<E>> evtsQueue = tup.collection();
+
+ Collection<E> resCol = new ArrayList<>(cnt);
+
+ // The loop below checks the requested count only after polling an event,
+ // so a zero request must be answered before the loop starts.
+ if (cnt == 0)
+ return resCol;
+
+ while (true) {
+ int curSize = size.get();
+
+ if (curSize > 0) {
+ // Reserve one event by decrementing the counter; on CAS failure re-read and retry.
+ if (size.compareAndSet(curSize, curSize - 1)) {
+ Holder<E> h = evtsQueue.pollLast();
+
+ if (h != null) {
+ resCol.add(h.val);
+
+ if (unique)
+ tup.set().remove(h.val);
+
+ if (resCol.size() >= cnt)
+ return resCol;
+ }
+ else {
+ // The queue was empty despite the positive counter: undo the reservation.
+ size.incrementAndGet();
+
+ return resCol;
+ }
+ }
+ }
+ else
+ return resCol;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridStreamerWindowIterator<E> iterator0() {
+ final WindowHolder state = ref.get();
+
+ final GridConcurrentSkipListSet<Holder<E>> queue = state.collection();
+ final Set<Object> uniqueVals = state.set();
+
+ final Iterator<Holder<E>> delegate = queue.iterator();
+
+ return new GridStreamerWindowIterator<E>() {
+ /** Holder returned by the most recent call to next(). */
+ private Holder<E> cur;
+
+ @Override public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override public E next() {
+ cur = delegate.next();
+
+ return cur.val;
+ }
+
+ @Override public E removex() {
+ if (cur == null)
+ throw new IllegalStateException();
+
+ // Nothing to report if the holder is no longer in the queue.
+ if (!queue.remove(cur))
+ return null;
+
+ // The uniqueness set exists only in unique mode.
+ if (uniqueVals != null)
+ uniqueVals.remove(cur.val);
+
+ state.size().decrementAndGet();
+
+ return cur.val;
+ }
+ };
+ }
+
+ /**
+ * Verifies, through assertions, that the size counter matches the queue size and, when a
+ * uniqueness set is present, that no value occurs twice in the queue. Used in tests.
+ */
+ void consistencyCheck() {
+ WindowHolder state = ref.get();
+
+ GridConcurrentSkipListSet<Holder<E>> queue = state.collection();
+
+ assert queue.size() == state.size().get();
+
+ // Duplicates are only forbidden when a uniqueness set is maintained.
+ if (state.set() == null)
+ return;
+
+ Collection<Object> seen = new HashSet<>();
+
+ for (Holder<E> h : queue)
+ assert seen.add(h.val);
+ }
+
+ /**
+ * Creates the event queue, ordered by event timestamp and then by event order number.
+ *
+ * @return New queue.
+ */
+ private GridConcurrentSkipListSet<Holder<E>> newQueue() {
+ return new GridConcurrentSkipListSet<>(new Comparator<Holder>() {
+ @Override public int compare(Holder h1, Holder h2) {
+ // Only the very same holder instance compares as equal.
+ if (h1 == h2)
+ return 0;
+
+ // Older events come first.
+ if (h1.ts != h2.ts)
+ return h1.ts < h2.ts ? -1 : 1;
+
+ // Same timestamp: fall back to the order number. This never returns 0, so two
+ // distinct holders are never treated as duplicates by the sorted set.
+ return h1.order < h2.order ? -1 : 1;
+ }
+ });
+ }
+
+ /**
+ * Adds an event to the window. In unique mode an event that is already registered in the
+ * uniqueness set is not added again.
+ *
+ * @param evt Event to add.
+ * @param ts Event timestamp.
+ */
+ private void add(E evt, long ts) {
+ WindowHolder state = ref.get();
+
+ // Register the event in the uniqueness set first; bail out if it is already there.
+ if (unique && !state.set().add(evt))
+ return;
+
+ state.collection().add(new Holder<>(evt, ts, orderCnt.incrementAndGet()));
+
+ state.size().incrementAndGet();
+ }
+
+ /**
+ * @param evts Events to add.
+ * @param ts Timestamp for added events.
+ */
+ private void addAll(Iterable<E> evts, long ts) {
+ for (E evt : evts)
+ add(evt, ts);
+ }
+
+ /**
+ * Immutable queue entry which couples an event with the timestamp and the order number
+ * that define its position in the window queue.
+ */
+ private static class Holder<E> {
+ /** Value. */
+ private final E val;
+
+ /** Event timestamp. */
+ private final long ts;
+
+ /** Event order. */
+ private final long order;
+
+ /**
+ * @param val Event.
+ * @param ts Timestamp.
+ * @param order Order.
+ */
+ private Holder(E val, long ts, long order) {
+ this.val = val;
+ this.ts = ts;
+ this.order = order;
+ }
+ }
+
+ /**
+ * Window holder: a tuple of the event queue, the optional uniqueness set and the size counter,
+ * so that the three structures can be replaced together through a single reference.
+ * <p>
+ * NOTE(review): this is a non-static inner class, so every instance is tied to its enclosing
+ * window; confirm whether the no-arg constructor can actually be used for deserialization.
+ */
+ @SuppressWarnings("ConstantConditions")
+ private class WindowHolder extends GridTuple3<GridConcurrentSkipListSet<Holder<E>>, Set<Object>, AtomicInteger> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public WindowHolder() {
+ // No-op.
+ }
+
+ /**
+ * @param col Collection.
+ * @param set Set if unique.
+ * @param size Size.
+ */
+ private WindowHolder(@Nullable GridConcurrentSkipListSet<Holder<E>> col,
+ @Nullable Set<Object> set, @Nullable AtomicInteger size) {
+ super(col, set, size);
+ }
+
+ /**
+ * @return Holders collection (first tuple element).
+ */
+ public GridConcurrentSkipListSet<Holder<E>> collection() {
+ return get1();
+ }
+
+ /**
+ * @return Uniqueness set (second tuple element); {@code null} when the window is not unique.
+ */
+ public Set<Object> set() {
+ return get2();
+ }
+
+ /**
+ * @return Size counter (third tuple element).
+ */
+ public AtomicInteger size() {
+ return get3();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
new file mode 100644
index 0000000..e3265e9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerUnboundedWindow.java
@@ -0,0 +1,103 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.jdk8.backport.*;
+
+import java.util.*;
+
+/**
+ * Window without any bound: it keeps every enqueued event and never evicts on its own.
+ * Events leave the window only when they are removed manually via any of the
+ * {@code dequeue(...)} methods.
+ */
+public class StreamerUnboundedWindow<E> extends StreamerWindowAdapter<E> {
+ /** Events. */
+ private ConcurrentLinkedDeque8<E> evts = new ConcurrentLinkedDeque8<>();
+
+ /** {@inheritDoc} */
+ @Override protected void stop0() {
+ // Nothing to release.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void checkConfiguration() {
+ // This window has no parameters to validate.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void reset0() {
+ evts.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return evts.sizex();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridStreamerWindowIterator<E> iterator0() {
+ final ConcurrentLinkedDeque8.IteratorEx<E> delegate =
+ (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
+
+ return new GridStreamerWindowIterator<E>() {
+ /** Event returned by the most recent call to next(). */
+ private E cur;
+
+ @Override public boolean hasNext() {
+ return delegate.hasNext();
+ }
+
+ @Override public E next() {
+ cur = delegate.next();
+
+ return cur;
+ }
+
+ @Override public E removex() {
+ // The event is reported only if the underlying iterator confirms the removal.
+ if (!delegate.removex())
+ return null;
+
+ return cur;
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public int evictionQueueSize() {
+ // Nothing is ever evicted from an unbounded window.
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enqueue0(E evt) {
+ return evts.add(evt);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> dequeue0(int cnt) {
+ Collection<E> polled = new ArrayList<>(cnt);
+
+ // Take events from the tail until cnt are collected or the deque runs dry.
+ while (polled.size() < cnt) {
+ E evt = evts.pollLast();
+
+ if (evt == null)
+ break;
+
+ polled.add(evt);
+ }
+
+ return polled;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvicted0(int cnt) {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Collection<E> pollEvictedBatch0() {
+ return Collections.emptyList();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
new file mode 100644
index 0000000..48e3f7d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/StreamerWindowAdapter.java
@@ -0,0 +1,529 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.apache.ignite.streamer.window;
+
+import org.apache.ignite.lang.*;
+import org.apache.ignite.lifecycle.*;
+import org.apache.ignite.streamer.*;
+import org.gridgain.grid.*;
+import org.gridgain.grid.kernal.processors.streamer.*;
+import org.gridgain.grid.streamer.index.*;
+import org.gridgain.grid.util.typedef.*;
+import org.gridgain.grid.util.typedef.internal.*;
+import org.gridgain.grid.util.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Streamer window adapter.
+ */
+public abstract class StreamerWindowAdapter<E> implements LifecycleAware, StreamerWindow<E>,
+ StreamerWindowMBean {
+ /** Default window name. */
+ private String name = getClass().getSimpleName();
+
+ /** Filter predicate. */
+ private IgnitePredicate<Object> filter;
+
+ /** Indexes. */
+ private Map<String, StreamerIndexProvider<E, ?, ?>> idxsAsMap;
+
+ /** */
+ private StreamerIndexProvider<E, ?, ?>[] idxs;
+
+ /** Lock for updates and snapshot. */
+ private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+
+ /** {@inheritDoc} */
+ @Override public String getClassName() {
+ return U.compact(getClass().getName());
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getSize() {
+ return size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getEvictionQueueSize() {
+ return evictionQueueSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getName() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<E> iterator() {
+ return new BoundedIterator(iterator0());
+ }
+
+ /**
+ * Returns an iterator over a set of elements of type T without check for iteration limit. That is,
+ * in case concurrent thread constantly adding new elements to the window we could iterate forever.
+ *
+ * @return Iterator.
+ */
+ protected abstract GridStreamerWindowIterator<E> iterator0();
+
+ /** {@inheritDoc} */
+ @Override public boolean enqueue(E evt) throws GridException {
+ lock.readLock();
+
+ try {
+ // Events rejected by the filter are dropped.
+ if (filter != null && !filter.apply(evt))
+ return false;
+
+ // Update indexes first and undo the update if the window refuses the event.
+ updateIndexes(evt, false);
+
+ if (!enqueue0(evt))
+ updateIndexes(evt, true);
+
+ // NOTE(review): the result reflects the filter decision only; it stays true even when
+ // enqueue0() returned false. Confirm this is intended.
+ return true;
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enqueue(E... evts) throws GridException {
+ return enqueueAll(Arrays.asList(evts));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enqueueAll(Collection<E> evts) throws GridException {
+ lock.readLock();
+
+ try {
+ // Decide once whether the filter has to be consulted at all.
+ boolean skipFilter = filter == null || F.isAlwaysTrue(filter);
+
+ boolean allAdded = true;
+
+ for (E evt : evts) {
+ // Filtered-out events are skipped and do not affect the result.
+ if (!skipFilter && !filter.apply(evt))
+ continue;
+
+ updateIndexes(evt, false);
+
+ if (!enqueue0(evt)) {
+ // The window refused the event: undo the index update and remember the failure.
+ updateIndexes(evt, true);
+
+ allAdded = false;
+ }
+ }
+
+ return allAdded;
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+
+ /**
+ * Adds event to window.
+ *
+ * @param evt Event.
+ * @return {@code True} if event added.
+ */
+ protected abstract boolean enqueue0(E evt);
+
+ /** {@inheritDoc} */
+ @Override public E dequeue() throws GridException {
+ return F.first(dequeue(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<E> dequeueAll() throws GridException {
+ return dequeue(size());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<E> dequeue(int cnt) throws GridException {
+ lock.readLock();
+
+ try {
+ Collection<E> removed = dequeue0(cnt);
+
+ // Without removed events or configured indexes there is nothing to clean up.
+ if (removed.isEmpty() || idxs == null)
+ return removed;
+
+ // Dequeued events must disappear from every index as well.
+ for (E evt : removed)
+ updateIndexes(evt, true);
+
+ return removed;
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+
+ /**
+ * Dequeues up to cnt elements from window. If current window size is less than cnt, will dequeue all elements
+ * from window.
+ *
+ * @param cnt Count.
+ * @return Dequeued elements.
+ */
+ protected abstract Collection<E> dequeue0(int cnt);
+
+ /** {@inheritDoc} */
+ @Override public E pollEvicted() throws GridException {
+ return F.first(pollEvicted(1));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<E> pollEvictedAll() throws GridException {
+ return pollEvicted(evictionQueueSize());
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<E> pollEvicted(int cnt) throws GridException {
+ lock.readLock();
+
+ try {
+ Collection<E> evicted = pollEvicted0(cnt);
+
+ // Indexes are touched only when something was evicted and indexes are configured.
+ boolean syncIdxs = !evicted.isEmpty() && idxs != null;
+
+ if (syncIdxs) {
+ for (E evt : evicted)
+ updateIndexes(evt, true);
+ }
+
+ return evicted;
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+
+ /**
+ * If window supports eviction, this method will return up to cnt evicted elements.
+ *
+ * @param cnt Count.
+ * @return Evicted elements.
+ */
+ protected abstract Collection<E> pollEvicted0(int cnt);
+
+ /** {@inheritDoc} */
+ @Override public Collection<E> pollEvictedBatch() throws GridException {
+ lock.readLock();
+
+ try {
+ Collection<E> batch = pollEvictedBatch0();
+
+ // An empty batch or a window without indexes needs no index maintenance.
+ if (batch.isEmpty() || idxs == null)
+ return batch;
+
+ // Remove every evicted event from the indexes.
+ for (E evt : batch)
+ updateIndexes(evt, true);
+
+ return batch;
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+
+ /**
+ * If window supports batch eviction, this method will poll next evicted batch from window. If windows does not
+ * support batch eviction but supports eviction, will return collection of single last evicted element. If window
+ * does not support eviction, will return empty collection.
+ *
+ * @return Elements from evicted batch.
+ */
+ protected abstract Collection<E> pollEvictedBatch0();
+
+ /** {@inheritDoc} */
+ @Override public final void start() throws GridException {
+ checkConfiguration();
+
+ if (idxs != null) {
+ for (StreamerIndexProvider<E, ?, ?> idx : idxs)
+ idx.initialize();
+ }
+
+ reset();
+ }
+
+ /** {@inheritDoc} */
+ @Override public final void reset(){
+ lock.writeLock();
+
+ try {
+ if (idxs != null) {
+ for (StreamerIndexProvider<E, ?, ?> idx : idxs)
+ idx.reset();
+ }
+
+ reset0();
+ }
+ finally {
+ lock.writeUnlock();
+ }
+ }
+
+ /**
+ * Check window configuration.
+ *
+ * @throws GridException If failed.
+ */
+ protected abstract void checkConfiguration() throws GridException;
+
+ /**
+ * Reset routine.
+ */
+ protected abstract void reset0();
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ lock.writeLock();
+
+ try {
+ stop0();
+ }
+ finally {
+ lock.writeUnlock();
+ }
+ }
+
+ /**
+ * Dispose window.
+ */
+ protected abstract void stop0();
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The copy is made under the write lock, which keeps out the enqueue, dequeue and eviction
+ * methods of this class because they hold the read lock.
+ */
+ @Override public Collection<E> snapshot(boolean includeEvicted) {
+ lock.writeLock();
+
+ try {
+ // Events waiting for eviction come first in iteration order and are skipped on request.
+ int skip = includeEvicted ? 0 : evictionQueueSize();
+
+ // The eviction queue size is an estimate supplied by the subclass; never pass a
+ // negative capacity to the list constructor.
+ List<E> res = new ArrayList<>(Math.max(0, size() - skip));
+
+ Iterator<E> iter = iterator();
+
+ int i = 0;
+
+ while (iter.hasNext()) {
+ E next = iter.next();
+
+ if (i++ >= skip)
+ res.add(next);
+ }
+
+ return Collections.unmodifiableList(res);
+ }
+ finally {
+ lock.writeUnlock();
+ }
+ }
+
+ /**
+ * Sets window name.
+ *
+ * @param name Window name.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets optional event filter.
+ *
+ * @return Optional event filter.
+ */
+ @Nullable public IgnitePredicate<Object> getFilter() {
+ return filter;
+ }
+
+ /**
+ * Sets event filter.
+ *
+ * @param filter Event filter.
+ */
+ public void setFilter(@Nullable IgnitePredicate<Object> filter) {
+ this.filter = filter;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> StreamerIndex<E, K, V> index() {
+ return index(null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> StreamerIndex<E, K, V> index(@Nullable String name) {
+ // No configured indexes at all is reported the same way as an unknown index name.
+ StreamerIndexProvider<E, K, V> provider = idxsAsMap == null ? null :
+ (StreamerIndexProvider<E, K, V>)idxsAsMap.get(name);
+
+ if (provider == null)
+ throw new IllegalArgumentException("Streamer index is not configured: " + name);
+
+ return provider.index();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<StreamerIndex<E, ?, ?>> indexes() {
+ // A window without configured index providers exposes no indexes.
+ if (idxs == null)
+ return Collections.emptyList();
+
+ Collection<StreamerIndex<E, ?, ?>> views = new ArrayList<>(idxs.length);
+
+ for (StreamerIndexProvider<E, ?, ?> provider : idxs)
+ views.add(provider.index());
+
+ return views;
+ }
+
+ /**
+ * Get array of index providers.
+ *
+ * @return Index providers.
+ */
+ public StreamerIndexProvider<E, ?, ?>[] indexProviders() {
+ return idxs;
+ }
+
+ /**
+ * Set indexes. The given providers are validated first; the window configuration is replaced
+ * only if all index names are unique, so a failed call leaves the previous indexes intact.
+ *
+ * @param idxs Indexes.
+ * @throws IllegalArgumentException If some index names are not unique.
+ */
+ @SuppressWarnings("unchecked")
+ public void setIndexes(StreamerIndexProvider<E, ?, ?>... idxs) throws IllegalArgumentException {
+ A.ensure(!F.isEmpty(idxs), "!F.isEmpty(idxs)");
+
+ // Build the new structures aside so that a duplicate name cannot leave the fields half-updated.
+ Map<String, StreamerIndexProvider<E, ?, ?>> byName = new HashMap<>(idxs.length, 1.0f);
+ StreamerIndexProvider<E, ?, ?>[] copy = new StreamerIndexProvider[idxs.length];
+
+ int i = 0;
+
+ for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
+ StreamerIndexProvider<E, ?, ?> old = byName.put(idx.getName(), idx);
+
+ if (old != null)
+ throw new IllegalArgumentException("Index name is not unique [idx1=" + old + ", idx2=" + idx + ']');
+
+ copy[i++] = idx;
+ }
+
+ // Validation passed: switch to the new index set.
+ idxsAsMap = byName;
+ this.idxs = copy;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clearEvicted() throws GridException {
+ pollEvictedAll();
+ }
+
+ /**
+ * Update indexes: adds the event to, or removes it from, every configured index provider.
+ * Does nothing when no indexes are configured.
+ *
+ * @param evt Event.
+ * @param rmv Remove flag: {@code true} to remove the event from the indexes, {@code false} to add it.
+ * @throws GridException If index update failed.
+ */
+ protected void updateIndexes(E evt, boolean rmv) throws GridException {
+ if (idxs != null) {
+ // One sync object is shared by all providers for this update.
+ StreamerIndexUpdateSync sync = new StreamerIndexUpdateSync();
+
+ // Stays true unless every provider has been updated without an exception.
+ boolean rollback = true;
+
+ try {
+ for (StreamerIndexProvider<E, ?, ?> idx : idxs) {
+ if (rmv)
+ idx.remove(sync, evt);
+ else
+ idx.add(sync, evt);
+ }
+
+ rollback = false;
+ }
+ finally {
+ // endUpdate() is invoked on every provider, including those the loop above
+ // did not reach because an earlier provider threw.
+ for (StreamerIndexProvider<E, ?, ?> idx : idxs)
+ idx.endUpdate(sync, evt, rollback, rmv);
+
+ // NOTE(review): presumably signals completion of this update to waiters on sync - confirm.
+ sync.finish(1);
+ }
+ }
+ }
+
+ /**
+ * Window iterator wrapper which prevents returning more elements than existed in the underlying
+ * collection by the time of iterator creation. Removal through this iterator also removes the
+ * event from the window indexes.
+ */
+ private class BoundedIterator implements Iterator<E> {
+ /** Iterator. */
+ private final GridStreamerWindowIterator<E> iter;
+
+ /** How many elements to return left (at most). */
+ private int left;
+
+ /**
+ * Constructor.
+ *
+ * @param iter Iterator.
+ */
+ private BoundedIterator(GridStreamerWindowIterator<E> iter) {
+ assert iter != null;
+ assert lock != null;
+
+ this.iter = iter;
+
+ // The window size at creation time is the iteration budget.
+ left = size();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return left > 0 && iter.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public E next() {
+ left--;
+
+ // The budget is exhausted even if the underlying iterator still has elements.
+ if (left < 0)
+ throw new NoSuchElementException();
+
+ return iter.next();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ if (left < 0)
+ throw new IllegalStateException();
+
+ lock.readLock();
+
+ try {
+ E evt = iter.removex();
+
+ // A null result means nothing was removed, so the indexes stay untouched.
+ if (evt != null) {
+ try {
+ updateIndexes(evt, true);
+ }
+ catch (GridException e) {
+ // Iterator.remove() cannot throw checked exceptions.
+ throw new GridRuntimeException("Failed to remove event: " + evt, e);
+ }
+ }
+ }
+ finally {
+ lock.readUnlock();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/apache/ignite/streamer/window/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streamer/window/package.html b/modules/core/src/main/java/org/apache/ignite/streamer/window/package.html
new file mode 100644
index 0000000..859a458
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/streamer/window/package.html
@@ -0,0 +1,14 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<!--
+ @html.file.header
+ _________ _____ __________________ _____
+ __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+-->
+<html>
+<body>
+ Contains streamer window implementations.
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
index b28378d..52b67f3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamProcessor.java
@@ -12,12 +12,12 @@ package org.gridgain.grid.kernal.processors.streamer;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.streamer.*;
+import org.apache.ignite.streamer.window.*;
import org.gridgain.grid.*;
import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.*;
import org.gridgain.grid.kernal.processors.license.*;
import org.gridgain.grid.streamer.index.*;
-import org.gridgain.grid.streamer.window.*;
import org.gridgain.grid.util.typedef.*;
import org.gridgain.grid.util.typedef.internal.*;
import org.jetbrains.annotations.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
index 124cc18..cbe2c7f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
+++ b/modules/core/src/main/java/org/gridgain/grid/streamer/index/StreamerIndexProvider.java
@@ -16,7 +16,7 @@ import org.gridgain.grid.*;
* to perform event indexing.
* <p>
* To configure index for a streamer window, use
- * {@link org.gridgain.grid.streamer.window.StreamerWindowAdapter#setIndexes(StreamerIndexProvider[])}.
+ * {@link org.apache.ignite.streamer.window.StreamerWindowAdapter#setIndexes(StreamerIndexProvider[])}.
*/
public interface StreamerIndexProvider<E, K, V> extends StreamerIndexProviderMBean {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeBatchWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeBatchWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeBatchWindow.java
deleted file mode 100644
index c8ab1b6..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeBatchWindow.java
+++ /dev/null
@@ -1,795 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.typedef.internal.*;
-import org.gridgain.grid.util.lang.*;
-import org.gridgain.grid.util.tostring.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Window that is bounded by size and accumulates events to batches.
- */
-public class StreamerBoundedSizeBatchWindow<E> extends StreamerWindowAdapter<E> {
- /** Max size. */
- private int batchSize;
-
- /** Min size. */
- private int maxBatches;
-
- /** Reference for queue and size. */
- private volatile QueueHolder holder;
-
- /** Enqueue lock. */
- private ReadWriteLock enqueueLock = new ReentrantReadWriteLock();
-
- /**
- * Gets maximum number of batches can be stored in window.
- *
- * @return Maximum number of batches for window.
- */
- public int getMaximumBatches() {
- return maxBatches;
- }
-
- /**
- * Sets maximum number of batches can be stored in window.
- *
- * @param maxBatches Maximum number of batches for window.
- */
- public void setMaximumBatches(int maxBatches) {
- this.maxBatches = maxBatches;
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Sets batch size.
- *
- * @param batchSize Batch size.
- */
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
- }
-
- /** {@inheritDoc} */
- @Override public void checkConfiguration() throws GridException {
- if (batchSize <= 0)
- throw new GridException("Failed to initialize window (batchSize size must be positive) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize + ']');
-
- if (maxBatches < 0)
- throw new GridException("Failed to initialize window (maximumBatches cannot be negative) " +
- "[windowClass=" + getClass().getSimpleName() +
- ", maximumBatches=" + maxBatches +
- ", batchSize=" + batchSize + ']');
- }
-
- /** {@inheritDoc} */
- @Override protected void stop0() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override protected void reset0() {
- ConcurrentLinkedDeque8<Batch> first = new ConcurrentLinkedDeque8<>();
-
- Batch b = new Batch(batchSize);
-
- ConcurrentLinkedDeque8.Node<Batch> n = first.offerLastx(b);
-
- b.node(n);
-
- holder = new QueueHolder(first, new AtomicInteger(1), new AtomicInteger());
- }
-
- /** {@inheritDoc} */
- @Override public int size() {
- return holder.totalQueueSize().get();
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iterator0() {
- final QueueHolder win = holder;
-
- final Iterator<Batch> batchIt = win.batchQueue().iterator();
-
- return new GridStreamerWindowIterator<E>() {
- /** Current batch iterator. */
- private ConcurrentLinkedDeque8.IteratorEx<E> curBatchIt;
-
- /** Next batch iterator. Will be null if no more batches available. */
- private ConcurrentLinkedDeque8.IteratorEx<E> nextBatchIt;
-
- /** Last returned value. */
- private E lastRet;
-
- {
- curBatchIt = batchIt.hasNext() ? batchIt.next().iterator() : null;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("SimplifiableIfStatement")
- @Override public boolean hasNext() {
- if (curBatchIt != null) {
- if (curBatchIt.hasNext())
- return true;
-
- return nextBatchIt != null && nextBatchIt.hasNext();
- }
- else
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public E next() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (!curBatchIt.hasNext()) {
- if (nextBatchIt != null) {
- curBatchIt = nextBatchIt;
-
- nextBatchIt = null;
-
- lastRet = curBatchIt.next();
- }
- else
- throw new NoSuchElementException();
- }
- else {
- E next = curBatchIt.next();
-
- // Moved to last element in batch - check for next iterator.
- if (!curBatchIt.hasNext())
- advanceBatch();
-
- lastRet = next;
- }
-
- return lastRet;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public E removex() {
- if (curBatchIt == null)
- throw new NoSuchElementException();
-
- if (curBatchIt.removex()) {
- // Decrement global size if deleted.
- win.totalQueueSize().decrementAndGet();
-
- return lastRet;
- }
- else
- return null;
- }
-
- /**
- * Moves to the next batch.
- */
- private void advanceBatch() {
- if (batchIt.hasNext()) {
- Batch batch = batchIt.next();
-
- nextBatchIt = batch.iterator();
- }
- else
- nextBatchIt = null;
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public int evictionQueueSize() {
- QueueHolder win = holder;
-
- int oversizeCnt = Math.max(0, win.batchQueueSize().get() - maxBatches);
-
- Iterator<Batch> it = win.batchQueue().iterator();
-
- int size = 0;
-
- int idx = 0;
-
- while (it.hasNext()) {
- Batch batch = it.next();
-
- if (idx++ < oversizeCnt)
- size += batch.size();
- }
-
- return size;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean enqueue0(E evt) {
- try {
- return enqueueInternal(evt);
- }
- catch (GridInterruptedException ignored) {
- return false;
- }
- }
-
- /**
- * Enqueue event to window.
- *
- * @param evt Event to add.
- * @return {@code True} if event was added.
- *
- * @throws GridInterruptedException If thread was interrupted.
- */
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- private boolean enqueueInternal(E evt) throws GridInterruptedException {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- Batch last = evts.peekLast();
-
- if (last == null || !last.add(evt)) {
- // This call will ensure that last object is actually added to batch
- // before we add new batch to events queue.
- // If exception is thrown here, window will be left in consistent state.
- if (last != null)
- last.finish();
-
- // Add new batch to queue in write lock.
- if (enqueueLock.writeLock().tryLock()) {
- try {
- Batch first0 = evts.peekLast();
-
- if (first0 == last) {
- Batch batch = new Batch(batchSize);
-
- ConcurrentLinkedDeque8.Node<Batch> node = evts.offerLastx(batch);
-
- batch.node(node);
-
- size.incrementAndGet();
-
- if (batch.removed() && evts.unlinkx(node))
- size.decrementAndGet();
- }
- }
- finally {
- enqueueLock.writeLock().unlock();
- }
- }
- else {
- // Acquire read lock to wait for batch enqueue.
- enqueueLock.readLock().lock();
-
- enqueueLock.readLock().unlock();
- }
- }
- else {
- // Event was added, global size increment.
- tup.totalQueueSize().incrementAndGet();
-
- return true;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvicted0(int cnt) {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxBatches) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- assert first.finished();
-
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- if (!polled.isEmpty())
- res.addAll(polled);
-
- if (first.isEmpty()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
- }
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
- else
- break;
- }
-
- // Removed entries, update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> pollEvictedBatch0() {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxBatches) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- Batch polled = evts.poll();
-
- if (polled != null) {
- assert polled.finished();
-
- // Mark batch deleted for consistency.
- polled.markRemoved();
-
- Collection<E> polled0 = polled.shrink();
-
- // Result of shrink is empty, must retry the poll.
- if (!polled0.isEmpty()) {
- // Update global size.
- tup.totalQueueSize().addAndGet(-polled0.size());
-
- return polled0;
- }
- }
- else {
- // Polled was zero, so we must restore counter and return.
- size.incrementAndGet();
-
- return Collections.emptyList();
- }
- }
- }
- else
- return Collections.emptyList();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected Collection<E> dequeue0(int cnt) {
- QueueHolder tup = holder;
-
- ConcurrentLinkedDeque8<Batch> evts = tup.batchQueue();
- AtomicInteger size = tup.batchQueueSize();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- while (true) {
- // Just peek the first batch.
- Batch first = evts.peekFirst();
-
- if (first != null) {
- Collection<E> polled = first.pollNonBatch(cnt - res.size());
-
- // We must check for finished before unlink as no elements
- // can be added to batch after it is finished.
- if (first.isEmpty() && first.emptyFinished()) {
- ConcurrentLinkedDeque8.Node<Batch> node = first.node();
-
- first.markRemoved();
-
- if (node != null && evts.unlinkx(node))
- size.decrementAndGet();
-
- assert first.isEmpty();
- }
- else if (polled.isEmpty())
- break;
-
- res.addAll(polled);
-
- if (res.size() == cnt)
- break;
- }
- else
- break;
- }
-
- // Update global size.
- tup.totalQueueSize().addAndGet(-res.size());
-
- return res;
- }
-
- /**
- * Consistency check, used for testing.
- */
- void consistencyCheck() {
- QueueHolder win = holder;
-
- Iterator<E> it = iterator();
-
- int cnt = 0;
-
- while (it.hasNext()) {
- it.next();
-
- cnt++;
- }
-
- int cnt0 = 0;
-
- for (Batch batch : win.batchQueue())
- cnt0 += batch.size();
-
- int sz = size();
-
- assert cnt0 == sz : "Batch size comparison failed [batchCnt=" + cnt0 + ", size=" + sz + ']';
- assert cnt == sz : "Queue size comparison failed [iterCnt=" + cnt + ", size=" + sz + ']';
- assert win.batchQueue().size() == win.batchQueueSize().get();
- }
-
- /**
- * Window structure.
- */
- private class QueueHolder extends GridTuple3<ConcurrentLinkedDeque8<Batch>, AtomicInteger, AtomicInteger> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public QueueHolder() {
- // No-op.
- }
-
- /**
- * @param batchQueue Batch queue.
- * @param batchQueueSize Batch queue size counter.
- * @param globalSize Global size counter.
- */
- private QueueHolder(ConcurrentLinkedDeque8<Batch> batchQueue,
- AtomicInteger batchQueueSize, @Nullable AtomicInteger globalSize) {
- super(batchQueue, batchQueueSize, globalSize);
-
- assert batchQueue.size() == 1;
- assert batchQueueSize.get() == 1;
- }
-
- /**
- * @return Events queue.
- */
- @SuppressWarnings("ConstantConditions")
- public ConcurrentLinkedDeque8<Batch> batchQueue() {
- return get1();
- }
-
- /**
- * @return Batch queue size.
- */
- @SuppressWarnings("ConstantConditions")
- public AtomicInteger batchQueueSize() {
- return get2();
- }
-
- /**
- * @return Global queue size.
- */
- @SuppressWarnings("ConstantConditions")
- public AtomicInteger totalQueueSize() {
- return get3();
- }
- }
-
- /**
- * Batch.
- */
- private class Batch extends ReentrantReadWriteLock implements Iterable<E> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Batch events. */
- private ConcurrentLinkedDeque8<E> evts;
-
- /** Capacity. */
- private AtomicInteger cap;
-
- /** Finished. */
- private volatile boolean finished;
-
- /** Queue node. */
- @GridToStringExclude
- private ConcurrentLinkedDeque8.Node<Batch> qNode;
-
- /** Node removed flag. */
- private volatile boolean rmvd;
-
- /**
- * @param batchSize Batch size.
- */
- private Batch(int batchSize) {
- cap = new AtomicInteger(batchSize);
-
- evts = new ConcurrentLinkedDeque8<>();
- }
-
- /**
- * @return {@code True} if batch is removed.
- */
- public boolean removed() {
- return rmvd;
- }
-
- /**
- * Marks batch as removed.
- */
- public void markRemoved() {
- rmvd = true;
- }
-
- /**
- * Adds event to batch.
- *
- * @param evt Event to add.
- * @return {@code True} if event was added, {@code false} if batch is full.
- */
- public boolean add(E evt) {
- readLock().lock();
-
- try {
- if (finished)
- return false;
-
- while (true) {
- int size = cap.get();
-
- if (size > 0) {
- if (cap.compareAndSet(size, size - 1)) {
- evts.add(evt);
-
- // Will go through write lock and finish batch.
- if (size == 1)
- finished = true;
-
- return true;
- }
- }
- else
- return false;
- }
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return Queue node.
- */
- public ConcurrentLinkedDeque8.Node<Batch> node() {
- return qNode;
- }
-
- /**
- * @param qNode Queue node.
- */
- public void node(ConcurrentLinkedDeque8.Node<Batch> qNode) {
- this.qNode = qNode;
- }
-
- /**
- * Waits for latch count down after last event was added.
- *
- * @throws GridInterruptedException If wait was interrupted.
- */
- public void finish() throws GridInterruptedException {
- writeLock().lock();
-
- try {
- // Safety.
- assert cap.get() == 0;
- assert finished;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is finished and no more events will be added to it.
- */
- public boolean finished() {
- readLock().lock();
-
- try {
- return finished;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Gets batch size.
- *
- * @return Batch size.
- */
- public int size() {
- readLock().lock();
-
- try {
- return evts == null ? 0 : evts.sizex();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * @return {@code True} if batch is empty.
- */
- public boolean isEmpty() {
- readLock().lock();
-
- try {
- return evts == null || evts.isEmpty();
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Checks if batch is empty and finished inside write lock. This will ensure that no more entries will
- * be added to batch and it can be safely unlinked from the queue.
- *
- * @return {@code True} if batch is empty and finished.
- */
- public boolean emptyFinished() {
- writeLock().lock();
-
- try {
- return finished && (evts == null || evts.isEmpty());
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public ConcurrentLinkedDeque8.IteratorEx<E> iterator() {
- readLock().lock();
-
- try {
- if (evts != null)
- return (ConcurrentLinkedDeque8.IteratorEx<E>)evts.iterator();
-
- return new ConcurrentLinkedDeque8.IteratorEx<E>() {
- @Override public boolean removex() {
- throw new NoSuchElementException();
- }
-
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public E next() {
- throw new NoSuchElementException();
- }
-
- @Override public void remove() {
- throw new NoSuchElementException();
- }
- };
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Polls up to {@code cnt} objects from batch in concurrent fashion.
- *
- * @param cnt Number of objects to poll.
- * @return Collection of polled elements or empty collection if nothing to poll.
- */
- public Collection<E> pollNonBatch(int cnt) {
- readLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- Collection<E> res = new ArrayList<>(cnt);
-
- for (int i = 0; i < cnt; i++) {
- E evt = evts.poll();
-
- if (evt != null)
- res.add(evt);
- else
- return res;
- }
-
- return res;
- }
- finally {
- readLock().unlock();
- }
- }
-
- /**
- * Shrinks this batch. No events can be polled from it after this method.
- *
- * @return Collection of events contained in batch before shrink (empty collection in
- * case no events were present).
- */
- public Collection<E> shrink() {
- writeLock().lock();
-
- try {
- if (evts == null)
- return Collections.emptyList();
-
- // Since iterator can concurrently delete elements, we must poll here.
- Collection<E> res = new ArrayList<>(evts.sizex());
-
- E o;
-
- while ((o = evts.poll()) != null)
- res.add(o);
-
- // Nothing cal be polled after shrink.
- evts = null;
-
- return res;
- }
- finally {
- writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- ConcurrentLinkedDeque8<E> evts0 = evts;
-
- return S.toString(Batch.class, this, "evtQueueSize", evts0 == null ? 0 : evts0.sizex());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeSortedWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeSortedWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeSortedWindow.java
deleted file mode 100644
index c797976..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeSortedWindow.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Size-bounded sorted window. Unlike {@link StreamerBoundedSizeWindow}, which limits
- * window only on size, this window also provides events in sorted order.
- */
-public class StreamerBoundedSizeSortedWindow<E>
- extends StreamerBoundedSizeWindowAdapter<E, StreamerBoundedSizeSortedWindow.Holder<E>> {
- /** Comparator. */
- private Comparator<E> comp;
-
- /** Order counter. */
- private AtomicLong orderCnt = new AtomicLong();
-
- /**
- * Gets event comparator.
- *
- * @return Event comparator.
- */
- public Comparator<E> getComparator() {
- return comp;
- }
-
- /**
- * Sets event comparator.
- *
- * @param comp Comparator.
- */
- public void setComparator(Comparator<E> comp) {
- this.comp = comp;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override protected Collection<Holder<E>> newCollection() {
- final Comparator<E> comp0 = comp;
-
- Collection<Holder<E>> col = new GridConcurrentSkipListSet<>(new Comparator<Holder<E>>() {
- @Override public int compare(Holder<E> h1, Holder<E> h2) {
- if (h1 == h2)
- return 0;
-
- int diff = comp0 == null ?
- ((Comparable<E>)h1.val).compareTo(h2.val) : comp0.compare(h1.val, h2.val);
-
- if (diff != 0)
- return diff;
- else {
- assert h1.order != h2.order;
-
- return h1.order < h2.order ? -1 : 1;
- }
- }
- });
-
- return (Collection)col;
- }
-
- /** {@inheritDoc} */
- @Override protected boolean addInternal(E evt, Collection<Holder<E>> col, @Nullable Set<E> set) {
- if (comp == null) {
- if (!(evt instanceof Comparable))
- throw new IllegalArgumentException("Failed to add object to window (object is not comparable and no " +
- "comparator is specified: " + evt);
- }
-
- if (set != null) {
- if (set.add(evt)) {
- col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
- return true;
- }
-
- return false;
- }
- else {
- col.add(new Holder<>(evt, orderCnt.getAndIncrement()));
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected int addAllInternal(Collection<E> evts, Collection<Holder<E>> col, @Nullable Set<E> set) {
- int cnt = 0;
-
- for (E evt : evts) {
- if (addInternal(evt, col, set))
- cnt++;
- }
-
- return cnt;
- }
-
- /** {@inheritDoc} */
- @Override protected E pollInternal(Collection<Holder<E>> col, Set<E> set) {
- Holder<E> h = (Holder<E>)((NavigableSet<E>)col).pollLast();
-
- if (set != null && h != null)
- set.remove(h.val);
-
- return h == null ? null : h.val;
- }
-
- /** {@inheritDoc} */
- @Override protected GridStreamerWindowIterator<E> iteratorInternal(final Collection<Holder<E>> col,
- final Set<E> set, final AtomicInteger size) {
- final Iterator<Holder<E>> it = col.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- private Holder<E> lastRet;
-
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override public E next() {
- lastRet = it.next();
-
- return lastRet.val;
- }
-
- @Override public E removex() {
- if (lastRet == null)
- throw new IllegalStateException();
-
- if (col.remove(lastRet)) {
- if (set != null)
- set.remove(lastRet.val);
-
- size.decrementAndGet();
-
- return lastRet.val;
- }
- else
- return null;
- }
- };
- }
-
- /**
- * Value wrapper.
- */
- @SuppressWarnings("PackageVisibleInnerClass")
- static class Holder<E> {
- /** Value. */
- private E val;
-
- /** Order to distinguish between objects for which comparator returns 0. */
- private long order;
-
- /**
- * @param val Value to hold.
- * @param order Adding order.
- */
- private Holder(E val, long order) {
- this.val = val;
- this.order = order;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return val.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- if (this == obj)
- return false;
-
- if (!(obj instanceof Holder))
- return false;
-
- Holder h = (Holder)obj;
-
- return F.eq(val, h.val) && order == h.order;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void consistencyCheck(Collection<Holder<E>> col, Set<E> set, AtomicInteger size) {
- assert col.size() == size.get();
-
- if (set != null) {
- // Check no duplicates in collection.
-
- Collection<Object> vals = new HashSet<>();
-
- for (Object evt : col)
- assert vals.add(((Holder)evt).val);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindow.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindow.java
deleted file mode 100644
index ff6fedc..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindow.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Queue window bounded by number of elements in queue. After adding elements to this window called
- * must check for evicted events.
- * <p>
- * It is guaranteed that window size will never get less then maximum size when poling from this window
- * concurrently from different threads.
- */
-public class StreamerBoundedSizeWindow<E> extends StreamerBoundedSizeWindowAdapter<E, E> {
- /** {@inheritDoc} */
- @Override protected Collection<E> newCollection() {
- return new ConcurrentLinkedDeque8<>();
- }
-
- /** {@inheritDoc} */
- @Override public GridStreamerWindowIterator<E> iteratorInternal(Collection<E> col, final Set<E> set,
- final AtomicInteger size) {
- final ConcurrentLinkedDeque8.IteratorEx<E> it =
- (ConcurrentLinkedDeque8.IteratorEx<E>)col.iterator();
-
- return new GridStreamerWindowIterator<E>() {
- private E lastRet;
-
- @Override public boolean hasNext() {
- return it.hasNext();
- }
-
- @Override public E next() {
- lastRet = it.next();
-
- return lastRet;
- }
-
- @Override public E removex() {
- if (it.removex()) {
- if (set != null)
- set.remove(lastRet);
-
- size.decrementAndGet();
-
- return lastRet;
- }
- else
- return null;
- }
- };
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("IfMayBeConditional")
- @Override protected boolean addInternal(E evt, Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
-
- // If unique.
- if (set != null) {
- if (set.add(evt)) {
- col.add(evt);
-
- return true;
- }
-
- return false;
- }
- else {
- col.add(evt);
-
- return true;
- }
- }
-
- /** {@inheritDoc} */
- @Override protected int addAllInternal(Collection<E> evts, Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
- if (set != null) {
- int cnt = 0;
-
- for (E evt : evts) {
- if (set.add(evt)) {
- col.add(evt);
-
- cnt++;
- }
- }
-
- return cnt;
- }
- else {
- col.addAll(evts);
-
- return evts.size();
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override protected E pollInternal(Collection<E> col, Set<E> set) {
- assert col instanceof ConcurrentLinkedDeque8;
-
- E res = ((Queue<E>)col).poll();
-
- if (set != null && res != null)
- set.remove(res);
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override protected void consistencyCheck(Collection<E> col, Set<E> set, AtomicInteger size) {
- assert col.size() == size.get();
-
- if (set != null) {
- // Check no duplicates in collection.
-
- Collection<Object> vals = new HashSet<>();
-
- for (Object evt : col)
- assert vals.add(evt);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3299c52b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindowAdapter.java b/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindowAdapter.java
deleted file mode 100644
index 57110f7..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/streamer/window/StreamerBoundedSizeWindowAdapter.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/* @java.file.header */
-
-/* _________ _____ __________________ _____
- * __ ____/___________(_)______ /__ ____/______ ____(_)_______
- * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
- * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
- * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
- */
-
-package org.gridgain.grid.streamer.window;
-
-import org.gridgain.grid.*;
-import org.gridgain.grid.kernal.processors.streamer.*;
-import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-/**
- * Abstract non-public class for size-bound windows. Support reset.
- */
-abstract class StreamerBoundedSizeWindowAdapter<E, T> extends StreamerWindowAdapter<E> {
- /** Holder of the current collection, optional uniqueness set and size counter; replaced as a whole by {@code reset0()}. */
- private AtomicReference<WindowHolder> ref = new AtomicReference<>();
-
- /** If true, only unique elements will be accepted (a uniqueness set is then allocated by {@code reset0()}). */
- private boolean unique;
-
- /** Window maximum size; events become eligible for eviction once the size counter exceeds it. */
- protected int maxSize;
-
-    /**
-     * Returns the configured upper bound of the window size.
-     *
-     * @return Maximum size.
-     */
-    public int getMaximumSize() {
-        return this.maxSize;
-    }
-
-    /**
-     * Configures the upper bound of the window size. The value is stored as is; a negative value
-     * is rejected later by {@link #checkConfiguration()}.
-     *
-     * @param maxSize Maximum size.
-     */
-    public void setMaximumSize(int maxSize) {
-        this.maxSize = maxSize;
-    }
-
-    /**
-     * Tells whether the window is configured to accept only unique elements.
-     *
-     * @return True if only unique elements will be accepted.
-     */
-    public boolean isUnique() {
-        return this.unique;
-    }
-
-    /**
-     * Configures the uniqueness mode. The flag is read when the window structures are (re)created
-     * in {@code reset0()}.
-     *
-     * @param unique If true, only unique elements will be accepted.
-     */
-    public void setUnique(boolean unique) {
-        this.unique = unique;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * The only constraint verified here is that the maximum size is not negative.
-     *
-     * @throws GridException If the configured maximum size is negative.
-     */
-    @Override public void checkConfiguration() throws GridException {
-        if (maxSize >= 0)
-            return;
-
-        throw new GridException("Failed to initialize window (maximumSize cannot be negative) " +
-            "[windowClass=" + getClass().getSimpleName() +
-            ", maxSize=" + maxSize +
-            ", unique=" + unique + ']');
-    }
-
- /**
- * {@inheritDoc}
- * <p>
- * This adapter performs no work on stop.
- */
- @Override protected void stop0() {
- // No-op: nothing is released here.
- }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reports the current value of the size counter, clamped so that it is never negative.
-     */
-    @Override public int size() {
-        return Math.max(ref.get().size().get(), 0);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reports by how many events the current size exceeds the maximum size, or zero if it does not.
-     */
-    @Override public int evictionQueueSize() {
-        return Math.max(size() - maxSize, 0);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Delegates to the private {@code add} helper and always reports success, even when
-     * {@code addInternal} indicated that nothing was added.
-     */
-    @Override protected boolean enqueue0(E evt) {
-        add(evt);
-
-        // The outcome of the add is intentionally not propagated.
-        return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Polls evicted events one at a time until {@code cnt} events were collected or no further
-     * event is available for eviction.
-     */
-    @Override protected Collection<E> pollEvicted0(int cnt) {
-        Collection<E> evicted = new ArrayList<>(cnt);
-
-        while (evicted.size() < cnt) {
-            E evt = pollEvictedInternal();
-
-            // Nothing more to evict: hand back what was gathered so far.
-            if (evt == null)
-                break;
-
-            evicted.add(evt);
-        }
-
-        return evicted;
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * A batch produced here holds at most one event: a singleton when an event could be evicted,
-     * an empty list otherwise.
-     */
-    @Override protected Collection<E> pollEvictedBatch0() {
-        E evicted = pollEvictedInternal();
-
-        if (evicted != null)
-            return Collections.singleton(evicted);
-
-        return Collections.emptyList();
-    }
-
- /**
- * Poll evicted internal implementation.
- *
- * @return Evicted element.
- */
- @Nullable private E pollEvictedInternal() {
- WindowHolder tup = ref.get();
-
- AtomicInteger size = tup.size();
-
- while (true) {
- int curSize = size.get();
-
- if (curSize > maxSize) {
- if (size.compareAndSet(curSize, curSize - 1)) {
- E evt = pollInternal(tup.collection(), tup.set());
-
- if (evt != null)
- return evt;
- else {
- // No actual events in queue, it means that other thread is just adding event.
- // return null as it is a concurrent add call.
- size.incrementAndGet();
-
- return null;
- }
- }
- }
- else
- return null;
- }
- }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Removes up to {@code cnt} events. Before each poll one unit of the size counter is claimed
-     * with a CAS; if the poll then yields nothing, the unit is given back and the events gathered
-     * so far are returned.
-     *
-     * @param cnt Maximum number of events to dequeue; {@code 0} yields an empty result.
-     * @return Dequeued events, possibly fewer than {@code cnt} (never {@code null}).
-     */
-    @Override protected Collection<E> dequeue0(int cnt) {
-        WindowHolder tup = ref.get();
-
-        AtomicInteger size = tup.size();
-        Collection<T> evts = tup.collection();
-
-        Collection<E> resCol = new ArrayList<>(cnt);
-
-        // The limit below is tested only after an event has been polled, so without this guard
-        // a zero request would still remove and return one event.
-        if (cnt == 0)
-            return resCol;
-
-        while (true) {
-            int curSize = size.get();
-
-            if (curSize > 0) {
-                // A failed CAS means the counter changed concurrently: re-read it and retry.
-                if (size.compareAndSet(curSize, curSize - 1)) {
-                    E res = pollInternal(evts, tup.set());
-
-                    if (res != null) {
-                        resCol.add(res);
-
-                        if (resCol.size() >= cnt)
-                            return resCol;
-                    }
-                    else {
-                        // Nothing was polled: undo the claim made by the CAS above.
-                        size.incrementAndGet();
-
-                        return resCol;
-                    }
-                }
-            }
-            else
-                return resCol;
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Builds the iterator over the structures of the holder that is current at call time.
-     */
-    @Override protected GridStreamerWindowIterator<E> iterator0() {
-        WindowHolder holder = ref.get();
-
-        return iteratorInternal(holder.collection(), holder.set(), holder.size());
-    }
-
- /** {@inheritDoc} */
- @Override protected void reset0() {
- ref.set(new WindowHolder(newCollection(),
- unique ? new GridConcurrentHashSet<E>() : null,
- new AtomicInteger()));
- }
-
-    /**
-     * Adds a single event to the current window structures and bumps the size counter
-     * if the event was actually added.
-     *
-     * @param evt Event to add.
-     */
-    private void add(E evt) {
-        WindowHolder holder = ref.get();
-
-        boolean added = addInternal(evt, holder.collection(), holder.set());
-
-        if (added)
-            holder.size().incrementAndGet();
-    }
-
-    /**
-     * Adds several events to the current window structures and advances the size counter
-     * by the number of events that were actually added.
-     *
-     * @param evts Events to add.
-     */
-    private void addAll(Collection<E> evts) {
-        WindowHolder holder = ref.get();
-
-        int added = addAllInternal(evts, holder.collection(), holder.set());
-
-        holder.size().addAndGet(added);
-    }
-
-    /**
-     * Runs the implementation-specific consistency check against the current window structures.
-     * Used for testing.
-     */
-    void consistencyCheck() {
-        WindowHolder holder = ref.get();
-
-        consistencyCheck(holder.collection(), holder.set(), holder.size());
-    }
-
-    /**
-     * Gives access to the collection of the current window holder.
-     *
-     * @return Collection.
-     */
-    @SuppressWarnings("ConstantConditions")
-    protected Collection<T> collection() {
-        WindowHolder holder = ref.get();
-
-        return holder.collection();
-    }
-
- /**
- * Creates a new collection specific for the window implementation. It is requested on every window
- * reset and is subsequently passed to {@code addInternal(...)}, {@code addAllInternal(...)},
- * {@code pollInternal(...)} and {@code iteratorInternal(...)}.
- *
- * @return Collection that will hold the window events.
- */
- protected abstract Collection<T> newCollection();
-
- /**
- * Adds event to queue implementation.
- *
- * @param evt Event to add.
- * @param col Collection to add to.
- * @param set Uniqueness set to check, or {@code null} if the window is not unique.
- * @return {@code True} if event was added; the window size counter is incremented only in that case.
- */
- protected abstract boolean addInternal(E evt, Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Adds all events to queue implementation.
- *
- * @param evts Events to add.
- * @param col Collection to add to.
- * @param set Uniqueness set to check, or {@code null} if the window is not unique.
- * @return Number of events actually added; the window size counter is advanced by this value.
- */
- protected abstract int addAllInternal(Collection<E> evts, Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Polls a single event from queue implementation.
- *
- * @param col Collection to poll from.
- * @param set Uniqueness set, or {@code null} if the window is not unique (presumably implementations
- *      remove the polled event from it - confirm against subclasses).
- * @return Polled object, or {@code null} if nothing could be polled.
- */
- @Nullable protected abstract E pollInternal(Collection<T> col, @Nullable Set<E> set);
-
- /**
- * Creates iterator based on implementation collection type.
- *
- * @param col Collection of the current window holder.
- * @param set Uniqueness set of the current window holder, or {@code null} if the window is not unique.
- * @param size Size counter of the current window holder.
- * @return Iterator.
- */
- protected abstract GridStreamerWindowIterator<E> iteratorInternal(Collection<T> col, @Nullable Set<E> set,
- AtomicInteger size);
-
- /**
- * Checks consistency of the given window structures. Used in tests.
- *
- * @param col Collection.
- * @param set Set if unique, {@code null} otherwise.
- * @param size Size holder.
- */
- protected abstract void consistencyCheck(Collection<T> col, Set<E> set, AtomicInteger size);
-
- /**
- * Window holder: a tuple of the event collection, the optional uniqueness set and the size counter
- * that are always swapped together through the enclosing window's reference.
- */
- @SuppressWarnings("ConstantConditions")
- private class WindowHolder extends GridTuple3<Collection<T>, Set<E>, AtomicInteger> {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /**
- * Empty constructor required by {@link Externalizable}.
- */
- public WindowHolder() {
- // No-op.
- }
-
- /**
- * @param col Collection (first tuple element).
- * @param set Set if unique (second tuple element).
- * @param size Window size counter (third tuple element).
- */
- WindowHolder(@Nullable Collection<T> col, @Nullable Set<E> set, @Nullable AtomicInteger size) {
- super(col, set, size);
- }
-
- /**
- * @return Collection, i.e. the first tuple element.
- */
- public Collection<T> collection() {
- return get1();
- }
-
- /**
- * @return Set, i.e. the second tuple element.
- */
- public Set<E> set() {
- return get2();
- }
-
- /**
- * @return Size counter, i.e. the third tuple element.
- */
- public AtomicInteger size() {
- return get3();
- }
- }
-}