You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 18:27:52 UTC
[29/31] incubator-ignite git commit: ignite-471-2: huge merge from
sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
index 0d87326..799aace 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java
@@ -30,21 +30,21 @@ public interface CacheMetrics {
/**
* The number of get requests that were satisfied by the cache.
*
- * @return the number of hits
+ * @return The number of hits.
*/
public long getCacheHits();
/**
* This is a measure of cache efficiency.
*
- * @return the percentage of successful hits, as a decimal e.g 75.
+ * @return The percentage of successful hits, as a decimal e.g 75.
*/
public float getCacheHitPercentage();
/**
* A miss is a get request that is not satisfied.
*
- * @return the number of misses
+ * @return The number of misses.
*/
public long getCacheMisses();
@@ -52,7 +52,7 @@ public interface CacheMetrics {
* Returns the percentage of cache accesses that did not find a requested entry
* in the cache.
*
- * @return the percentage of accesses that failed to find anything
+ * @return The percentage of accesses that failed to find anything.
*/
public float getCacheMissPercentage();
@@ -60,14 +60,14 @@ public interface CacheMetrics {
* The total number of requests to the cache. This will be equal to the sum of
* the hits and misses.
*
- * @return the number of gets
+ * @return The number of gets.
*/
public long getCacheGets();
/**
* The total number of puts to the cache.
*
- * @return the number of puts
+ * @return The number of puts.
*/
public long getCachePuts();
@@ -75,7 +75,7 @@ public interface CacheMetrics {
* The total number of removals from the cache. This does not include evictions,
* where the cache itself initiates the removal to make space.
*
- * @return the number of removals
+ * @return The number of removals.
*/
public long getCacheRemovals();
@@ -84,28 +84,28 @@ public interface CacheMetrics {
* initiated by the cache itself to free up space. An eviction is not treated as
* a removal and does not appear in the removal counts.
*
- * @return the number of evictions
+ * @return The number of evictions.
*/
public long getCacheEvictions();
/**
* The mean time to execute gets.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageGetTime();
/**
* The mean time to execute puts.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAveragePutTime();
/**
* The mean time to execute removes.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageRemoveTime();
@@ -113,7 +113,7 @@ public interface CacheMetrics {
/**
* The mean time to execute tx commit.
*
- * @return the time in µs
+ * @return The time in µs.
*/
public float getAverageTxCommitTime();
@@ -124,7 +124,6 @@ public interface CacheMetrics {
*/
public float getAverageTxRollbackTime();
-
/**
* Gets total number of transaction commits.
*
@@ -154,6 +153,62 @@ public interface CacheMetrics {
public long getOverflowSize();
/**
+ * The total number of get requests to the off-heap memory.
+ *
+ * @return The number of gets.
+ */
+ public long getOffHeapGets();
+
+ /**
+ * The total number of put requests to the off-heap memory.
+ *
+ * @return The number of puts.
+ */
+ public long getOffHeapPuts();
+
+ /**
+ * The total number of removals from the off-heap memory. This does not include evictions.
+ *
+ * @return The number of removals.
+ */
+ public long getOffHeapRemovals();
+
+ /**
+ * The total number of evictions from the off-heap memory.
+ *
+ * @return The number of evictions.
+ */
+ public long getOffHeapEvictions();
+
+ /**
+ * The number of get requests that were satisfied by the off-heap memory.
+ *
+ * @return The off-heap hits number.
+ */
+ public long getOffHeapHits();
+
+ /**
+ * Gets the percentage of hits on off-heap memory.
+ *
+ * @return The percentage of hits on off-heap memory.
+ */
+ public float getOffHeapHitPercentage();
+
+ /**
+ * A miss is a get request that is not satisfied by off-heap memory.
+ *
+ * @return The off-heap misses number.
+ */
+ public long getOffHeapMisses();
+
+ /**
+ * Gets the percentage of misses on off-heap memory.
+ *
+ * @return The percentage of misses on off-heap memory.
+ */
+ public float getOffHeapMissPercentage();
+
+ /**
* Gets number of entries stored in off-heap memory.
*
* @return Number of entries stored in off-heap memory.
@@ -161,6 +216,20 @@ public interface CacheMetrics {
public long getOffHeapEntriesCount();
/**
+ * Gets number of primary entries stored in off-heap memory.
+ *
+ * @return Number of primary entries stored in off-heap memory.
+ */
+ public long getOffHeapPrimaryEntriesCount();
+
+ /**
+ * Gets number of backup entries stored in off-heap memory.
+ *
+ * @return Number of backup entries stored in off-heap memory.
+ */
+ public long getOffHeapBackupEntriesCount();
+
+ /**
* Gets memory size allocated in off-heap.
*
* @return Memory size allocated in off-heap.
@@ -168,6 +237,76 @@ public interface CacheMetrics {
public long getOffHeapAllocatedSize();
/**
+ * Gets off-heap memory maximum size.
+ *
+ * @return Off-heap memory maximum size.
+ */
+ public long getOffHeapMaxSize();
+
+ /**
+ * The total number of get requests to the swap.
+ *
+ * @return The number of gets.
+ */
+ public long getSwapGets();
+
+ /**
+ * The total number of put requests to the swap.
+ *
+ * @return The number of puts.
+ */
+ public long getSwapPuts();
+
+ /**
+ * The total number of removals from the swap.
+ *
+ * @return The number of removals.
+ */
+ public long getSwapRemovals();
+
+ /**
+ * The number of get requests that were satisfied by the swap.
+ *
+ * @return The swap hits number.
+ */
+ public long getSwapHits();
+
+ /**
+ * A miss is a get request that is not satisfied by swap.
+ *
+ * @return The swap misses number.
+ */
+ public long getSwapMisses();
+
+ /**
+ * Gets number of entries stored in swap.
+ *
+ * @return Number of entries stored in swap.
+ */
+ public long getSwapEntriesCount();
+
+ /**
+ * Gets size of swap.
+ *
+ * @return Size of swap.
+ */
+ public long getSwapSize();
+
+ /**
+ * Gets the percentage of hits on swap.
+ *
+ * @return The percentage of hits on swap.
+ */
+ public float getSwapHitPercentage();
+
+ /**
+ * Gets the percentage of misses on swap.
+ *
+ * @return The percentage of misses on swap.
+ */
+ public float getSwapMissPercentage();
+
+ /**
* Gets number of non-{@code null} values in the cache.
*
* @return Number of non-{@code null} values in the cache.
@@ -184,7 +323,7 @@ public interface CacheMetrics {
/**
* Returns {@code true} if this cache is empty.
*
- * @return {@code true} if this cache is empty.
+ * @return {@code True} if this cache is empty.
*/
public boolean isEmpty();
@@ -294,7 +433,7 @@ public interface CacheMetrics {
public int getTxDhtRolledbackVersionsSize();
/**
- * Returns {@code True} if write-behind is enabled.
+ * Returns {@code true} if write-behind is enabled.
*
* @return {@code True} if write-behind is enabled.
*/
@@ -372,16 +511,16 @@ public interface CacheMetrics {
/**
* Determines the required type of keys for this {@link Cache}, if any.
*
- * @return the fully qualified class name of the key type,
- * or "java.lang.Object" if the type is undefined.
+ * @return The fully qualified class name of the key type,
+ * or {@code "java.lang.Object"} if the type is undefined.
*/
public String getKeyType();
/**
* Determines the required type of values for this {@link Cache}, if any.
*
- * @return the fully qualified class name of the value type,
- * or "java.lang.Object" if the type is undefined.
+ * @return The fully qualified class name of the value type,
+ * or {@code "java.lang.Object"} if the type is undefined.
*/
public String getValueType();
@@ -407,7 +546,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code true}.
*
- * @return true if the cache is store by value
+ * @return {@code True} if the cache is store by value.
*/
public boolean isStoreByValue();
@@ -416,7 +555,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}.
*
- * @return true if statistics collection is enabled
+ * @return {@code True} if statistics collection is enabled.
*/
public boolean isStatisticsEnabled();
@@ -425,7 +564,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}.
*
- * @return true if management is enabled
+ * @return {@code true} if management is enabled.
*/
public boolean isManagementEnabled();
@@ -434,7 +573,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}
*
- * @return {@code true} when a {@link Cache} is in
+ * @return {@code True} when a {@link Cache} is in
* "read-through" mode.
* @see CacheLoader
*/
@@ -448,7 +587,7 @@ public interface CacheMetrics {
* <p>
* The default value is {@code false}
*
- * @return {@code true} when a {@link Cache} is in "write-through" mode.
+ * @return {@code True} when a {@link Cache} is in "write-through" mode.
* @see CacheWriter
*/
public boolean isWriteThrough();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
index d87109f..9f1889a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictableEntry.java
@@ -46,6 +46,13 @@ public interface EvictableEntry<K, V> extends Cache.Entry<K, V> {
public boolean isCached();
/**
+ * Returns entry size in bytes.
+ *
+ * @return entry size in bytes.
+ */
+ public int size();
+
+ /**
* Gets metadata added by eviction policy.
*
* @return Metadata value or {@code null}.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
index f409e9b..07c269d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/EvictionPolicy.java
@@ -20,6 +20,7 @@ package org.apache.ignite.cache.eviction;
import org.apache.ignite.cache.eviction.fifo.*;
import org.apache.ignite.cache.eviction.lru.*;
import org.apache.ignite.cache.eviction.random.*;
+import org.apache.ignite.cache.eviction.sorted.*;
/**
* Pluggable cache eviction policy. Usually, implementations will internally order
@@ -32,6 +33,7 @@ import org.apache.ignite.cache.eviction.random.*;
* <li>{@link LruEvictionPolicy}</li>
* <li>{@link RandomEvictionPolicy}</li>
* <li>{@link FifoEvictionPolicy}</li>
+ * <li>{@link SortedEvictionPolicy}</li>
* </ul>
* <p>
* The eviction policy thread-safety is ensured by Ignition. Implementations of this interface should
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
index bf8cf0d..221bc39 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicy.java
@@ -18,18 +18,28 @@
package org.apache.ignite.cache.eviction.fifo;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+
import org.jsr166.*;
import org.jsr166.ConcurrentLinkedDeque8.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
* Eviction policy based on {@code First In First Out (FIFO)} algorithm and supports batch eviction.
* <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of cache entry calculates as sum of key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
* {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
* <p>
* This implementation is very efficient since it does not create any additional
@@ -41,11 +51,17 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
/** Batch size. */
private volatile int batchSize = 1;
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
+
/** FIFO queue. */
private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
new ConcurrentLinkedDeque8<>();
@@ -63,7 +79,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
public FifoEvictionPolicy(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -75,7 +91,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param batchSize Batch size.
*/
public FifoEvictionPolicy(int max, int batchSize) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
A.ensure(batchSize > 0, "batchSize > 0");
this.max = max;
@@ -97,7 +113,7 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -119,6 +135,23 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
return queue.size();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
* Gets read-only view on internal {@code FIFO} queue in proper order.
*
@@ -141,8 +174,11 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
else {
Node<EvictableEntry<K, V>> node = entry.removeMeta();
- if (node != null)
+ if (node != null) {
queue.unlinkx(node);
+
+ memSize.add(-entry.size());
+ }
}
}
@@ -173,11 +209,18 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(node))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
@@ -189,38 +232,74 @@ public class FifoEvictionPolicy<K, V> implements EvictionPolicy<K, V>, FifoEvict
* Shrinks FIFO queue to maximum allowed size.
*/
private void shrink() {
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
+
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
+
+ if (size == -1)
+ break;
+
+ i += size;
+ }
+ }
+
int max = this.max;
- int batchSize = this.batchSize;
+ if (max > 0) {
+ int startSize = queue.sizex();
+
+ // Shrink only if queue is full.
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+ for (int i = max; i < startSize && queue.sizex() > max; i++)
+ if (shrink0() == -1)
+ break;
+ }
+ }
- int startSize = queue.sizex();
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return number of bytes that was free. {@code -1} if queue is empty.
+ */
+ private int shrink0() {
+ EvictableEntry<K, V> entry = queue.poll();
- // Shrink only if queue is full.
- if (startSize >= max + batchSize) {
- for (int i = max; i < startSize && queue.sizex() > max; i++) {
- EvictableEntry<K, V> entry = queue.poll();
+ if (entry == null)
+ return -1;
- if (entry == null)
- break;
+ int size = 0;
- Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+ Node<EvictableEntry<K, V>> meta = entry.removeMeta();
- if (meta != null && !entry.evict())
- touch(entry);
- }
+ if (meta != null) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
+ touch(entry);
}
+
+ return size;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
batchSize = in.readInt();
+ maxMemSize = in.readLong();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
index 63a413e..793aa66 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/fifo/FifoEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface FifoEvictionPolicyMBean {
*/
@MXBeanDescription("Current FIFO queue size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current queue size in bytes.
+ *
+ * @return current queue size in bytes.
+ */
+ @MXBeanDescription("Current FIFO queue size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
index 309d577..0be26c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicy.java
@@ -18,26 +18,48 @@
package org.apache.ignite.cache.eviction.lru;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+
import org.jsr166.*;
import org.jsr166.ConcurrentLinkedDeque8.*;
import java.io.*;
import java.util.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
- * Eviction policy based on {@code Least Recently Used (LRU)} algorithm. This
- * implementation is very efficient since it is lock-free and does not
- * create any additional table-like data structures. The {@code LRU} ordering
- * information is maintained by attaching ordering metadata to cache entries.
+ * Eviction policy based on {@code Least Recently Used (LRU)} algorithm and supports batch eviction.
+ * <p>
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of cache entry calculates as sum of key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
+ * {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
+
+ * This implementation is very efficient since it is lock-free and does not create any additional table-like
+ * data structures. The {@code LRU} ordering information is maintained by attaching ordering metadata to cache entries.
*/
public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictionPolicyMBean, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
+
+ /** Batch size. */
+ private volatile int batchSize = 1;
+
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
/** Queue. */
private final ConcurrentLinkedDeque8<EvictableEntry<K, V>> queue =
@@ -56,7 +78,7 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
public LruEvictionPolicy(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -76,16 +98,45 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
/** {@inheritDoc} */
+ @Override public int getBatchSize() {
+ return batchSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setBatchSize(int batchSize) {
+ A.ensure(batchSize > 0, "batchSize > 0");
+
+ this.batchSize = batchSize;
+ }
+
+ /** {@inheritDoc} */
@Override public int getCurrentSize() {
return queue.size();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
* Gets read-only view on internal {@code FIFO} queue in proper order.
*
@@ -107,8 +158,11 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
else {
Node<EvictableEntry<K, V>> node = entry.removeMeta();
- if (node != null)
+ if (node != null) {
queue.unlinkx(node);
+
+ memSize.add(-entry.size());
+ }
}
}
@@ -139,11 +193,18 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If node was unlinked by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(node))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
else if (queue.unlinkx(node)) {
@@ -163,31 +224,73 @@ public class LruEvictionPolicy<K, V> implements EvictionPolicy<K, V>, LruEvictio
* Shrinks queue to maximum allowed size.
*/
private void shrink() {
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
+
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
+
+ if (size == -1)
+ break;
+
+ i += size;
+ }
+ }
+
int max = this.max;
- int startSize = queue.sizex();
+ if (max > 0) {
+ int startSize = queue.sizex();
- for (int i = 0; i < startSize && queue.sizex() > max; i++) {
- EvictableEntry<K, V> entry = queue.poll();
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize))
+ for (int i = max; i < startSize && queue.sizex() > max; i++)
+ if (shrink0() == -1)
+ break;
+ }
+ }
- if (entry == null)
- break;
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return number of bytes that was free. {@code -1} if queue is empty.
+ */
+ private int shrink0() {
+ EvictableEntry<K, V> entry = queue.poll();
- Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+ if (entry == null)
+ return -1;
- if (meta != null && !entry.evict())
+ int size = 0;
+
+ Node<EvictableEntry<K, V>> meta = entry.removeMeta();
+
+ if (meta != null) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
touch(entry);
}
+
+ return size;
}
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
+ out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
+ batchSize = in.readInt();
+ maxMemSize = in.readLong();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
index c243374..e17c057 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/lru/LruEvictionPolicyMBean.java
@@ -41,10 +41,48 @@ public interface LruEvictionPolicyMBean {
public void setMaxSize(int max);
/**
+ * Gets batch size.
+ *
+ * @return batch size.
+ */
+ @MXBeanDescription("Batch size.")
+ public int getBatchSize();
+
+ /**
+ * Sets batch size.
+ *
+ * @param batchSize Batch size.
+ */
+ @MXBeanDescription("Set batch size.")
+ public void setBatchSize(int batchSize);
+
+ /**
* Gets current queue size.
*
* @return Current queue size.
*/
@MXBeanDescription("Current queue size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current queue size in bytes.
+ *
+ * @return current queue size in bytes.
+ */
+ @MXBeanDescription("Current queue size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
index c88b31d..00a912f 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/random/RandomEvictionPolicy.java
@@ -18,20 +18,22 @@
package org.apache.ignite.cache.eviction.random;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cache.eviction.*;
-import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import javax.cache.*;
import java.io.*;
+import static org.apache.ignite.configuration.CacheConfiguration.*;
+
/**
* Cache eviction policy which will select random cache entry for eviction if cache
* size exceeds the {@link #getMaxSize()} parameter. This implementation is
* extremely light weight, lock-free, and does not create any data structures to maintain
* any order for eviction.
* <p>
- * Random eviction will provide the best performance over any key set in which every
+ * Random eviction will provide the best performance over any key queue in which every
* key has the same probability of being accessed.
*/
public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomEvictionPolicyMBean, Externalizable {
@@ -39,7 +41,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max = CacheConfiguration.DFLT_CACHE_SIZE;
+ private volatile int max = DFLT_CACHE_SIZE;
/**
* Constructs random eviction policy with all defaults.
@@ -87,7 +89,7 @@ public class RandomEvictionPolicy<K, V> implements EvictionPolicy<K, V>, RandomE
IgniteCache<K, V> cache = entry.unwrap(IgniteCache.class);
- int size = cache.size();
+ int size = cache.localSize(CachePeekMode.ONHEAP);
for (int i = max; i < size; i++) {
Cache.Entry<K, V> e = cache.randomEntry();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
index 7965c97..b8b82fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicy.java
@@ -34,7 +34,15 @@ import static org.apache.ignite.configuration.CacheConfiguration.*;
/**
* Cache eviction policy which will select the minimum cache entry for eviction.
* <p>
- * The eviction starts when the cache size becomes {@code batchSize} elements greater than the maximum size.
+ * The eviction starts in the following cases:
+ * <ul>
+ * <li>The cache size becomes {@code batchSize} elements greater than the maximum size.</li>
+ * <li>
+ * The size of cache entries in bytes becomes greater than the maximum memory size.
+ * The size of cache entry calculates as sum of key size and value size.
+ * </li>
+ * </ul>
+ * <b>Note:</b>Batch eviction is enabled only if maximum memory limit isn't set ({@code maxMemSize == 0}).
* {@code batchSize} elements will be evicted in this case. The default {@code batchSize} value is {@code 1}.
* <p>
* Entries comparison based on {@link Comparator} instance if provided.
@@ -48,18 +56,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
private static final long serialVersionUID = 0L;
/** Maximum size. */
- private volatile int max;
+ private volatile int max = DFLT_CACHE_SIZE;
/** Batch size. */
private volatile int batchSize = 1;
+ /** Max memory size. */
+ private volatile long maxMemSize;
+
+ /** Memory size. */
+ private final LongAdder8 memSize = new LongAdder8();
+
/** Comparator. */
private Comparator<Holder<K, V>> comp;
/** Order. */
private final AtomicLong orderCnt = new AtomicLong();
- /** Backed sorted set. */
+ /** Backed sorted queue. */
private final GridConcurrentSkipListSetEx<K, V> set;
/**
@@ -96,7 +110,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
* @param comp Entries comparator.
*/
public SortedEvictionPolicy(int max, int batchSize, @Nullable Comparator<EvictableEntry<K, V>> comp) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
A.ensure(batchSize > 0, "batchSize > 0");
this.max = max;
@@ -106,6 +120,16 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
/**
+ * Constructs sorted eviction policy with given maximum size and given entry comparator.
+ *
+ * @param comp Entries comparator.
+ */
+ public SortedEvictionPolicy(@Nullable Comparator<EvictableEntry<K, V>> comp) {
+ this.comp = comp == null ? new DefaultHolderComparator<K, V>() : new HolderComparator<>(comp);
+ this.set = new GridConcurrentSkipListSetEx<>(this.comp);
+ }
+
+ /**
* Gets maximum allowed size of cache before entry will start getting evicted.
*
* @return Maximum allowed size of cache before entry will start getting evicted.
@@ -120,7 +144,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
* @param max Maximum allowed size of cache before entry will start getting evicted.
*/
@Override public void setMaxSize(int max) {
- A.ensure(max > 0, "max > 0");
+ A.ensure(max >= 0, "max >= 0");
this.max = max;
}
@@ -142,12 +166,29 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
return set.sizex();
}
+ /** {@inheritDoc} */
+ @Override public long getMaxMemorySize() {
+ return maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setMaxMemorySize(long maxMemSize) {
+ A.ensure(maxMemSize >= 0, "maxMemSize >= 0");
+
+ this.maxMemSize = maxMemSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getCurrentMemorySize() {
+ return memSize.longValue();
+ }
+
/**
- * Gets read-only view of backed set in proper order.
+ * Gets read-only view of backed queue in proper order.
*
- * @return Read-only view of backed set.
+ * @return Read-only view of backed queue.
*/
- public Collection<EvictableEntry<K, V>> set() {
+ public Collection<EvictableEntry<K, V>> queue() {
Set<EvictableEntry<K, V>> cp = new LinkedHashSet<>();
for (Holder<K, V> holder : set)
@@ -168,19 +209,22 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
else {
Holder<K, V> holder = entry.removeMeta();
- if (holder != null)
+ if (holder != null) {
removeHolder(holder);
+
+ memSize.add(-entry.size());
+ }
}
}
/**
* @param entry Entry to touch.
- * @return {@code True} if backed set has been changed by this call.
+ * @return {@code True} if backed queue has been changed by this call.
*/
private boolean touch(EvictableEntry<K, V> entry) {
Holder<K, V> holder = entry.meta();
- // Entry has not been add yet to backed set..
+ // Entry has not been add yet to backed queue..
if (holder == null) {
while (true) {
holder = new Holder<>(entry, orderCnt.incrementAndGet());
@@ -188,7 +232,7 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
set.add(holder);
if (entry.putMetaIfAbsent(holder) != null) {
- // Was concurrently added, need to remove it from set.
+ // Was concurrently added, need to remove it from queue.
removeHolder(holder);
// Set has not been changed.
@@ -196,17 +240,24 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
else if (holder.order > 0) {
if (!entry.isCached()) {
- // Was concurrently evicted, need to remove it from set.
+ // Was concurrently evicted, need to remove it from queue.
removeHolder(holder);
return false;
}
+ memSize.add(entry.size());
+
return true;
}
// If holder was removed by concurrent shrink() call, we must repeat the whole cycle.
else if (!entry.removeMeta(holder))
return false;
+ else {
+ memSize.add(-entry.size());
+
+ return true;
+ }
}
}
@@ -215,34 +266,71 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
}
/**
- * Shrinks backed set to maximum allowed size.
+ * Shrinks backed queue to maximum allowed size.
*/
private void shrink() {
- int max = this.max;
+ long maxMem = this.maxMemSize;
+
+ if (maxMem > 0) {
+ long startMemSize = memSize.longValue();
- int batchSize = this.batchSize;
+ if (startMemSize >= maxMem)
+ for (long i = maxMem; i < startMemSize && memSize.longValue() > maxMem;) {
+ int size = shrink0();
- int startSize = set.sizex();
+ if (size == -1)
+ break;
- if (startSize >= max + batchSize) {
- for (int i = max; i < startSize && set.sizex() > max; i++) {
- Holder<K, V> h = set.pollFirst();
+ i += size;
+ }
+ }
- if (h == null)
- break;
+ int max = this.max;
- EvictableEntry<K, V> entry = h.entry;
+ if (max > 0) {
+ int startSize = set.sizex();
- if (h.order > 0 && entry.removeMeta(h) && !entry.evict())
- touch(entry);
+ if (startSize >= max + (maxMem > 0 ? 1 : this.batchSize)) {
+ for (int i = max; i < startSize && set.sizex() > max; i++) {
+ if (shrink0() == -1)
+ break;
+ }
}
}
}
+ /**
+ * Tries to remove one item from queue.
+ *
+ * @return number of bytes that was free. {@code -1} if queue is empty.
+ */
+ private int shrink0() {
+ Holder<K, V> h = set.pollFirst();
+
+ if (h == null)
+ return -1;
+
+ int size = 0;
+
+ EvictableEntry<K, V> entry = h.entry;
+
+ if (h.order > 0 && entry.removeMeta(h)) {
+ size = entry.size();
+
+ memSize.add(-size);
+
+ if (!entry.evict())
+ touch(entry);
+ }
+
+ return size;
+ }
+
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(max);
out.writeInt(batchSize);
+ out.writeLong(maxMemSize);
out.writeObject(comp);
}
@@ -251,11 +339,12 @@ public class SortedEvictionPolicy<K, V> implements EvictionPolicy<K, V>, SortedE
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
max = in.readInt();
batchSize = in.readInt();
+ maxMemSize = in.readLong();
comp = (Comparator<Holder<K, V>>)in.readObject();
}
/**
- * Removes holder from backed set and marks holder as removed.
+ * Removes holder from backed queue and marks holder as removed.
*
* @param holder Holder.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
index bc696ff..7283453 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/eviction/sorted/SortedEvictionPolicyMBean.java
@@ -63,4 +63,26 @@ public interface SortedEvictionPolicyMBean {
*/
@MXBeanDescription("Current sorted key set size.")
public int getCurrentSize();
+
+ /**
+ * Gets maximum allowed cache size in bytes.
+ *
+ * @return maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Maximum allowed cache size in bytes.")
+ public long getMaxMemorySize();
+
+ /**
+ * Sets maximum allowed cache size in bytes.
+ */
+ @MXBeanDescription("Set maximum allowed cache size in bytes.")
+ public void setMaxMemorySize(long maxMemSize);
+
+ /**
+ * Gets current sorted entries queue size in bytes.
+ *
+ * @return current sorted entries queue size in bytes.
+ */
+ @MXBeanDescription("Current sorted entries set size in bytes.")
+ public long getCurrentMemorySize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
index e66b32d..ef8fc49 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/query/QueryMetrics.java
@@ -20,9 +20,9 @@ package org.apache.ignite.cache.query;
import org.apache.ignite.internal.processors.cache.query.*;
/**
- * Cache query metrics used to obtain statistics on query. You can get metrics for
- * particular query via {@link CacheQuery#metrics()} method or accumulated metrics
- * for all queries via {@link GridCacheQueryManager#metrics()}.
+ * Cache query metrics used to obtain statistics on query. Metrics for particular query
+ * can be get via {@link CacheQuery#metrics()} method or aggregated metrics for all queries
+ * via {@link CacheQuery#metrics()}.
*/
public interface QueryMetrics {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
index d018298..5bfdda1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStore.java
@@ -94,6 +94,8 @@ public interface CacheStore<K, V> extends CacheLoader<K, V>, CacheWriter<K, V> {
* @throws CacheWriterException If commit or rollback failed. Note that commit failure in some cases
* may bring cache transaction into {@link TransactionState#UNKNOWN} which will
* consequently cause all transacted entries to be invalidated.
+ * @deprecated Use {@link CacheStoreSessionListener} instead (refer to its JavaDoc for details).
*/
+ @Deprecated
public void sessionEnd(boolean commit) throws CacheWriterException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
index 640d4a3..329e994 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSession.java
@@ -19,6 +19,7 @@ package org.apache.ignite.cache.store;
import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
+import org.jetbrains.annotations.*;
import java.util.*;
@@ -52,6 +53,27 @@ public interface CacheStoreSession {
public boolean isWithinTransaction();
/**
+ * Attaches the given object to this session.
+ * <p>
+ * An attached object may later be retrieved via the {@link #attachment()}
+ * method. Invoking this method causes any previous attachment to be
+ * discarded. To attach additional objects use {@link #properties()} map.
+ * <p>
+ * The current attachment may be discarded by attaching {@code null}.
+ *
+ * @param attachment The object to be attached (or {@code null} to discard current attachment).
+ * @return Previously attached object, if any.
+ */
+ @Nullable public <T> T attach(@Nullable Object attachment);
+
+ /**
+ * Retrieves the current attachment or {@code null} if there is no attachment.
+ *
+ * @return Currently attached object, if any.
+ */
+ @Nullable public <T> T attachment();
+
+ /**
* Gets current session properties. You can add properties directly to the
* returned map.
*
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
new file mode 100644
index 0000000..1543bf9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheStoreSessionListener.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import org.apache.ignite.cache.store.jdbc.*;
+import org.apache.ignite.configuration.*;
+
+import javax.cache.configuration.*;
+import javax.sql.*;
+
+/**
+ * Cache store session listener that allows to implement callbacks
+ * for session lifecycle.
+ * <p>
+ * The most common use case for session listeners is database
+ * connection and transaction management. Store can be invoked one
+ * or several times during one session, depending on whether it's
+ * executed within cache transaction or not. In any case, you have
+ * to create a connection when session is started and commit it or
+ * rollback when session is finished.
+ * <p>
+ * Cache store session listener allows to implement this and other
+ * scenarios providing to callback methods:
+ * <ul>
+ * <li>
+ * {@link #onSessionStart(CacheStoreSession)} - called
+ * before any store operation within a session is invoked.
+ * </li>
+ * <li>
+ * {@link #onSessionEnd(CacheStoreSession, boolean)} - called
+ * after all operations within a session are invoked.
+ * </li>
+ * </ul>
+ * <h2>Implementations</h2>
+ * Ignites provides several out-of-the-box implementations
+ * of session listener (refer to individual JavaDocs for more
+ * details):
+ * <ul>
+ * <li>
+ * {@link CacheJdbcStoreSessionListener} - JDBC-based session
+ * listener. For each session it gets a new JDBC connection from
+ * provided {@link DataSource} and commits (or rolls back) it
+ * when session ends.
+ * </li>
+ * <li>
+ * {@ignitelink org.apache.ignite.cache.store.spring.CacheSpringStoreSessionListener} -
+ * session listener based on Spring transaction management.
+ * It starts a new DB transaction for each session and commits
+ * (or rolls back) it when session ends. If there is no ongoing
+ * cache transaction, this listener is no-op.
+ * </li>
+ * <li>
+ * {@ignitelink org.apache.ignite.cache.store.hibernate.CacheHibernateStoreSessionListener} -
+ * Hibernate-based session listener. It creates a new Hibernate
+ * session for each Ignite session. If there is an ongoing cache
+ * transaction, a corresponding Hibernate transaction is created
+ * as well.
+ * </li>
+ * </ul>
+ * <h2>Configuration</h2>
+ * There are two ways to configure a session listener:
+ * <ul>
+ * <li>
+ * Provide a global listener for all caches via
+ * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property. This will we called for any store
+ * session, not depending on what caches participate in
+ * transaction.
+ * </li>
+ * <li>
+ * Provide a listener for a particular cache via
+ * {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property. This will be called only if the
+ * cache participates in transaction.
+ * </li>
+ * </ul>
+ * For example, here is how global {@link CacheJdbcStoreSessionListener}
+ * can be configured in Spring XML configuration file:
+ * <pre name="code" class="xml">
+ * <bean class="org.apache.ignite.configuration.IgniteConfiguration">
+ * ...
+ *
+ * <property name="CacheStoreSessionListenerFactories">
+ * <list>
+ * <bean class="javax.cache.configuration.FactoryBuilder$SingletonFactory">
+ * <constructor-arg>
+ * <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListener">
+ * <!-- Inject external data source. -->
+ * <property name="dataSource" ref="jdbc-data-source"/>
+ * </bean>
+ * </constructor-arg>
+ * </bean>
+ * </list>
+ * </property>
+ * </bean>
+ * </pre>
+ */
+public interface CacheStoreSessionListener {
+ /**
+ * On session start callback.
+ * <p>
+ * Called before any store operation within a session is invoked.
+ *
+ * @param ses Current session.
+ */
+ public void onSessionStart(CacheStoreSession ses);
+
+ /**
+ * On session end callback.
+ * <p>
+ * Called after all operations within a session are invoked.
+ *
+ * @param ses Current session.
+ * @param commit {@code True} if persistence store transaction
+ * should commit, {@code false} for rollback.
+ */
+ public void onSessionEnd(CacheStoreSession ses, boolean commit);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
new file mode 100644
index 0000000..a20e535
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcStoreSessionListener.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.jdbc;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lifecycle.*;
+
+import javax.cache.*;
+import javax.cache.integration.*;
+import javax.sql.*;
+import java.sql.*;
+
+/**
+ * Cache store session listener based on JDBC connection.
+ * <p>
+ * For each session this listener gets a new JDBC connection
+ * from provided {@link DataSource} and commits (or rolls
+ * back) it when session ends.
+ * <p>
+ * The connection is saved as a store session
+ * {@link CacheStoreSession#attachment() attachment}.
+ * The listener guarantees that the connection will be
+ * available for any store operation. If there is an
+ * ongoing cache transaction, all operations within this
+ * transaction will be committed or rolled back only when
+ * session ends.
+ * <p>
+ * As an example, here is how the {@link CacheStore#write(Cache.Entry)}
+ * method can be implemented if {@link CacheJdbcStoreSessionListener}
+ * is configured:
+ * <pre name="code" class="java">
+ * private static class Store extends CacheStoreAdapter<Integer, Integer> {
+ * @CacheStoreSessionResource
+ * private CacheStoreSession ses;
+ *
+ * @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) throws CacheWriterException {
+ * // Get connection from the current session.
+ * Connection conn = ses.attachment();
+ *
+ * // Execute update SQL query.
+ * try {
+ * conn.createStatement().executeUpdate("...");
+ * }
+ * catch (SQLException e) {
+ * throw new CacheWriterException("Failed to update the store.", e);
+ * }
+ * }
+ * }
+ * </pre>
+ * JDBC connection will be automatically created by the listener
+ * at the start of the session and closed when it ends.
+ */
+public class CacheJdbcStoreSessionListener implements CacheStoreSessionListener, LifecycleAware {
+ /** Data source. */
+ private DataSource dataSrc;
+
+ /**
+ * Sets data source.
+ * <p>
+ * This is a required parameter. If data source is not set,
+ * exception will be thrown on startup.
+ *
+ * @param dataSrc Data source.
+ */
+ public void setDataSource(DataSource dataSrc) {
+ this.dataSrc = dataSrc;
+ }
+
+ /**
+ * Gets data source.
+ *
+ * @return Data source.
+ */
+ public DataSource getDataSource() {
+ return dataSrc;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteException {
+ if (dataSrc == null)
+ throw new IgniteException("Data source is required by " + getClass().getSimpleName() + '.');
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionStart(CacheStoreSession ses) {
+ if (ses.attachment() == null) {
+ try {
+ Connection conn = dataSrc.getConnection();
+
+ conn.setAutoCommit(false);
+
+ ses.attach(conn);
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionEnd(CacheStoreSession ses, boolean commit) {
+ Connection conn = ses.attach(null);
+
+ if (conn != null) {
+ try {
+ if (commit)
+ conn.commit();
+ else
+ conn.rollback();
+ }
+ catch (SQLException e) {
+ throw new CacheWriterException("Failed to start store session [tx=" + ses.transaction() + ']', e);
+ }
+ finally {
+ U.closeQuiet(conn);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
index 9cb5d3d..85fd08a 100644
--- a/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cluster/ClusterNode.java
@@ -18,7 +18,9 @@
package org.apache.ignite.cluster;
import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.*;
import org.jetbrains.annotations.*;
import java.util.*;
@@ -33,7 +35,7 @@ import java.util.*;
* You can use cluster node attributes to provide static information about a node.
* This information is initialized once within a cluster, during the node startup, and
* remains the same throughout the lifetime of a node. Use
- * {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method to initialize your custom
+ * {@link IgniteConfiguration#getUserAttributes()} method to initialize your custom
* node attributes at startup. Here is an example of how to assign an attribute to a node at startup:
* <pre name="code" class="xml">
* <bean class="org.apache.ignite.configuration.IgniteConfiguration">
@@ -114,7 +116,7 @@ public interface ClusterNode {
/**
* Gets a node attribute. Attributes are assigned to nodes at startup
- * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+ * via {@link IgniteConfiguration#getUserAttributes()} method.
* <p>
* The system adds the following attributes automatically:
* <ul>
@@ -149,7 +151,7 @@ public interface ClusterNode {
/**
* Gets all node attributes. Attributes are assigned to nodes at startup
- * via {@link org.apache.ignite.configuration.IgniteConfiguration#getUserAttributes()} method.
+ * via {@link IgniteConfiguration#getUserAttributes()} method.
* <p>
* The system adds the following attributes automatically:
* <ul>
@@ -167,7 +169,7 @@ public interface ClusterNode {
/**
* Gets collection of addresses this node is known by.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
+ * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use that
* address for all communications and returned collection will contain only that address.
* If it is {@code null} then local wildcard address will be used, and Ignite
* will make the best effort to supply all addresses of that node in returned collection.
@@ -179,12 +181,12 @@ public interface ClusterNode {
/**
* Gets collection of host names this node is known by.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
+ * If {@link IgniteConfiguration#getLocalHost()} value isn't {@code null} node will try to use
* the host name of that resolved address for all communications and
* returned collection will contain only that host name.
* If that host name can not be resolved then ip address returned by method {@link #addresses()} is used.
* <p>
- * If {@link org.apache.ignite.configuration.IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
+ * If {@link IgniteConfiguration#getLocalHost()} value is {@code null} then local wildcard address will be used,
* and this method returns host names of all addresses of that node.
*
* @return Collection of host names.
@@ -238,9 +240,17 @@ public interface ClusterNode {
public boolean isDaemon();
/**
- * Tests whether or not this node is a client node.
+ * Tests whether or not this node is connected to cluster as a client.
+ * <p>
+ * Do not confuse client in terms of
+ * discovery {@link DiscoverySpi#isClientMode()} and client in terms of cache
+ * {@link IgniteConfiguration#isClientMode()}. Cache clients cannot carry data,
+ * while topology clients connect to topology in a different way.
*
* @return {@code True} if this node is a client node, {@code false} otherwise.
+ * @see IgniteConfiguration#isClientMode()
+ * @see Ignition#isClientMode()
+ * @see DiscoverySpi#isClientMode()
*/
public boolean isClient();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index df6b2ee..1aa4fd6 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -25,7 +25,6 @@ import org.apache.ignite.cache.eviction.*;
import org.apache.ignite.cache.query.annotations.*;
import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.apache.ignite.plugin.*;
@@ -145,9 +144,6 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Default value for 'readFromBackup' flag. */
public static final boolean DFLT_READ_FROM_BACKUP = true;
- /** Filter that accepts only server nodes. */
- public static final IgnitePredicate<ClusterNode> SERVER_NODES = new IgniteServerNodePredicate();
-
/** Filter that accepts all nodes. */
public static final IgnitePredicate<ClusterNode> ALL_NODES = new IgniteAllNodesPredicate();
@@ -316,6 +312,9 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** Cache topology validator. */
private TopologyValidator topValidator;
+ /** Cache store session listeners. */
+ private Factory<? extends CacheStoreSessionListener>[] storeSesLsnrs;
+
/** Empty constructor (all values are initialized to their defaults). */
public CacheConfiguration() {
/* No-op. */
@@ -389,6 +388,7 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
sqlOnheapRowCacheSize = cc.getSqlOnheapRowCacheSize();
startSize = cc.getStartSize();
storeFactory = cc.getCacheStoreFactory();
+ storeSesLsnrs = cc.getCacheStoreSessionListenerFactories();
swapEnabled = cc.isSwapEnabled();
tmLookupClsName = cc.getTransactionManagerLookupClassName();
topValidator = cc.getTopologyValidator();
@@ -1664,7 +1664,18 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
A.ensure(indexedTypes == null || (indexedTypes.length & 1) == 0,
"Number of indexed types is expected to be even. Refer to method javadoc for details.");
- this.indexedTypes = indexedTypes;
+ if (indexedTypes != null) {
+ int len = indexedTypes.length;
+
+ Class<?>[] newIndexedTypes = new Class<?>[len];
+
+ for (int i = 0; i < len; i++)
+ newIndexedTypes[i] = U.box(indexedTypes[i]);
+
+ this.indexedTypes = newIndexedTypes;
+ }
+ else
+ this.indexedTypes = null;
return this;
}
@@ -1734,30 +1745,37 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(CacheConfiguration.class, this);
+ /**
+ * Gets cache store session listener factories.
+ *
+ * @return Cache store session listener factories.
+ * @see CacheStoreSessionListener
+ */
+ public Factory<? extends CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+ return storeSesLsnrs;
}
/**
- * Filter that accepts only server nodes.
+ * Cache store session listener factories.
+ * <p>
+ * These listeners override global listeners provided in
+ * {@link IgniteConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property.
+ *
+ * @param storeSesLsnrs Cache store session listener factories.
+ * @return {@code this} for chaining.
+ * @see CacheStoreSessionListener
*/
- public static class IgniteServerNodePredicate implements IgnitePredicate<ClusterNode> {
- /** */
- private static final long serialVersionUID = 0L;
-
- @Override public boolean apply(ClusterNode n) {
- Boolean attr = n.attribute(IgniteNodeAttributes.ATTR_CLIENT_MODE);
-
- return attr != null && !attr;
- }
+ public CacheConfiguration setCacheStoreSessionListenerFactories(
+ Factory<? extends CacheStoreSessionListener>... storeSesLsnrs) {
+ this.storeSesLsnrs = storeSesLsnrs;
- @Override public boolean equals(Object obj) {
- if (obj == null)
- return false;
+ return this;
+ }
- return obj.getClass().equals(this.getClass());
- }
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CacheConfiguration.class, this);
}
/**
@@ -1767,10 +1785,12 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
/** */
private static final long serialVersionUID = 0L;
+ /** {@inheritDoc} */
@Override public boolean apply(ClusterNode clusterNode) {
return true;
}
+ /** {@inheritDoc} */
@Override public boolean equals(Object obj) {
if (obj == null)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index ebe2b8e..2d36c7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -18,6 +18,7 @@
package org.apache.ignite.configuration;
import org.apache.ignite.*;
+import org.apache.ignite.cache.store.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.events.*;
@@ -52,6 +53,7 @@ import org.apache.ignite.spi.loadbalancing.roundrobin.*;
import org.apache.ignite.spi.swapspace.*;
import org.apache.ignite.spi.swapspace.file.*;
+import javax.cache.configuration.*;
import javax.cache.event.*;
import javax.cache.expiry.*;
import javax.cache.integration.*;
@@ -334,9 +336,6 @@ public class IgniteConfiguration {
/** Cache configurations. */
private CacheConfiguration[] cacheCfg;
- /** Client cache configurations. */
- private NearCacheConfiguration[] nearCacheCfg;
-
/** Client mode flag. */
private Boolean clientMode;
@@ -398,6 +397,9 @@ public class IgniteConfiguration {
/** User's class loader. */
private ClassLoader classLdr;
+ /** Cache store session listeners. */
+ private Factory<CacheStoreSessionListener>[] storeSesLsnrs;
+
/**
* Creates valid grid configuration with all default values.
*/
@@ -478,6 +480,7 @@ public class IgniteConfiguration {
segResolvers = cfg.getSegmentationResolvers();
sndRetryCnt = cfg.getNetworkSendRetryCount();
sndRetryDelay = cfg.getNetworkSendRetryDelay();
+ storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
svcCfgs = cfg.getServiceConfiguration();
sysPoolSize = cfg.getSystemThreadPoolSize();
timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -1823,9 +1826,11 @@ public class IgniteConfiguration {
}
/**
- * Gets client mode flag.
+ * Gets client mode flag. Client node cannot hold data in the caches. It's recommended to use
+ * {@link DiscoverySpi} in client mode if this property is {@code true}.
*
* @return Client mode flag.
+ * @see TcpDiscoverySpi#setForceServerMode(boolean)
*/
public Boolean isClientMode() {
return clientMode;
@@ -2188,15 +2193,21 @@ public class IgniteConfiguration {
}
/**
+ * Gets plugin configurations.
+ *
* @return Plugin configurations.
+ * @see PluginProvider
*/
public PluginConfiguration[] getPluginConfigurations() {
return pluginCfgs;
}
/**
+ * Sets plugin configurations.
+ *
* @param pluginCfgs Plugin configurations.
* @return {@code this} for chaining.
+ * @see PluginProvider
*/
public IgniteConfiguration setPluginConfigurations(PluginConfiguration... pluginCfgs) {
this.pluginCfgs = pluginCfgs;
@@ -2242,6 +2253,35 @@ public class IgniteConfiguration {
return classLdr;
}
+ /**
+ * Gets cache store session listener factories.
+ *
+ * @return Cache store session listener factories.
+ * @see CacheStoreSessionListener
+ */
+ public Factory<CacheStoreSessionListener>[] getCacheStoreSessionListenerFactories() {
+ return storeSesLsnrs;
+ }
+
+ /**
+ * Cache store session listener factories.
+ * <p>
+ * These are global store session listeners, so they are applied to
+ * all caches. If you need to override listeners for a
+ * particular cache, use {@link CacheConfiguration#setCacheStoreSessionListenerFactories(Factory[])}
+ * configuration property.
+ *
+ * @param storeSesLsnrs Cache store session listener factories.
+ * @return {@code this} for chaining.
+ * @see CacheStoreSessionListener
+ */
+ public IgniteConfiguration setCacheStoreSessionListenerFactories(
+ Factory<CacheStoreSessionListener>... storeSesLsnrs) {
+ this.storeSesLsnrs = storeSesLsnrs;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
new file mode 100644
index 0000000..5a65bdb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.igfs;
+
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.util.concurrent.*;
+
+/**
+ * Provides ability to execute IGFS code in a context of a specific user.
+ */
+public abstract class IgfsUserContext {
+ /** Thread local to hold the current user context. */
+ private static final ThreadLocal<String> userStackThreadLocal = new ThreadLocal<>();
+
+ /**
+ * Executes given callable in the given user context.
+ * The main contract of this method is that {@link #currentUser()} method invoked
+ * inside closure always returns 'user' this callable executed with.
+ * @param user the user name to invoke closure on behalf of.
+ * @param clo the closure to execute
+ * @param <T> The type of closure result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final IgniteOutClosure<T> clo) {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clo.apply(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clo.apply();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Same contract that {@link #doAs(String, IgniteOutClosure)} has, but accepts
+ * callable that throws checked Exception.
+ * The Exception is not ever wrapped anyhow.
+ * If your Callable throws Some specific checked Exceptions, the recommended usage pattern is:
+ * <pre name="code" class="java">
+ * public Foo myOperation() throws MyCheckedException1, MyCheckedException2 {
+ * try {
+ * return IgfsUserContext.doAs(user, new Callable<Foo>() {
+ * @Override public Foo call() throws MyCheckedException1, MyCheckedException2 {
+ * return makeSomeFoo(); // do the job
+ * }
+ * });
+ * }
+ * catch (MyCheckedException1 | MyCheckedException2 | RuntimeException | Error e) {
+ * throw e;
+ * }
+ * catch (Exception e) {
+ * throw new AssertionError("Must never go there.");
+ * }
+ * }
+ * </pre>
+ * @param user the user name to invoke closure on behalf of.
+ * @param clbl the Callable to execute
+ * @param <T> The type of callable result.
+ * @return the result of closure execution.
+ * @throws IllegalArgumentException if user name is null or empty String or if the closure is null.
+ */
+ public static <T> T doAs(String user, final Callable<T> clbl) throws Exception {
+ if (F.isEmpty(user))
+ throw new IllegalArgumentException("Failed to use null or empty user name.");
+
+ final String ctxUser = userStackThreadLocal.get();
+
+ if (F.eq(ctxUser, user))
+ return clbl.call(); // correct context is already there
+
+ userStackThreadLocal.set(user);
+
+ try {
+ return clbl.call();
+ }
+ finally {
+ userStackThreadLocal.set(ctxUser);
+ }
+ }
+
+ /**
+ * Gets the current context user.
+ * If this method is invoked outside of any {@link #doAs(String, IgniteOutClosure)} on the call stack, it will
+ * return null. Otherwise it will return the user name set in the most lower
+ * {@link #doAs(String, IgniteOutClosure)} call on the call stack.
+ * @return The current user, may be null.
+ */
+ @Nullable public static String currentUser() {
+ return userStackThreadLocal.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 9026eac..cb69352 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -198,4 +198,11 @@ public interface IgfsSecondaryFileSystem {
* @return Map of properties.
*/
public Map<String,String> properties();
+
+
+ /**
+ * Closes the secondary file system.
+ * @throws IgniteException in case of an error.
+ */
+ public void close() throws IgniteException;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3d78aa15/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
index 4d5d146..6da45ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ClusterMetricsSnapshot.java
@@ -1247,6 +1247,20 @@ public class ClusterMetricsSnapshot implements ClusterMetrics {
/**
* Serializes node metrics into byte array.
*
+ * @param metrics Node metrics to serialize.
+ * @return New offset.
+ */
+ public static byte[] serialize(ClusterMetrics metrics) {
+ byte[] buf = new byte[METRICS_SIZE];
+
+ serialize(buf, 0, metrics);
+
+ return buf;
+ }
+
+ /**
+ * Serializes node metrics into byte array.
+ *
* @param data Byte array.
* @param off Offset into byte array.
* @param metrics Node metrics to serialize.