You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:48 UTC
[27/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
index bf8cf0d..221bc39 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
@@ -18,18 +18,28 @@
package org.apache.ignite.cache.eviction.fifo;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+
import org.jsr166.*;
import org.jsr166.ConcurrentLinkedDeque8.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
* Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction.
* <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of a cache entry is calculated as the sum of its key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
* {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
* <p>
* This implementation is very efficient since it does not create any additional
@@ -41,11 +51,17 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
/** Batch size. */
private volatile int batchSize = 1;
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
+
/** FIFO queue. */
private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
new ConcurrentLinkedDeque8<>();
@@ -63,7 +79,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
public FifoEvictionPolicy(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -75,7 +91,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param batchSize Batch size.
*/
public FifoEvictionPolicy(int max, int batchSize) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
A.ensure(batchSize > 0, "batchSize > 0");
this.max = max;
@@ -97,7 +113,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -119,6 +135,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
return queue.size();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
* Gets read-only view on internal {@code FIFO} queue in proper order.
*
@@ -141,8 +174,11 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
else {
Node<EvictableEntry<K, V>> node = entry.removeMeta();
- if (node != null)
+ if (node != null) {
queue.unlinkx(node);
+
+ memSize.add(-entry.size());
+ }
}
}
@@ -173,11 +209,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(node))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
@@ -189,38 +232,74 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* Shrinks FIFO queue to maximum allowed size.
*/
private void shrink() {
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
+
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
+
+ if (size == -1)
+ break;
+
+ i += size;
+ }
+ }
+
int max = this.max;
- int batchSize = this.batchSize;
+ if (max > 0) {
+ int startSize = queue.sizex();
+
+ // Shrink only if queue is full.
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+ for (int i = max; i < startSize && queue.sizex() > max; i++)
+ if (shrink0() == -1)
+ break;
+ }
+ }
- int startSize = queue.sizex();
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return Number of bytes that were freed (possibly {@code 0}), or {@code -1} if the queue is empty.
+ */
+ private int shrink0() {
+ EvictableEntry<K, V> entry = queue.poll();
- // Shrink only if queue is full.
- if (startSize >= max + batchSize) {
- for (int i = max; i < startSize && queue.sizex() > max; i++) {
- EvictableEntry<K, V> entry = queue.poll();
+ if (entry == null)
+ return -1;
- if (entry == null)
- break;
+ int size = 0;
- Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+ Node<EvictableEntry<K, V>> meta = entry.removeMeta();
- if (meta != null && !entry.evict())
- touch(entry);
- }
+ if (meta != null) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
+ touch(entry);
}
+
+ return size;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
batchSize = in.readInt();
+ maxMemSize = in.readLong();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
index 63a413e..793aa66 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface FifoEvictionPolicyMBean {
*/
@MXBeanDescription("Current FIFO queue size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current queue size in bytes.
+ *
+ * @return current queue size in bytes.
+ */
+ @MXBeanDescription("Current FIFO queue size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
index 309d577..0be26c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
@@ -18,26 +18,48 @@
package org.apache.ignite.cache.eviction.lru;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+
import org.jsr166.*;
import org.jsr166.ConcurrentLinkedDeque8.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
- * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This
- * implementation is very efficient since it is lock-free and does not
- * create any additional table-like data structures. The {@code LRU} ordering
- * information is maintained by attaching ordering metadata to cache entries.
+ * Eviction policy based on {@code Least Recently Used (LRU)} algorithm and supports batch eviction.
+ * <p>
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of a cache entry is calculated as the sum of its key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
+ * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
+ * <p>
+ * This implementation is very efficient since it is lock-free and does not create any additional table-like
+ * data structures. The {@code LRU} ordering information is maintained by attaching ordering metadata to cache entries.
*/
public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictionPolicyMBean, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
+
+ /** Batch size. */
+ private volatile int batchSize = 1;
+
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
/** Queue. */
private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
@@ -56,7 +78,7 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
public LruEvictionPolicy(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -76,16 +98,45 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
/** {@inheritDoc} */
+ @Override public int getBatchSize() {
+ return batchSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setBatchSize(int batchSize) {
+ A.ensure(batchSize > 0, "batchSize > 0");
+
+ this.batchSize = batchSize;
+ }
+
+ /** {@inheritDoc} */
@Override public int getCurrentSize() {
return queue.size();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
* Gets read-only view on internal {@code FIFO} queue in proper order.
*
@@ -107,8 +158,11 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
else {
Node<EvictableEntry<K, V>> node = entry.removeMeta();
- if (node != null)
+ if (node != null) {
queue.unlinkx(node);
+
+ memSize.add(-entry.size());
+ }
}
}
@@ -139,11 +193,18 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(node))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
else if (queue.unlinkx(node)) {
@@ -163,31 +224,73 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* Shrinks queue to maximum allowed size.
*/
private void shrink() {
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
+
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
+
+ if (size == -1)
+ break;
+
+ i += size;
+ }
+ }
+
int max = this.max;
- int startSize = queue.sizex();
+ if (max > 0) {
+ int startSize = queue.sizex();
- for (int i = 0; i < startSize && queue.sizex() > max; i++) {
- EvictableEntry<K, V> entry = queue.poll();
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+ for (int i = max; i < startSize && queue.sizex() > max; i++)
+ if (shrink0() == -1)
+ break;
+ }
+ }
- if (entry == null)
- break;
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return Number of bytes that were freed (possibly {@code 0}), or {@code -1} if the queue is empty.
+ */
+ private int shrink0() {
+ EvictableEntry<K, V> entry = queue.poll();
- Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+ if (entry == null)
+ return -1;
- if (meta != null && !entry.evict())
+ int size = 0;
+
+ Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+
+ if (meta != null) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
touch(entry);
}
+
+ return size;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
+ out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
+ batchSize = in.readInt();
+ maxMemSize = in.readLong();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
index c243374..e17c057 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
@@ -41,10 +41,48 @@ public interface LruEvictionPolicyMBean {
public void setMaxSize(int max);
/**
+ * Gets batch size.
+ *
+ * @return batch size.
+ */
+ @MXBeanDescription("Batch size.")
+ public int getBatchSize();
+
+ /**
+ * Sets batch size.
+ *
+ * @param batchSize Batch size.
+ */
+ @MXBeanDescription("Set batch size.")
+ public void setBatchSize(int batchSize);
+
+ /**
* Gets current queue size.
*
* @return Current queue size.
*/
@MXBeanDescription("Current queue size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current queue size in bytes.
+ *
+ * @return current queue size in bytes.
+ */
+ @MXBeanDescription("Current queue size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
index c88b31d..00a912f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
@@ -18,20 +18,22 @@
package org.apache.ignite.cache.eviction.random;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import javax.cache.*;
import java.io.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
* Cache eviction policy which will select random cache entry for eviction if cache
* size exceeds the {@link #getMaxSize()} parameter. This implementation is
* extremely light weight, lock-free, and does not create any data structures to maintain
* any order for eviction.
* <p>
- * Random eviction will provide the best performance over any key set in which every
+ * Random eviction will provide the best performance over any key set in which every
* key has the same probability of being accessed.
*/
public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomEvictionPolicyMBean, Externalizable {
@@ -39,7 +41,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
/**
* Constructs random eviction policy with all defaults.
@@ -87,7 +89,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class);
- int size = cache.size();
+ int size = cache.localSize(CachePeekMode.ONHEAP);
for (int i = max; i < size; i++) {
Cache.Entry<K, V> e = cache.randomEntry();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
index 7965c97..b8b82fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
@@ -34,7 +34,15 @@ import static org.apache.ignite.configuration.CacheConfiguration.*;
/**
* Cache eviction policy which will select the minimum cache entry for eviction.
* <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of a cache entry is calculated as the sum of its key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b> Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
* {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
* <p>
* Entries comparison based on {@link Comparator} instance if provided.
@@ -48,18 +56,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max;
+ private volatile int max = DFLT_CACHE_SIZE;
/** Batch size. */
private volatile int batchSize = 1;
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
+
/** Comparator. */
private Comparator<Holder<K, V>> comp;
/** Order. */
private final AtomicLong orderCnt = new AtomicLong();
- /** Backed sorted set. */
+ /** Backed sorted queue. */
private final GridConcurrentSkipListSetEx<K, V> set;
/**
@@ -96,7 +110,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
* @param comp Entries comparator.
*/
public SortedEvictionPolicy(int max, int batchSize, @Nullable Comparator<EvictableEntry<K, V>> comp) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
A.ensure(batchSize > 0, "batchSize > 0");
this.max = max;
@@ -106,6 +120,16 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
/**
+ * Constructs sorted eviction policy with default maximum size and given entry comparator.
+ *
+ * @param comp Entries comparator.
+ */
+ public SortedEvictionPolicy(@Nullable Comparator<EvictableEntry<K, V>> comp) {
+ this.comp = comp == null ? new DefaultHolderComparator<K, V>() : new HolderComparator<>(comp);
+ this.set = new GridConcurrentSkipListSetEx<>(this.comp);
+ }
+
+ /**
* Gets maximum allowed size of cache before entry will start getting evicted.
*
* @return Maximum allowed size of cache before entry will start getting evicted.
@@ -120,7 +144,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -142,12 +166,29 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
return set.sizex();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
- * Gets read-only view of backed set in proper order.
+ * Gets read-only view of backed queue in proper order.
*
- * @return Read-only view of backed set.
+ * @return Read-only view of backed queue.
*/
- public Collection<EvictableEntry<K, V>> set() {
+ public Collection<EvictableEntry<K, V>> queue() {
Set<EvictableEntry<K, V>> cp = new LinkedHashSet<>();
for (Holder<K, V> holder : set)
@@ -168,19 +209,22 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
else {
Holder<K, V> holder = entry.removeMeta();
- if (holder != null)
+ if (holder != null) {
removeHolder(holder);
+
+ memSize.add(-entry.size());
+ }
}
}
/**
* @param entry Entry to touch.
- * @return {@code True} if backed set has been changed by this call.
+ * @return {@code True} if backed queue has been changed by this call.
*/
private boolean touch(EvictableEntry<K, V> entry) {
Holder<K, V> holder = entry.meta();
- // Entry has not been add yet to backed set..
+ // Entry has not been added yet to the backed queue.
if (holder == null) {
while (true) {
holder = new Holder<>(entry, orderCnt.incrementAndGet());
@@ -188,7 +232,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
set.add(holder);
if (entry.putMetaIfAbsent(holder) != null) {
- // Was concurrently added, need to remove it from set.
+ // Was concurrently added, need to remove it from queue.
removeHolder(holder);
// Set has not been changed.
@@ -196,17 +240,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
else if (holder.order > 0) {
if (!entry.isCached()) {
- // Was concurrently evicted, need to remove it from set.
+ // Was concurrently evicted, need to remove it from queue.
removeHolder(holder);
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If holder was removed by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(holder))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
@@ -215,34 +266,71 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
/**
- * Shrinks backed set to maximum allowed size.
+ * Shrinks backed queue to maximum allowed size.
*/
private void shrink() {
- int max = this.max;
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
- int batchSize = this.batchSize;
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
- int startSize = set.sizex();
+ if (size == -1)
+ break;
- if (startSize >= max + batchSize) {
- for (int i = max; i < startSize && set.sizex() > max; i++) {
- Holder<K, V> h = set.pollFirst();
+ i += size;
+ }
+ }
- if (h == null)
- break;
+ int max = this.max;
- EvictableEntry<K, V> entry = h.entry;
+ if (max > 0) {
+ int startSize = set.sizex();
- if (h.order > 0 && entry.removeMeta(h) && !entry.evict())
- touch(entry);
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) {
+ for (int i = max; i < startSize && set.sizex() > max; i++) {
+ if (shrink0() == -1)
+ break;
+ }
}
}
}
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return Number of bytes that were freed (possibly {@code 0}), or {@code -1} if the queue is empty.
+ */
+ private int shrink0() {
+ Holder<K, V> h = set.pollFirst();
+
+ if (h == null)
+ return -1;
+
+ int size = 0;
+
+ EvictableEntry<K, V> entry = h.entry;
+
+ if (h.order > 0 && entry.removeMeta(h)) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
+ touch(entry);
+ }
+
+ return size;
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
out.writeObject(comp);
}
@@ -251,11 +339,12 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
batchSize = in.readInt();
+ maxMemSize = in.readLong();
comp = (Comparator<Holder<K, V>>)in.readObject();
}
/**
- * Removes holder from backed set and marks holder as removed.
+ * Removes holder from backed queue and marks holder as removed.
*
* @param holder Holder.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
index bc696ff..7283453 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface SortedEvictionPolicyMBean {
*/
@MXBeanDescription("Current sorted key set size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current sorted entries queue size in bytes.
+ *
+ * @return current sorted entries queue size in bytes.
+ */
+ @MXBeanDescription("Current sorted entries set size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
index e66b32d..ef8fc49 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
@@ -20,9 +20,9 @@ package org.apache.ignite.cache.query;
import org.apache.ignite.internal.processors.cache.query.*;
/**
- * Cache query metrics used to obtain statistics on query. You can get metrics for
- * particular query via {@link CacheQuery#metrics()} method or accumulated metrics
- * for all queries via {@link GridCacheQueryManager#metrics()}.
+ * Cache query metrics used to obtain statistics on a query. Metrics for a particular query
+ * can be obtained via the {@link CacheQuery#metrics()} method, or aggregated metrics for all queries
+ * via {@link CacheQuery#metrics()}.
*/
public interface QueryMetrics {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index d018298..5bfdda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
* @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
* may bring cache transaction into {@link TransactionState#UNKNOWN} which will
* consequently cause all transacted entries to be invalidated.
+ * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details).
*/
+ @Deprecated
public void sessionEnd(boolean commit) throws CacheWriterException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
index 640d4a3..329e994 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.store;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -52,6 +53,27 @@ public interface CacheStoreSession {
public boolean isWithinTransaction();
/**
+ * Attaches the given object to this session.
+ * <p>
+ * An attached object may later be retrieved via the {@link #attachment()}
+ * method. Invoking this method causes any previous attachment to be
+ * discarded. To attach additional objects use {@link #properties()} map.
+ * <p>
+ * The current attachment may be discarded by attaching {@code null}.
+ *
+ * @param attachment The object to be attached (or {@code null} to discard current attachment).
+ * @return Previously attached object, if any.
+ */
+ @Nullable public <T> T attach(@Nullable Object attachment);
+
+ /**
+ * Retrieves the current attachment or {@code null} if there is no attachment.
+ *
+ * @return Currently attached object, if any.
+ */
+ @Nullable public <T> T attachment();
+
+ /**
* Gets current session properties. You can add properties directly to the
* returned map.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
new file mode 100644
index 0000000..1543bf9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.sql.*;
+
+/**
+ * Cache store session listener that allows to implement callbacks
+ * for session lifecycle.
+ * <p>
+ * The most common use case for session listeners is database
+ * connection and transaction management. Store can be invoked one
+ * or several times during one session, depending on whether it's
+ * executed within cache transaction or not. In any case, you have
+ * to create a connection when session is started and commit it or
+ * rollback when session is finished.
+ * <p>
+ * Cache store session listener allows to implement this and other
+ * scenarios providing two callback methods:
+ * <ul>
+ * <li>
+ * {@link #onSessionStart(CacheStoreSession)} - called
+ * before any store operation within a session is invoked.
+ * </li>
+ * <li>
+ * {@link #onSessionEnd(CacheStoreSession, boolean)} - called
+ * after all operations within a session are invoked.
+ * </li>
+ * </ul>
+ * <h2>Implementations</h2>
+ * Ignite provides several out-of-the-box implementations
+ * of session listener (refer to individual JavaDocs for more
+ * details):
+ * <ul>
+ * <li>
+ * {@link CacheJdbcStoreSessionListener} - JDBC-based session
+ * listener. For each session it gets a new JDBC connection from
+ * provided {@link DataSource} and commits (or rolls back) it
+ * when session ends.
+ * </li>
+ * <li>
+ * {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} -
+ * session listener based on Spring transaction management.
+ * It starts a new DB transaction for each session and commits
+ * (or rolls back) it when session ends. If there is no ongoing
+ * cache transaction, this listener is no-op.
+ * </li>
+ * <li>
+ * {@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} -
+ * Hibernate-based session listener. It creates a new Hibernate
+ * session for each Ignite session. If there is an ongoing cache
+ * transaction, a corresponding Hibernate transaction is created
+ * as well.
+ * </li>
+ * </ul>
+ * <h2>Configuration</h2>
+ * There are two ways to configure a session listener:
+ * <ul>
+ * <li>
+ * Provide a global listener for all caches via
+ * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property. This will be called for any store
+ * session, not depending on what caches participate in
+ * transaction.
+ * </li>
+ * <li>
+ * Provide a listener for a particular cache via
+ * {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property. This will be called only if the
+ * cache participates in transaction.
+ * </li>
+ * </ul>
+ * For example, here is how global {@link CacheJdbcStoreSessionListener}
+ * can be configured in Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ * ...
+ *
+ * <property name="CacheStoreSessionListenerFactories">
+ * <list>
+ * <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory">
+ * <constructor-arg>
+ * <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener">
+ * <!-- Inject external data source. -->
+ * <property name="dataSource" ref="jdbc-data-source"/>
+ * </bean>
+ * </constructor-arg>
+ * </bean>
+ * </list>
+ * </property>
+ * </bean>
+ * </pre>
+ */
+public interface CacheStoreSessionListener {
+ /**
+ * On session start callback.
+ * <p>
+ * Called before any store operation within a session is invoked.
+ *
+ * @param ses Current session.
+ */
+ public void onSessionStart(CacheStoreSession ses);
+
+ /**
+ * On session end callback.
+ * <p>
+ * Called after all operations within a session are invoked.
+ *
+ * @param ses Current session.
+ * @param commit {@code True} if persistence store transaction
+ * should commit, {@code false} for rollback.
+ */
+ public void onSessionEnd(CacheStoreSession ses, boolean commit);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
new file mode 100644
index 0000000..a20e535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+
+/**
+ * Cache store session listener based on JDBC connection.
+ * <p>
+ * For each session this listener gets a new JDBC connection
+ * from provided {@link DataSource} and commits (or rolls
+ * back) it when session ends.
+ * <p>
+ * The connection is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the connection will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will be committed or rolled back only when
+ * session ends.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
+ * method can be implemented if {@link CacheJdbcStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ * @CacheStoreSessionResource
+ * private CacheStoreSession ses;
+ *
+ * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+ * // Get connection from the current session.
+ * Connection conn = ses.attachment();
+ *
+ * // Execute update SQL query.
+ * try {
+ * conn.createStatement().executeUpdate("...");
+ * }
+ * catch (SQLException e) {
+ * throw new CacheWriterException("Failed to update the store.", e);
+ * }
+ * }
+ * }
+ * </pre>
+ * JDBC connection will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ */
+public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /**
+ * Sets data source.
+ * <p>
+ * This is a required parameter. If data source is not set,
+ * exception will be thrown on startup.
+ *
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Gets data source.
+ *
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ if (ses.attachment() == null) {
+ try {
+ Connection conn = dataSrc.getConnection();
+
+ conn.setAutoCommit(false);
+
+ ses.attach(conn);
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ Connection conn = ses.attach(null); // Detach the connection from the session before finishing it.
+
+ if (conn != null) { // Null when no connection was attached to this session.
+ try {
+ if (commit)
+ conn.commit();
+ else
+ conn.rollback();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to end store session [tx=" + ses.transaction() + ']', e);
+ }
+ finally {
+ U.closeQuiet(conn); // Always release the connection, even if commit/rollback failed.
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 9cb5d3d..85fd08a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -18,7 +18,9 @@
package org.apache.ignite.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -33,7 +35,7 @@ import java.util.*;
* You can use cluster node attributes to provide static information about a node.
* This information is initialized once within a cluster, during the node startup, and
* remains the same throughout the lifetime of a node. Use
- * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method to initialize your custom
+ * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom
* node attributes at startup. Here is an example of how to assign an attribute to a node at startup:
* <pre name="code" class="xml">
* <bean class="org.apache.ignite.configuration.IgniteConfiguration">
@@ -114,7 +116,7 @@ public interface ClusterNode {
/**
* Gets a node attribute. Attributes are assigned to nodes at startup
- * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+ * via {@link IgniteConfiguration#getUserAttributes()} method.
* <p>
* The system adds the following attributes automatically:
* <ul>
@@ -149,7 +151,7 @@ public interface ClusterNode {
/**
* Gets all node attributes. Attributes are assigned to nodes at startup
- * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+ * via {@link IgniteConfiguration#getUserAttributes()} method.
* <p>
* The system adds the following attributes automatically:
* <ul>
@@ -167,7 +169,7 @@ public interface ClusterNode {
/**
* Gets collection of addresses this node is known by.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
+ * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
* address for all communications and returned collection will contain only that address.
* If it is {@code null} then local wildcard address will be used, and Ignite
* will make the best effort to supply all addresses of that node in returned collection.
@@ -179,12 +181,12 @@ public interface ClusterNode {
/**
* Gets collection of host names this node is known by.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
+ * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
* the host name of that resolved address for all communications and
* returned collection will contain only that host name.
* If that host name can not be resolved then ip address returned by method {@link #addresses()} is used.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
+ * If {@link IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
* and this method returns host names of all addresses of that node.
*
* @return Collection of host names.
@@ -238,9 +240,17 @@ public interface ClusterNode {
public boolean isDaemon();
/**
- * Tests whether or not this node is a client node.
+ * Tests whether or not this node is connected to cluster as a client.
+ * <p>
+ * Do not confuse client in terms of
+ * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
+ * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
+ * while topology clients connect to topology in a different way.
*
* @return {@code True} if this node is a client node, {@code false} otherwise.
+ * @see IgniteConfiguration#isClientMode()
+ * @see Ignition#isClientMode()
+ * @see DiscoverySpi#isClientMode()
*/
public boolean isClient();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*;
import org.apache.ignite.cache.query.annotations.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.*;
@@ -145,9 +144,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default value for 'readFromBackup' flag. */
public static final boolean DFLT_READ_FROM_BACKUP = true;
- /** Filter that accepts only server nodes. */
- public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgniteServerNodePredicate();
-
/** Filter that accepts all nodes. */
public static final IgnitePredicate<ClusterNode> ALL_NODES = new IgniteAllNodesPredicate();
@@ -316,6 +312,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Cache topology validator. */
private TopologyValidator topValidator;
+ /** Cache store session listeners. */
+ private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs;
+
/** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() {
/* No-op. */
@@ -389,6 +388,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize();
startSize = cc.getStartSize();
storeFactory = cc.getCacheStoreFactory();
+ storeSesLsnrs = cc.getCacheStoreSessionListenerFactories();
swapEnabled = cc.isSwapEnabled();
tmLookupClsName = cc.getTransactionManagerLookupClassName();
topValidator = cc.getTopologyValidator();
@@ -1664,7 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- this.indexedTypes = indexedTypes;
+ if (indexedTypes != null) {
+ int len = indexedTypes.length;
+
+ Class<?>[] newIndexedTypes = new Class<?>[len];
+
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+ this.indexedTypes = newIndexedTypes;
+ }
+ else
+ this.indexedTypes = null;
return this;
}
@@ -1734,30 +1745,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheConfiguration.class, this);
+ /**
+ * Gets cache store session listener factories.
+ *
+ * @return Cache store session listener factories.
+ * @see CacheStoreSessionListener
+ */
+ public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+ return storeSesLsnrs;
}
/**
- * Filter that accepts only server nodes.
+ * Cache store session listener factories.
+ * <p>
+ * These listeners override global listeners provided in
+ * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property.
+ *
+ * @param storeSesLsnrs Cache store session listener factories.
+ * @return {@code this} for chaining.
+ * @see CacheStoreSessionListener
*/
- public static class IgniteServerNodePredicate implements IgnitePredicate<ClusterNode> {
- /** */
- private static final long serialVersionUID = 0L;
-
- @Override public boolean apply(ClusterNode n) {
- Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
-
- return attr != null && !attr;
- }
+ public CacheConfiguration setCacheStoreSessionListenerFactories(
+ Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) {
+ this.storeSesLsnrs = storeSesLsnrs;
- @Override public boolean equals(Object obj) {
- if (obj == null)
- return false;
+ return this;
+ }
- return obj.getClass().equals(this.getClass());
- }
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheConfiguration.class, this);
}
/**
@@ -1767,10 +1785,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** */
private static final long serialVersionUID = 0L;
+ /** {@inheritDoc} */
@Override public boolean apply(ClusterNode clusterNode) {
return true;
}
+ /** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (obj == null)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ebe2b8e..2d36c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.ignite.configuration;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
@@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*;
import org.apache.ignite.spi.swapspace.*;
import org.apache.ignite.spi.swapspace.file.*;
+import javax.cache.configuration.*;
import javax.cache.event.*;
import javax.cache.expiry.*;
import javax.cache.integration.*;
@@ -334,9 +336,6 @@ public class IgniteConfiguration {
/** Cache configurations. */
private CacheConfiguration[] cacheCfg;
- /** Client cache configurations. */
- private NearCacheConfiguration[] nearCacheCfg;
-
/** Client mode flag. */
private Boolean clientMode;
@@ -398,6 +397,9 @@ public class IgniteConfiguration {
/** User's class loader. */
private ClassLoader classLdr;
+ /** Cache store session listeners. */
+ private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
+
/**
* Creates valid grid configuration with all default values.
*/
@@ -478,6 +480,7 @@ public class IgniteConfiguration {
segResolvers = cfg.getSegmentationResolvers();
sndRetryCnt = cfg.getNetworkSendRetryCount();
sndRetryDelay = cfg.getNetworkSendRetryDelay();
+ storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
svcCfgs = cfg.getServiceConfiguration();
sysPoolSize = cfg.getSystemThreadPoolSize();
timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -1823,9 +1826,11 @@ public class IgniteConfiguration {
}
/**
- * Gets client mode flag.
+ * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use
+ * {@link DiscoverySpi} in client mode if this property is {@code true}.
*
* @return Client mode flag.
+ * @see TcpDiscoverySpi#setForceServerMode(boolean)
*/
public Boolean isClientMode() {
return clientMode;
@@ -2188,15 +2193,21 @@ public class IgniteConfiguration {
}
/**
+ * Gets plugin configurations.
+ *
* @return Plugin configurations.
+ * @see PluginProvider
*/
public PluginConfiguration[] getPluginConfigurations() {
return pluginCfgs;
}
/**
+ * Sets plugin configurations.
+ *
* @param pluginCfgs Plugin configurations.
* @return {@code this} for chaining.
+ * @see PluginProvider
*/
public IgniteConfiguration setPluginConfigurations(PluginConfiguration... pluginCfgs) {
this.pluginCfgs = pluginCfgs;
@@ -2242,6 +2253,35 @@ public class IgniteConfiguration {
return classLdr;
}
+ /**
+ * Gets cache store session listener factories.
+ *
+ * @return Cache store session listener factories.
+ * @see CacheStoreSessionListener
+ */
+ public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+ return storeSesLsnrs;
+ }
+
+ /**
+ * Cache store session listener factories.
+ * <p>
+ * These are global store session listeners, so they are applied to
+ * all caches. If you need to override listeners for a
+ * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property.
+ *
+ * @param storeSesLsnrs Cache store session listener factories.
+ * @return {@code this} for chaining.
+ * @see CacheStoreSessionListener
+ */
+ public IgniteConfiguration setCacheStoreSessionListenerFactories(
+ Factory<CacheStoreSessionListener>... storeSesLsnrs) {
+ this.storeSesLsnrs = storeSesLsnrs;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+ /** Thread local to hold the current user context. */
+ private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+ /**
+ * Executes given callable in the given user context.
+ * The main contract of this method is that the {@link #currentUser()} method invoked
+ * inside the closure always returns the {@code user} this closure is executed with.
+ * @param user the user name to invoke closure on behalf of.
+ * @param clo the closure to execute
+ * @param <T> The type of closure result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String (a null closure is not validated and fails with a NullPointerException).
+ */
+ public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clo.apply(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clo.apply();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Has the same contract as {@link #doAs(String, IgniteOutClosure)}, but accepts
+ * a callable that may throw a checked exception.
+ * The exception thrown by the callable is never wrapped.
+ * If your callable throws some specific checked exceptions, the recommended usage pattern is:
+ * <pre name="code" class="java">
+ * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+ * try {
+ * return IgfsUserContext.doAs(user, new Callable<Foo>() {
+ * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+ * return makeSomeFoo(); // do the job
+ * }
+ * });
+ * }
+ * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+ * throw e;
+ * }
+ * catch (Exception e) {
+ * throw new AssertionError("Must never go there.");
+ * }
+ * }
+ * </pre>
+ * @param user the user name to invoke closure on behalf of.
+ * @param clbl the Callable to execute
+ * @param <T> The type of callable result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String (a null callable is not validated and fails with a NullPointerException).
+ */
+ public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clbl.call(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clbl.call();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Gets the current context user.
+ * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+ * return null. Otherwise it will return the user name set by the innermost
+ * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+ * @return The current user, may be null.
+ */
+ @Nullable public static String currentUser() {
+ return userStackThreadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
* @return Map of properties.
*/
public Map<String,String> properties();
+
+
+ /**
+ * Closes the secondary file system.
+ * @throws IgniteException in case of an error.
+ */
+ public void close() throws IgniteException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
index 4d5d146..6da45ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
@@ -1247,6 +1247,20 @@ public class ClusterMetricsSnapshot implements ClusterMetrics {
/**
* Serializes node metrics into byte array.
*
+ * @param metrics Node metrics to serialize.
+ * @return Byte array containing the serialized metrics.
+ */
+ public static byte[] serialize(ClusterMetrics metrics) {
+ byte[] buf = new byte[METRICS_SIZE];
+
+ serialize(buf, 0, metrics);
+
+ return buf;
+ }
+
+ /**
+ * Serializes node metrics into byte array.
+ *
* @param data Byte array.
* @param off Offset into byte array.
* @param metrics Node metrics to serialize.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 505204d..f33fa39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -20,9 +20,9 @@ package org.apache.ignite.internal;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
+import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.interop.*;
import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.continuous.*;
@@ -131,41 +131,91 @@ class GridEventConsumeHandler implements GridContinuousHandler {
final boolean loc = nodeId.equals(ctx.localNodeId());
lsnr = new GridLocalEventListener() {
+ /** Queued notifications as (node ID, routine ID, event) tuples. */
+ private final Queue<T3<UUID, UUID, Event>> notificationQueue = new LinkedList<>();
+
+ private boolean notificationInProgress;
+
@Override public void onEvent(Event evt) {
- if (filter == null || filter.apply(evt)) {
- if (loc) {
- if (!cb.apply(nodeId, evt))
- ctx.continuous().stopRoutine(routineId);
- }
- else {
- GridDiscoveryManager disco = ctx.discovery();
+ if (filter != null && !filter.apply(evt))
+ return;
+
+ if (loc) {
+ if (!cb.apply(nodeId, evt))
+ ctx.continuous().stopRoutine(routineId);
+ }
+ else {
+ if (ctx.discovery().node(nodeId) == null)
+ return;
+
+ synchronized (notificationQueue) {
+ notificationQueue.add(new T3<>(nodeId, routineId, evt));
+
+ if (!notificationInProgress) {
+ ctx.getSystemExecutorService().submit(new Runnable() {
+ @Override public void run() {
+ if (!ctx.continuous().lockStopping())
+ return;
- ClusterNode node = disco.node(nodeId);
+ try {
+ while (true) {
+ T3<UUID, UUID, Event> t3;
- if (node != null) {
- try {
- EventWrapper wrapper = new EventWrapper(evt);
+ synchronized (notificationQueue) {
+ t3 = notificationQueue.poll();
- if (evt instanceof CacheEvent) {
- String cacheName = ((CacheEvent)evt).cacheName();
+ if (t3 == null) {
+ notificationInProgress = false;
- if (ctx.config().isPeerClassLoadingEnabled() && disco.cacheNode(node, cacheName)) {
- wrapper.p2pMarshal(ctx.config().getMarshaller());
+ return;
+ }
+ }
- wrapper.cacheName = cacheName;
+ try {
+ Event evt = t3.get3();
- GridCacheDeploymentManager depMgr =
- ctx.cache().internalCache(cacheName).context().deploy();
+ EventWrapper wrapper = new EventWrapper(evt);
- depMgr.prepare(wrapper);
+ if (evt instanceof CacheEvent) {
+ String cacheName = ((CacheEvent)evt).cacheName();
+
+ ClusterNode node = ctx.discovery().node(t3.get1());
+
+ if (node == null)
+ continue;
+
+ if (ctx.config().isPeerClassLoadingEnabled()
+ && ctx.discovery().cacheNode(node, cacheName)) {
+ wrapper.p2pMarshal(ctx.config().getMarshaller());
+
+ wrapper.cacheName = cacheName;
+
+ GridCacheDeploymentManager depMgr = ctx.cache()
+ .internalCache(cacheName).context().deploy();
+
+ depMgr.prepare(wrapper);
+ }
+ }
+
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
+ false);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ // No-op.
+ }
+ catch (Throwable e) {
+ U.error(ctx.log(GridEventConsumeHandler.class),
+ "Failed to send event notification to node: " + nodeId, e);
+ }
+ }
+ }
+ finally {
+ ctx.continuous().unlockStopping();
}
}
+ });
- ctx.continuous().addNotification(nodeId, routineId, wrapper, null, false, false);
- }
- catch (IgniteCheckedException e) {
- U.error(ctx.log(getClass()), "Failed to send event notification to node: " + nodeId, e);
- }
+ notificationInProgress = true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ad7d562..d6542f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -552,4 +552,9 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Marshaller context.
*/
public MarshallerContextImpl marshallerContext();
+
+ /**
+ * @return {@code True} if local node is client node (has flag {@link IgniteConfiguration#isClientMode()} set).
+ */
+ public boolean clientNode();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 1ff483e..f921d49 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -894,6 +894,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public boolean clientNode() {
+ return cfg.isClientMode();
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridKernalContextImpl.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index c4b93b8..4f5e365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.job.*;
import org.apache.ignite.internal.processors.jobmetrics.*;
+import org.apache.ignite.internal.processors.nodevalidation.*;
import org.apache.ignite.internal.processors.offheap.*;
import org.apache.ignite.internal.processors.plugin.*;
import org.apache.ignite.internal.processors.port.*;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.security.*;
import org.apache.ignite.internal.processors.segmentation.*;
import org.apache.ignite.internal.processors.service.*;
import org.apache.ignite.internal.processors.session.*;
-import org.apache.ignite.internal.processors.nodevalidation.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
@@ -169,11 +169,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
/** */
@GridToStringExclude
- private Timer starveTimer;
+ private GridTimeoutProcessor.CancelableTask starveTask;
/** */
@GridToStringExclude
- private Timer metricsLogTimer;
+ private GridTimeoutProcessor.CancelableTask metricsLogTask;
/** Indicate error on grid stop. */
@GridToStringExclude
@@ -867,13 +867,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (starveCheck) {
final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
- starveTimer = new Timer("ignite-starvation-checker");
-
- starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+ starveTask = ctx.timeout().schedule(new Runnable() {
/** Last completed task count. */
private long lastCompletedCnt;
- @Override protected void safeRun() {
+ @Override public void run() {
if (!(execSvc instanceof ThreadPoolExecutor))
return;
@@ -896,13 +894,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
long metricsLogFreq = cfg.getMetricsLogFrequency();
if (metricsLogFreq > 0) {
- metricsLogTimer = new Timer("ignite-metrics-logger");
-
- metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
- /** */
+ metricsLogTask = ctx.timeout().schedule(new Runnable() {
private final DecimalFormat dblFmt = new DecimalFormat("#.##");
- @Override protected void safeRun() {
+ @Override public void run() {
if (log.isInfoEnabled()) {
ClusterMetrics m = cluster().localNode().metrics();
@@ -963,8 +958,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
sysPoolQSize = exec.getQueue().size();
}
+ String id = U.id8(localNode().id());
+
String msg = NL +
"Metrics for local node (to disable set 'metricsLogFrequency' to 0)" + NL +
+ " ^-- Node [id=" + id + ", name=" + name() + "]" + NL +
" ^-- H/N/C [hosts=" + hosts + ", nodes=" + nodes + ", CPUs=" + cpus + "]" + NL +
" ^-- CPU [cur=" + dblFmt.format(cpuLoadPct) + "%, avg=" +
dblFmt.format(avgCpuLoadPct) + "%, GC=" + dblFmt.format(gcPct) + "%]" + NL +
@@ -1165,6 +1163,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
add(ATTR_CLIENT_MODE, cfg.isClientMode());
+ add(ATTR_CONSISTENCY_CHECK_SKIPPED, getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK));
+
// Build a string from JVM arguments, because parameters with spaces are split.
SB jvmArgs = new SB(512);
@@ -1550,7 +1550,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
">>> Grid name: " + gridName + NL +
">>> Local node [" +
"ID=" + locNode.id().toString().toUpperCase() +
- ", order=" + locNode.order() +
+ ", order=" + locNode.order() + ", clientMode=" + ctx.clientNode() +
"]" + NL +
">>> Local node addresses: " + U.addressesAsString(locNode) + NL +
">>> Local ports: " + sb + NL;
@@ -1713,12 +1713,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (updateNtfTimer != null)
updateNtfTimer.cancel();
- if (starveTimer != null)
- starveTimer.cancel();
+ if (starveTask != null)
+ starveTask.close();
- // Cancel metrics log timer.
- if (metricsLogTimer != null)
- metricsLogTimer.cancel();
+ if (metricsLogTask != null)
+ metricsLogTask.close();
boolean interrupted = false;
@@ -2370,7 +2369,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
try {
ctx.cache().dynamicStartCache(null, cacheName, nearCfg, true).get();
- return ctx.cache().publicJCache(cacheName);
+ IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+ checkNearCacheStarted(cache);
+
+ return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2397,7 +2400,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ctx.cache().dynamicStartCache(null, cacheName, nearCfg, false).get();
}
- return ctx.cache().publicJCache(cacheName);
+ IgniteCacheProxy<K, V> cache = ctx.cache().publicJCache(cacheName);
+
+ checkNearCacheStarted(cache);
+
+ return cache;
}
catch (IgniteCheckedException e) {
throw CU.convertToCacheException(e);
@@ -2407,6 +2414,15 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
+ /**
+ * @param cache Cache.
+ */
+ private void checkNearCacheStarted(IgniteCacheProxy<?, ?> cache) {
+ if (!cache.context().isNear())
+ throw new IgniteException("Failed to start near cache " +
+ "(a cache with the same name without near cache is already started)");
+ }
+
/** {@inheritDoc} */
@Override public void destroyCache(String cacheName) {
guard();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
index 98cc3a7..928db5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java
@@ -126,9 +126,12 @@ public final class IgniteNodeAttributes {
/** Security subject for authenticated node. */
public static final String ATTR_SECURITY_SUBJECT = ATTR_PREFIX + ".security.subject";
- /** Cache interceptors. */
+ /** Client mode flag. */
public static final String ATTR_CLIENT_MODE = ATTR_PREFIX + ".cache.client";
+ /** Configuration consistency check disabled flag. */
+ public static final String ATTR_CONSISTENCY_CHECK_SKIPPED = ATTR_PREFIX + ".consistency.check.skipped";
+
/**
* Enforces singleton.
*/