You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/10 16:11:46 UTC
[25/28] incubator-ignite git commit: ignite-545: merge from sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..74ba100 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,12 +19,15 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.store.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import java.util.concurrent.atomic.*;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+
/**
* Adapter for cache metrics.
*/
@@ -63,7 +66,7 @@ public class CacheMetricsImpl implements CacheMetrics {
private AtomicLong getTimeNanos = new AtomicLong();
/** Remove time taken nanos. */
- private AtomicLong removeTimeNanos = new AtomicLong();
+ private AtomicLong rmvTimeNanos = new AtomicLong();
/** Commit transaction time taken nanos. */
private AtomicLong commitTimeNanos = new AtomicLong();
@@ -71,6 +74,39 @@ public class CacheMetricsImpl implements CacheMetrics {
/** Commit transaction time taken nanos. */
private AtomicLong rollbackTimeNanos = new AtomicLong();
+ /** Number of reads from off-heap memory. */
+ private AtomicLong offHeapGets = new AtomicLong();
+
+ /** Number of writes to off-heap memory. */
+ private AtomicLong offHeapPuts = new AtomicLong();
+
+ /** Number of removed entries from off-heap memory. */
+ private AtomicLong offHeapRemoves = new AtomicLong();
+
+ /** Number of evictions from off-heap memory. */
+ private AtomicLong offHeapEvicts = new AtomicLong();
+
+ /** Number of off-heap hits. */
+ private AtomicLong offHeapHits = new AtomicLong();
+
+ /** Number of off-heap misses. */
+ private AtomicLong offHeapMisses = new AtomicLong();
+
+ /** Number of reads from swap. */
+ private AtomicLong swapGets = new AtomicLong();
+
+ /** Number of writes to swap. */
+ private AtomicLong swapPuts = new AtomicLong();
+
+ /** Number of removed entries from swap. */
+ private AtomicLong swapRemoves = new AtomicLong();
+
+ /** Number of swap hits. */
+ private AtomicLong swapHits = new AtomicLong();
+
+ /** Number of swap misses. */
+ private AtomicLong swapMisses = new AtomicLong();
+
/** Cache metrics. */
@GridToStringExclude
private transient CacheMetricsImpl delegate;
@@ -118,7 +154,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public long getOverflowSize() {
try {
- return cctx.cache().overflowSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.overflowSize() : -1;
}
catch (IgniteCheckedException ignored) {
return -1;
@@ -126,35 +164,192 @@ public class CacheMetricsImpl implements CacheMetrics {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return offHeapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return offHeapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return offHeapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return offHeapEvicts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return offHeapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ long hits0 = offHeapHits.get();
+ long gets0 = offHeapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return offHeapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ long misses0 = offHeapMisses.get();
+ long reads0 = offHeapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return cctx.cache().offHeapEntriesCount();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapEntriesCount() : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(true, false, cctx.affinity().affinityTopologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ try {
+ return cctx.swap().offheapEntriesCount(false, true, cctx.affinity().affinityTopologyVersion());
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
}
/** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
- return cctx.cache().offHeapAllocatedSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapAllocatedSize() : -1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return cctx.config().getOffHeapMaxMemory();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return swapGets.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return swapPuts.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return swapRemoves.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return swapHits.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return swapMisses.get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ try {
+ return cctx.cache().swapKeys();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ try {
+ return cctx.cache().swapSize();
+ }
+ catch (IgniteCheckedException e) {
+ return 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ long hits0 = swapHits.get();
+ long gets0 = swapGets.get();
+
+ if (hits0 == 0)
+ return 0;
+
+ return (float) hits0 / gets0 * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ long misses0 = swapMisses.get();
+ long reads0 = swapGets.get();
+
+ if (misses0 == 0)
+ return 0;
+
+ return (float) misses0 / reads0 * 100.0f;
}
/** {@inheritDoc} */
@Override public int getSize() {
- return cctx.cache().size();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.size() : 0;
}
/** {@inheritDoc} */
@Override public int getKeySize() {
- return cctx.cache().size();
+ return getSize();
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
- return cctx.cache().isEmpty();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache == null || cache.isEmpty();
}
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- return cctx.isNear() ?
- dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1
- : cctx.evicts().evictQueueSize();
+ GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
+
+ if (ctx == null)
+ return -1;
+
+ GridCacheEvictionManager evictMgr = ctx.evicts();
+
+ return evictMgr != null ? evictMgr.evictQueueSize() : -1;
}
/** {@inheritDoc} */
@@ -317,11 +512,24 @@ public class CacheMetricsImpl implements CacheMetrics {
txCommits.set(0);
txRollbacks.set(0);
putTimeNanos.set(0);
- removeTimeNanos.set(0);
+ rmvTimeNanos.set(0);
getTimeNanos.set(0);
commitTimeNanos.set(0);
rollbackTimeNanos.set(0);
+ offHeapGets.set(0);
+ offHeapPuts.set(0);
+ offHeapRemoves.set(0);
+ offHeapHits.set(0);
+ offHeapMisses.set(0);
+ offHeapEvicts.set(0);
+
+ swapGets.set(0);
+ swapPuts.set(0);
+ swapRemoves.set(0);
+ swapHits.set(0);
+ swapMisses.set(0);
+
if (delegate != null)
delegate.clear();
}
@@ -402,7 +610,7 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public float getAverageRemoveTime() {
- long timeNanos = removeTimeNanos.get();
+ long timeNanos = rmvTimeNanos.get();
long removesCnt = rmCnt.get();
if (timeNanos == 0 || removesCnt == 0)
@@ -483,7 +691,6 @@ public class CacheMetricsImpl implements CacheMetrics {
delegate.onTxRollback(duration);
}
-
/**
* Increments the get time accumulator.
*
@@ -514,7 +721,7 @@ public class CacheMetricsImpl implements CacheMetrics {
* @param duration the time taken in nanoseconds.
*/
public void addRemoveTimeNanos(long duration) {
- removeTimeNanos.addAndGet(duration);
+ rmvTimeNanos.addAndGet(duration);
if (delegate != null)
delegate.addRemoveTimeNanos(duration);
@@ -526,7 +733,7 @@ public class CacheMetricsImpl implements CacheMetrics {
* @param duration the time taken in nanoseconds.
*/
public void addRemoveAndGetTimeNanos(long duration) {
- removeTimeNanos.addAndGet(duration);
+ rmvTimeNanos.addAndGet(duration);
getTimeNanos.addAndGet(duration);
if (delegate != null)
@@ -548,37 +755,153 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public String getKeyType() {
- return cctx.config().getKeyType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getKeyType().getName() : null;
}
/** {@inheritDoc} */
@Override public String getValueType() {
- return cctx.config().getValueType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getValueType().getName() : null;
}
/** {@inheritDoc} */
@Override public boolean isReadThrough() {
- return cctx.config().isReadThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isReadThrough();
}
/** {@inheritDoc} */
@Override public boolean isWriteThrough() {
- return cctx.config().isWriteThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isWriteThrough();
}
/** {@inheritDoc} */
@Override public boolean isStoreByValue() {
- return cctx.config().isStoreByValue();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStoreByValue();
}
/** {@inheritDoc} */
@Override public boolean isStatisticsEnabled() {
- return cctx.config().isStatisticsEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStatisticsEnabled();
}
/** {@inheritDoc} */
@Override public boolean isManagementEnabled() {
- return cctx.config().isManagementEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isManagementEnabled();
+ }
+
+ /**
+ * Off-heap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onOffHeapRead(boolean hit) {
+ offHeapGets.incrementAndGet();
+
+ if (hit)
+ offHeapHits.incrementAndGet();
+ else
+ offHeapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRead(hit);
+ }
+
+ /**
+ * Off-heap write callback.
+ */
+ public void onOffHeapWrite() {
+ offHeapPuts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapWrite();
+ }
+
+ /**
+ * Off-heap remove callback.
+ */
+ public void onOffHeapRemove() {
+ offHeapRemoves.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRemove();
+ }
+
+ /**
+ * Off-heap evict callback.
+ */
+ public void onOffHeapEvict() {
+ offHeapEvicts.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onOffHeapRemove();
+ }
+
+ /**
+ * Swap read callback.
+ *
+ * @param hit Hit or miss flag.
+ */
+ public void onSwapRead(boolean hit) {
+ swapGets.incrementAndGet();
+
+ if (hit)
+ swapHits.incrementAndGet();
+ else
+ swapMisses.incrementAndGet();
+
+ if (delegate != null)
+ delegate.onSwapRead(hit);
+ }
+
+ /**
+ * Swap write callback.
+ */
+ public void onSwapWrite() {
+ onSwapWrite(1);
+ }
+
+ /**
+ * Swap write callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapWrite(int cnt) {
+ swapPuts.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapWrite(cnt);
+ }
+
+ /**
+ * Swap remove callback.
+ */
+ public void onSwapRemove() {
+ onSwapRemove(1);
+ }
+
+ /**
+ * Swap remove callback.
+ *
+ * @param cnt Amount of entries.
+ */
+ public void onSwapRemove(int cnt) {
+ swapRemoves.addAndGet(cnt);
+
+ if (delegate != null)
+ delegate.onSwapRemove(cnt);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
index e9d547c..966027a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsMXBeanImpl.java
@@ -49,16 +49,116 @@ class CacheMetricsMXBeanImpl implements CacheMetricsMXBean {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return cache.metrics0().getOffHeapGets();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return cache.metrics0().getOffHeapPuts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return cache.metrics0().getOffHeapRemovals();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return cache.metrics0().getOffHeapEvictions();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return cache.metrics0().getOffHeapHits();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ return cache.metrics0().getOffHeapHitPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return cache.metrics0().getOffHeapMisses();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ return cache.metrics0().getOffHeapMissPercentage();
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
return cache.metrics0().getOffHeapEntriesCount();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ return cache.metrics0().getOffHeapPrimaryEntriesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ return cache.metrics0().getOffHeapBackupEntriesCount();
+ }
+
+ /** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
return cache.metrics0().getOffHeapAllocatedSize();
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return cache.metrics0().getOffHeapMaxSize();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return cache.metrics0().getSwapGets();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return cache.metrics0().getSwapPuts();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return cache.metrics0().getSwapRemovals();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return cache.metrics0().getSwapHits();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return cache.metrics0().getSwapMisses();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ return cache.metrics0().getSwapHitPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ return cache.metrics0().getSwapMissPercentage();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ return cache.metrics0().getSwapEntriesCount();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ return cache.metrics0().getSwapSize();
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
return cache.metrics0().getSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
index 4fe152a..cf16d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java
@@ -61,7 +61,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private float getAvgTimeNanos = 0;
/** Remove time taken nanos. */
- private float removeAvgTimeNanos = 0;
+ private float rmvAvgTimeNanos = 0;
/** Commit transaction time taken nanos. */
private float commitAvgTimeNanos = 0;
@@ -75,12 +75,60 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** Number of entries that was swapped to disk. */
private long overflowSize;
+ /** Number of reads from off-heap. */
+ private long offHeapGets;
+
+ /** Number of writes to off-heap. */
+ private long offHeapPuts;
+
+ /** Number of removed entries from off-heap. */
+ private long offHeapRemoves;
+
+ /** Number of evictions from off-heap. */
+ private long offHeapEvicts;
+
+ /** Off-heap hits number. */
+ private long offHeapHits;
+
+ /** Off-heap misses number. */
+ private long offHeapMisses;
+
/** Number of entries stored in off-heap memory. */
- private long offHeapEntriesCount;
+ private long offHeapEntriesCnt;
+
+ /** Number of primary entries stored in off-heap memory. */
+ private long offHeapPrimaryEntriesCnt;
+
+ /** Number of backup entries stored in off-heap memory. */
+ private long offHeapBackupEntriesCnt;
/** Memory size allocated in off-heap. */
private long offHeapAllocatedSize;
+ /** Off-heap memory maximum size*/
+ private long offHeapMaxSize;
+
+ /** Number of reads from swap. */
+ private long swapGets;
+
+ /** Number of writes to swap. */
+ private long swapPuts;
+
+ /** Number of removed entries from swap. */
+ private long swapRemoves;
+
+ /** Number of entries stored in swap. */
+ private long swapEntriesCnt;
+
+ /** Swap hits number. */
+ private long swapHits;
+
+ /** Swap misses number. */
+ private long swapMisses;
+
+ /** Swap size. */
+ private long swapSize;
+
/** Number of non-{@code null} values in the cache. */
private int size;
@@ -91,7 +139,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private boolean isEmpty;
/** Gets current size of evict queue used to batch up evictions. */
- private int dhtEvictQueueCurrentSize;
+ private int dhtEvictQueueCurrSize;
/** Transaction per-thread map size. */
private int txThreadMapSize;
@@ -106,7 +154,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int txPrepareQueueSize;
/** Start version counts map size. */
- private int txStartVersionCountsSize;
+ private int txStartVerCountsSize;
/** Number of cached committed transaction IDs. */
private int txCommittedVersionsSize;
@@ -127,7 +175,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int txDhtPrepareQueueSize;
/** DHT start version counts map size. */
- private int txDhtStartVersionCountsSize;
+ private int txDhtStartVerCountsSize;
/** Number of cached committed DHT transaction IDs. */
private int txDhtCommittedVersionsSize;
@@ -142,34 +190,34 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
private int writeBehindFlushSize;
/** Count of worker threads. */
- private int writeBehindFlushThreadCount;
+ private int writeBehindFlushThreadCnt;
/** Flush frequency in milliseconds. */
- private long writeBehindFlushFrequency;
+ private long writeBehindFlushFreq;
/** Maximum size of batch. */
private int writeBehindStoreBatchSize;
/** Count of cache overflow events since start. */
- private int writeBehindTotalCriticalOverflowCount;
+ private int writeBehindTotalCriticalOverflowCnt;
/** Count of cache overflow events since start. */
- private int writeBehindCriticalOverflowCount;
+ private int writeBehindCriticalOverflowCnt;
/** Count of entries in store-retry state. */
- private int writeBehindErrorRetryCount;
+ private int writeBehindErrorRetryCnt;
/** Total count of entries in cache store internal buffer. */
- private int writeBehindBufferSize;
+ private int writeBehindBufSize;
/** */
private String keyType;
/** */
- private String valueType;
+ private String valType;
/** */
- private boolean isStoreByValue;
+ private boolean isStoreByVal;
/** */
private boolean isStatisticsEnabled;
@@ -207,45 +255,64 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos = m.getAveragePutTime();
getAvgTimeNanos = m.getAverageGetTime();
- removeAvgTimeNanos = m.getAverageRemoveTime();
+ rmvAvgTimeNanos = m.getAverageRemoveTime();
commitAvgTimeNanos = m.getAverageTxCommitTime();
rollbackAvgTimeNanos = m.getAverageTxRollbackTime();
cacheName = m.name();
overflowSize = m.getOverflowSize();
- offHeapEntriesCount = m.getOffHeapEntriesCount();
+
+ offHeapGets = m.getOffHeapGets();
+ offHeapPuts = m.getOffHeapPuts();
+ offHeapRemoves = m.getOffHeapRemovals();
+ offHeapEvicts = m.getOffHeapEvictions();
+ offHeapHits = m.getOffHeapHits();
+ offHeapMisses = m.getOffHeapMisses();
+ offHeapEntriesCnt = m.getOffHeapEntriesCount();
+ offHeapPrimaryEntriesCnt = m.getOffHeapPrimaryEntriesCount();
+ offHeapBackupEntriesCnt = m.getOffHeapBackupEntriesCount();
offHeapAllocatedSize = m.getOffHeapAllocatedSize();
+ offHeapMaxSize = m.getOffHeapMaxSize();
+
+ swapGets = m.getSwapGets();
+ swapPuts = m.getSwapPuts();
+ swapRemoves = m.getSwapRemovals();
+ swapHits = m.getSwapHits();
+ swapMisses = m.getSwapMisses();
+ swapEntriesCnt = m.getSwapEntriesCount();
+ swapSize = m.getSwapSize();
+
size = m.getSize();
keySize = m.getKeySize();
isEmpty = m.isEmpty();
- dhtEvictQueueCurrentSize = m.getDhtEvictQueueCurrentSize();
+ dhtEvictQueueCurrSize = m.getDhtEvictQueueCurrentSize();
txThreadMapSize = m.getTxThreadMapSize();
txXidMapSize = m.getTxXidMapSize();
txCommitQueueSize = m.getTxCommitQueueSize();
txPrepareQueueSize = m.getTxPrepareQueueSize();
- txStartVersionCountsSize = m.getTxStartVersionCountsSize();
+ txStartVerCountsSize = m.getTxStartVersionCountsSize();
txCommittedVersionsSize = m.getTxCommittedVersionsSize();
txRolledbackVersionsSize = m.getTxRolledbackVersionsSize();
txDhtThreadMapSize = m.getTxDhtThreadMapSize();
txDhtXidMapSize = m.getTxDhtXidMapSize();
txDhtCommitQueueSize = m.getTxDhtCommitQueueSize();
txDhtPrepareQueueSize = m.getTxDhtPrepareQueueSize();
- txDhtStartVersionCountsSize = m.getTxDhtStartVersionCountsSize();
+ txDhtStartVerCountsSize = m.getTxDhtStartVersionCountsSize();
txDhtCommittedVersionsSize = m.getTxDhtCommittedVersionsSize();
txDhtRolledbackVersionsSize = m.getTxDhtRolledbackVersionsSize();
isWriteBehindEnabled = m.isWriteBehindEnabled();
writeBehindFlushSize = m.getWriteBehindFlushSize();
- writeBehindFlushThreadCount = m.getWriteBehindFlushThreadCount();
- writeBehindFlushFrequency = m.getWriteBehindFlushFrequency();
+ writeBehindFlushThreadCnt = m.getWriteBehindFlushThreadCount();
+ writeBehindFlushFreq = m.getWriteBehindFlushFrequency();
writeBehindStoreBatchSize = m.getWriteBehindStoreBatchSize();
- writeBehindTotalCriticalOverflowCount = m.getWriteBehindTotalCriticalOverflowCount();
- writeBehindCriticalOverflowCount = m.getWriteBehindCriticalOverflowCount();
- writeBehindErrorRetryCount = m.getWriteBehindErrorRetryCount();
- writeBehindBufferSize = m.getWriteBehindBufferSize();
+ writeBehindTotalCriticalOverflowCnt = m.getWriteBehindTotalCriticalOverflowCount();
+ writeBehindCriticalOverflowCnt = m.getWriteBehindCriticalOverflowCount();
+ writeBehindErrorRetryCnt = m.getWriteBehindErrorRetryCount();
+ writeBehindBufSize = m.getWriteBehindBufferSize();
keyType = m.getKeyType();
- valueType = m.getValueType();
- isStoreByValue = m.isStoreByValue();
+ valType = m.getValueType();
+ isStoreByVal = m.isStoreByValue();
isStatisticsEnabled = m.isStatisticsEnabled();
isManagementEnabled = m.isManagementEnabled();
isReadThrough = m.isReadThrough();
@@ -263,21 +330,23 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
isEmpty = loc.isEmpty();
isWriteBehindEnabled = loc.isWriteBehindEnabled();
writeBehindFlushSize = loc.getWriteBehindFlushSize();
- writeBehindFlushThreadCount = loc.getWriteBehindFlushThreadCount();
- writeBehindFlushFrequency = loc.getWriteBehindFlushFrequency();
+ writeBehindFlushThreadCnt = loc.getWriteBehindFlushThreadCount();
+ writeBehindFlushFreq = loc.getWriteBehindFlushFrequency();
writeBehindStoreBatchSize = loc.getWriteBehindStoreBatchSize();
- writeBehindBufferSize = loc.getWriteBehindBufferSize();
+ writeBehindBufSize = loc.getWriteBehindBufferSize();
size = loc.getSize();
keySize = loc.getKeySize();
keyType = loc.getKeyType();
- valueType = loc.getValueType();
- isStoreByValue = loc.isStoreByValue();
+ valType = loc.getValueType();
+ isStoreByVal = loc.isStoreByValue();
isStatisticsEnabled = loc.isStatisticsEnabled();
isManagementEnabled = loc.isManagementEnabled();
isReadThrough = loc.isReadThrough();
isWriteThrough = loc.isWriteThrough();
+ offHeapMaxSize = loc.getOffHeapMaxSize();
+
for (CacheMetrics e : metrics) {
reads += e.getCacheGets();
puts += e.getCachePuts();
@@ -290,7 +359,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos += e.getAveragePutTime();
getAvgTimeNanos += e.getAverageGetTime();
- removeAvgTimeNanos += e.getAverageRemoveTime();
+ rmvAvgTimeNanos += e.getAverageRemoveTime();
commitAvgTimeNanos += e.getAverageTxCommitTime();
rollbackAvgTimeNanos += e.getAverageTxRollbackTime();
@@ -299,19 +368,35 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
else
overflowSize = -1;
- offHeapEntriesCount += e.getOffHeapEntriesCount();
+ offHeapGets += e.getOffHeapGets();
+ offHeapPuts += e.getOffHeapPuts();
+ offHeapRemoves += e.getOffHeapRemovals();
+ offHeapEvicts += e.getOffHeapEvictions();
+ offHeapHits += e.getOffHeapHits();
+ offHeapMisses += e.getOffHeapMisses();
+ offHeapEntriesCnt += e.getOffHeapEntriesCount();
+ offHeapPrimaryEntriesCnt += e.getOffHeapPrimaryEntriesCount();
+ offHeapBackupEntriesCnt += e.getOffHeapBackupEntriesCount();
offHeapAllocatedSize += e.getOffHeapAllocatedSize();
+ swapGets += e.getSwapGets();
+ swapPuts += e.getSwapPuts();
+ swapRemoves += e.getSwapRemovals();
+ swapHits += e.getSwapHits();
+ swapMisses += e.getSwapMisses();
+ swapEntriesCnt += e.getSwapEntriesCount();
+ swapSize += e.getSwapSize();
+
if (e.getDhtEvictQueueCurrentSize() > -1)
- dhtEvictQueueCurrentSize += e.getDhtEvictQueueCurrentSize();
+ dhtEvictQueueCurrSize += e.getDhtEvictQueueCurrentSize();
else
- dhtEvictQueueCurrentSize = -1;
+ dhtEvictQueueCurrSize = -1;
txThreadMapSize += e.getTxThreadMapSize();
txXidMapSize += e.getTxXidMapSize();
txCommitQueueSize += e.getTxCommitQueueSize();
txPrepareQueueSize += e.getTxPrepareQueueSize();
- txStartVersionCountsSize += e.getTxStartVersionCountsSize();
+ txStartVerCountsSize += e.getTxStartVersionCountsSize();
txCommittedVersionsSize += e.getTxCommittedVersionsSize();
txRolledbackVersionsSize += e.getTxRolledbackVersionsSize();
@@ -336,9 +421,9 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
txDhtPrepareQueueSize = -1;
if (e.getTxDhtStartVersionCountsSize() > -1)
- txDhtStartVersionCountsSize += e.getTxDhtStartVersionCountsSize();
+ txDhtStartVerCountsSize += e.getTxDhtStartVersionCountsSize();
else
- txDhtStartVersionCountsSize = -1;
+ txDhtStartVerCountsSize = -1;
if (e.getTxDhtCommittedVersionsSize() > -1)
txDhtCommittedVersionsSize += e.getTxDhtCommittedVersionsSize();
@@ -351,19 +436,19 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
txDhtRolledbackVersionsSize = -1;
if (e.getWriteBehindTotalCriticalOverflowCount() > -1)
- writeBehindTotalCriticalOverflowCount += e.getWriteBehindTotalCriticalOverflowCount();
+ writeBehindTotalCriticalOverflowCnt += e.getWriteBehindTotalCriticalOverflowCount();
else
- writeBehindTotalCriticalOverflowCount = -1;
+ writeBehindTotalCriticalOverflowCnt = -1;
if (e.getWriteBehindCriticalOverflowCount() > -1)
- writeBehindCriticalOverflowCount += e.getWriteBehindCriticalOverflowCount();
+ writeBehindCriticalOverflowCnt += e.getWriteBehindCriticalOverflowCount();
else
- writeBehindCriticalOverflowCount = -1;
+ writeBehindCriticalOverflowCnt = -1;
if (e.getWriteBehindErrorRetryCount() > -1)
- writeBehindErrorRetryCount += e.getWriteBehindErrorRetryCount();
+ writeBehindErrorRetryCnt += e.getWriteBehindErrorRetryCount();
else
- writeBehindErrorRetryCount = -1;
+ writeBehindErrorRetryCnt = -1;
}
int size = metrics.size();
@@ -371,7 +456,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
if (size > 1) {
putAvgTimeNanos /= size;
getAvgTimeNanos /= size;
- removeAvgTimeNanos /= size;
+ rmvAvgTimeNanos /= size;
commitAvgTimeNanos /= size;
rollbackAvgTimeNanos /= size;
}
@@ -435,7 +520,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public float getAverageRemoveTime() {
- return removeAvgTimeNanos;
+ return rmvAvgTimeNanos;
}
/** {@inheritDoc} */
@@ -469,8 +554,63 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapGets() {
+ return offHeapGets;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPuts() {
+ return offHeapPuts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapRemovals() {
+ return offHeapRemoves;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapEvictions() {
+ return offHeapEvicts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapHits() {
+ return offHeapHits;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapHitPercentage() {
+ if (offHeapHits == 0 || offHeapGets == 0)
+ return 0;
+
+ return (float) offHeapHits / offHeapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapMisses() {
+ return offHeapMisses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getOffHeapMissPercentage() {
+ if (offHeapMisses == 0 || offHeapGets == 0)
+ return 0;
+
+ return (float) offHeapMisses / offHeapGets * 100.0f;
+ }
+ /** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return offHeapEntriesCount;
+ return offHeapEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapPrimaryEntriesCount() {
+ return offHeapPrimaryEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getOffHeapBackupEntriesCount() {
+ return offHeapBackupEntriesCnt;
}
/** {@inheritDoc} */
@@ -479,6 +619,62 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
}
/** {@inheritDoc} */
+ @Override public long getOffHeapMaxSize() {
+ return offHeapMaxSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapGets() {
+ return swapGets;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapPuts() {
+ return swapPuts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapRemovals() {
+ return swapRemoves;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapHits() {
+ return swapHits;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapMisses() {
+ return swapMisses;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapHitPercentage() {
+ if (swapHits == 0 || swapGets == 0)
+ return 0;
+
+ return (float) swapHits / swapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public float getSwapMissPercentage() {
+ if (swapMisses == 0 || swapGets == 0)
+ return 0;
+
+ return (float) swapMisses / swapGets * 100.0f;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapEntriesCount() {
+ return swapEntriesCnt;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long getSwapSize() {
+ return swapSize;
+ }
+
+ /** {@inheritDoc} */
@Override public int getSize() {
return size;
}
@@ -495,7 +691,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- return dhtEvictQueueCurrentSize;
+ return dhtEvictQueueCurrSize;
}
/** {@inheritDoc} */
@@ -520,7 +716,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getTxStartVersionCountsSize() {
- return txStartVersionCountsSize;
+ return txStartVerCountsSize;
}
/** {@inheritDoc} */
@@ -555,7 +751,7 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getTxDhtStartVersionCountsSize() {
- return txDhtStartVersionCountsSize;
+ return txDhtStartVerCountsSize;
}
/** {@inheritDoc} */
@@ -580,12 +776,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getWriteBehindFlushThreadCount() {
- return writeBehindFlushThreadCount;
+ return writeBehindFlushThreadCnt;
}
/** {@inheritDoc} */
@Override public long getWriteBehindFlushFrequency() {
- return writeBehindFlushFrequency;
+ return writeBehindFlushFreq;
}
/** {@inheritDoc} */
@@ -595,22 +791,22 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public int getWriteBehindTotalCriticalOverflowCount() {
- return writeBehindTotalCriticalOverflowCount;
+ return writeBehindTotalCriticalOverflowCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindCriticalOverflowCount() {
- return writeBehindCriticalOverflowCount;
+ return writeBehindCriticalOverflowCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindErrorRetryCount() {
- return writeBehindErrorRetryCount;
+ return writeBehindErrorRetryCnt;
}
/** {@inheritDoc} */
@Override public int getWriteBehindBufferSize() {
- return writeBehindBufferSize;
+ return writeBehindBufSize;
}
/** {@inheritDoc} */
@@ -620,12 +816,12 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
/** {@inheritDoc} */
@Override public String getValueType() {
- return valueType;
+ return valType;
}
/** {@inheritDoc} */
@Override public boolean isStoreByValue() {
- return isStoreByValue;
+ return isStoreByVal;
}
/** {@inheritDoc} */
@@ -666,31 +862,49 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
out.writeFloat(putAvgTimeNanos);
out.writeFloat(getAvgTimeNanos);
- out.writeFloat(removeAvgTimeNanos);
+ out.writeFloat(rmvAvgTimeNanos);
out.writeFloat(commitAvgTimeNanos);
out.writeFloat(rollbackAvgTimeNanos);
out.writeLong(overflowSize);
- out.writeLong(offHeapEntriesCount);
+ out.writeLong(offHeapGets);
+ out.writeLong(offHeapPuts);
+ out.writeLong(offHeapRemoves);
+ out.writeLong(offHeapEvicts);
+ out.writeLong(offHeapHits);
+ out.writeLong(offHeapMisses);
+ out.writeLong(offHeapEntriesCnt);
+ out.writeLong(offHeapPrimaryEntriesCnt);
+ out.writeLong(offHeapBackupEntriesCnt);
out.writeLong(offHeapAllocatedSize);
- out.writeInt(dhtEvictQueueCurrentSize);
+ out.writeLong(offHeapMaxSize);
+
+ out.writeLong(swapGets);
+ out.writeLong(swapPuts);
+ out.writeLong(swapRemoves);
+ out.writeLong(swapHits);
+ out.writeLong(swapMisses);
+ out.writeLong(swapEntriesCnt);
+ out.writeLong(swapSize);
+
+ out.writeInt(dhtEvictQueueCurrSize);
out.writeInt(txThreadMapSize);
out.writeInt(txXidMapSize);
out.writeInt(txCommitQueueSize);
out.writeInt(txPrepareQueueSize);
- out.writeInt(txStartVersionCountsSize);
+ out.writeInt(txStartVerCountsSize);
out.writeInt(txCommittedVersionsSize);
out.writeInt(txRolledbackVersionsSize);
out.writeInt(txDhtThreadMapSize);
out.writeInt(txDhtXidMapSize);
out.writeInt(txDhtCommitQueueSize);
out.writeInt(txDhtPrepareQueueSize);
- out.writeInt(txDhtStartVersionCountsSize);
+ out.writeInt(txDhtStartVerCountsSize);
out.writeInt(txDhtCommittedVersionsSize);
out.writeInt(txDhtRolledbackVersionsSize);
- out.writeInt(writeBehindTotalCriticalOverflowCount);
- out.writeInt(writeBehindCriticalOverflowCount);
- out.writeInt(writeBehindErrorRetryCount);
+ out.writeInt(writeBehindTotalCriticalOverflowCnt);
+ out.writeInt(writeBehindCriticalOverflowCnt);
+ out.writeInt(writeBehindErrorRetryCnt);
}
/** {@inheritDoc} */
@@ -706,30 +920,48 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable {
putAvgTimeNanos = in.readFloat();
getAvgTimeNanos = in.readFloat();
- removeAvgTimeNanos = in.readFloat();
+ rmvAvgTimeNanos = in.readFloat();
commitAvgTimeNanos = in.readFloat();
rollbackAvgTimeNanos = in.readFloat();
overflowSize = in.readLong();
- offHeapEntriesCount = in.readLong();
+ offHeapGets = in.readLong();
+ offHeapPuts = in.readLong();
+ offHeapRemoves = in.readLong();
+ offHeapEvicts = in.readLong();
+ offHeapHits = in.readLong();
+ offHeapMisses = in.readLong();
+ offHeapEntriesCnt = in.readLong();
+ offHeapPrimaryEntriesCnt = in.readLong();
+ offHeapBackupEntriesCnt = in.readLong();
offHeapAllocatedSize = in.readLong();
- dhtEvictQueueCurrentSize = in.readInt();
+ offHeapMaxSize = in.readLong();
+
+ swapGets = in.readLong();
+ swapPuts = in.readLong();
+ swapRemoves = in.readLong();
+ swapHits = in.readLong();
+ swapMisses = in.readLong();
+ swapEntriesCnt = in.readLong();
+ swapSize = in.readLong();
+
+ dhtEvictQueueCurrSize = in.readInt();
txThreadMapSize = in.readInt();
txXidMapSize = in.readInt();
txCommitQueueSize = in.readInt();
txPrepareQueueSize = in.readInt();
- txStartVersionCountsSize = in.readInt();
+ txStartVerCountsSize = in.readInt();
txCommittedVersionsSize = in.readInt();
txRolledbackVersionsSize = in.readInt();
txDhtThreadMapSize = in.readInt();
txDhtXidMapSize = in.readInt();
txDhtCommitQueueSize = in.readInt();
txDhtPrepareQueueSize = in.readInt();
- txDhtStartVersionCountsSize = in.readInt();
+ txDhtStartVerCountsSize = in.readInt();
txDhtCommittedVersionsSize = in.readInt();
txDhtRolledbackVersionsSize = in.readInt();
- writeBehindTotalCriticalOverflowCount = in.readInt();
- writeBehindCriticalOverflowCount = in.readInt();
- writeBehindErrorRetryCount = in.readInt();
+ writeBehindTotalCriticalOverflowCnt = in.readInt();
+ writeBehindCriticalOverflowCnt = in.readInt();
+ writeBehindErrorRetryCnt = in.readInt();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
index ac3660e..dfc39c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeBatch.java
@@ -17,16 +17,18 @@
package org.apache.ignite.internal.processors.cache;
+import org.apache.ignite.internal.managers.discovery.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
-import java.io.*;
import java.util.*;
/**
* Cache change batch.
*/
-public class DynamicCacheChangeBatch implements Serializable {
+public class DynamicCacheChangeBatch implements DiscoveryCustomMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -38,6 +40,9 @@ public class DynamicCacheChangeBatch implements Serializable {
@GridToStringInclude
private Map<String, Map<UUID, Boolean>> clientNodes;
+ /** Custom message ID. */
+ private IgniteUuid id = IgniteUuid.randomUuid();
+
/**
* @param reqs Requests.
*/
@@ -47,6 +52,11 @@ public class DynamicCacheChangeBatch implements Serializable {
this.reqs = reqs;
}
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
/**
* @return Collection of change requests.
*/
@@ -69,6 +79,21 @@ public class DynamicCacheChangeBatch implements Serializable {
}
/** {@inheritDoc} */
+ @Override public boolean incrementMinorTopologyVersion() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheChangeBatch.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
index 6f6f422..9c6cc43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheDescriptor.java
@@ -61,7 +61,11 @@ public class DynamicCacheDescriptor {
/** Cache plugin manager. */
private final CachePluginManager pluginMgr;
+ /** */
+ private boolean updatesAllowed = true;
+
/**
+ * @param ctx Context.
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param template {@code True} if this is template configuration.
@@ -76,6 +80,7 @@ public class DynamicCacheDescriptor {
this.cacheType = cacheType;
this.template = template;
this.deploymentId = deploymentId;
+
pluginMgr = new CachePluginManager(ctx, cacheCfg);
}
@@ -206,6 +211,20 @@ public class DynamicCacheDescriptor {
rmtCfgs = null;
}
+ /**
+ * @return Updates allowed flag.
+ */
+ public boolean updatesAllowed() {
+ return updatesAllowed;
+ }
+
+ /**
+ * @param updatesAllowed Updates allowed flag.
+ */
+ public void updatesAllowed(boolean updatesAllowed) {
+ this.updatesAllowed = updatesAllowed;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DynamicCacheDescriptor.class, this, "cacheName", U.maskName(cacheCfg.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index d390037..d8d029e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -348,7 +348,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/**
* @return Preloader.
*/
- public abstract GridCachePreloader<K, V> preloader();
+ public abstract GridCachePreloader preloader();
/** {@inheritDoc} */
@Override public Affinity<K> affinity() {
@@ -395,6 +395,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ assert !CU.isUtilityCache(ctx.name());
+ assert !CU.isAtomicsCache(ctx.name());
+ assert !CU.isMarshallerCache(ctx.name());
+
CacheOperationContext opCtx = new CacheOperationContext(false, null, false, plc);
return new GridCacheProxyImpl<>(ctx, this, opCtx);
@@ -902,7 +906,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public Set<K> keySet() {
- return keySet((CacheEntryPredicate[]) null);
+ return keySet((CacheEntryPredicate[])null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<K> keySetx() {
+ return keySetx((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -1215,11 +1224,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/** {@inheritDoc} */
- @Nullable @Override public Map<K, V> getAllOutTx(List<K> keys) throws IgniteCheckedException {
+ @Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
+ return getAllOutTxAsync(keys).get();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
String taskName = ctx.kernalContext().job().currentTaskName();
- return getAllAsync(keys, !ctx.config().isReadFromBackup(), /*skip tx*/true, null, null, taskName, true, false)
- .get();
+ return getAllAsync(keys,
+ !ctx.config().isReadFromBackup(),
+ /*skip tx*/true,
+ null,
+ null,
+ taskName,
+ !ctx.keepPortable(),
+ false);
}
/**
@@ -3249,7 +3269,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long overflowSize() throws IgniteCheckedException {
- return ctx.swap().swapSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.swapSize() : -1;
}
/**
@@ -3802,12 +3824,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
- return ctx.swap().offHeapEntriesCount();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
- return ctx.swap().offHeapAllocatedSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
@@ -4299,6 +4325,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * @param filter Filters to evaluate.
+ * @return Key set including internal keys.
+ */
+ public Set<K> keySetx(@Nullable CacheEntryPredicate... filter) {
+ return map.keySetx(filter);
+ }
+
+ /**
* @param filter Primary key set.
* @return Primary key set.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index fe7efd5..ea17df1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -140,6 +140,7 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
*
* @param topVer Topology version to calculate affinity for.
* @param discoEvt Discovery event that causes this topology change.
+ * @return Affinity assignments.
*/
public List<List<ClusterNode>> calculateAffinity(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
assert !cctx.isLocal();
@@ -148,6 +149,19 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter {
}
/**
+ * Copies previous affinity assignment when discovery event does not cause affinity assignment changes
+ * (e.g. client node joins on leaves).
+ *
+ * @param evt Event.
+ * @param topVer Topology version.
+ */
+ public void clientEventTopologyChange(DiscoveryEvent evt, AffinityTopologyVersion topVer) {
+ assert !cctx.isLocal();
+
+ aff.clientEventTopologyChange(evt, topVer);
+ }
+
+ /**
* @return Partition count.
*/
public int partitions() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index bd3e0f2..db5eed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -626,7 +626,19 @@ public class GridCacheConcurrentMap {
public <K, V> Set<K> keySet(CacheEntryPredicate... filter) {
checkWeakQueue();
- return new KeySet<>(this, filter);
+ return new KeySet<>(this, filter, false);
+ }
+
+ /**
+ * Key set including internal keys.
+ *
+ * @param filter Filter.
+ * @return Set of the keys contained in this map.
+ */
+ public <K, V> Set<K> keySetx(CacheEntryPredicate... filter) {
+ checkWeakQueue();
+
+ return new KeySet<>(this, filter, true);
}
/**
@@ -1921,7 +1933,7 @@ public class GridCacheConcurrentMap {
/** {@inheritDoc} */
@Override public void clear() {
- ctx.cache().clearLocally0(new KeySet<K, V>(map, filter));
+ ctx.cache().clearLocally0(new KeySet<K, V>(map, filter, false));
}
/** {@inheritDoc} */
@@ -2171,11 +2183,12 @@ public class GridCacheConcurrentMap {
/**
* @param map Base map.
* @param filter Key filter.
+ * @param internal Whether to allow internal keys.
*/
- private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter) {
+ private KeySet(GridCacheConcurrentMap map, CacheEntryPredicate[] filter, boolean internal) {
assert map != null;
- set = new Set0<>(map, nonInternal(filter));
+ set = new Set0<>(map, internal ? filter : nonInternal(filter));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2eeaed6..8a4e3b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -196,6 +196,9 @@ public class GridCacheContext<K, V> implements Externalizable {
/** Dynamic cache deployment ID. */
private IgniteUuid dynamicDeploymentId;
+ /** Updates allowed flag. */
+ private boolean updatesAllowed;
+
/**
* Empty constructor required for {@link Externalizable}.
*/
@@ -209,6 +212,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @param cacheCfg Cache configuration.
* @param cacheType Cache type.
* @param affNode {@code True} if local node is affinity node.
+ * @param updatesAllowed Updates allowed flag.
* @param evtMgr Cache event manager.
* @param swapMgr Cache swap manager.
* @param storeMgr Store manager.
@@ -230,6 +234,7 @@ public class GridCacheContext<K, V> implements Externalizable {
CacheConfiguration cacheCfg,
CacheType cacheType,
boolean affNode,
+ boolean updatesAllowed,
/*
* Managers in starting order!
@@ -271,6 +276,7 @@ public class GridCacheContext<K, V> implements Externalizable {
this.cacheCfg = cacheCfg;
this.cacheType = cacheType;
this.affNode = affNode;
+ this.updatesAllowed = updatesAllowed;
/*
* Managers in starting order!
@@ -348,7 +354,7 @@ public class GridCacheContext<K, V> implements Externalizable {
public void awaitStarted() throws IgniteCheckedException {
U.await(startLatch);
- GridCachePreloader<K, V> prldr = preloader();
+ GridCachePreloader prldr = preloader();
if (prldr != null)
prldr.startFuture().get();
@@ -361,7 +367,7 @@ public class GridCacheContext<K, V> implements Externalizable {
if (startLatch.getCount() != 0)
return false;
- GridCachePreloader<K, V> prldr = preloader();
+ GridCachePreloader prldr = preloader();
return prldr == null || prldr.startFuture().isDone();
}
@@ -682,7 +688,7 @@ public class GridCacheContext<K, V> implements Externalizable {
/**
* @return Preloader.
*/
- public GridCachePreloader<K, V> preloader() {
+ public GridCachePreloader preloader() {
return cache().preloader();
}
@@ -1469,9 +1475,6 @@ public class GridCacheContext<K, V> implements Externalizable {
Collection<ClusterNode> dhtNodeIds = new ArrayList<>(dhtRemoteNodes);
Collection<ClusterNode> nearNodeIds = F.isEmpty(nearRemoteNodes) ? null : new ArrayList<>(nearRemoteNodes);
- if (!F.isEmpty(nearNodeIds))
- U.dumpStack("Added near mapped nodes: " + entry + ", " + nearNodeIds);
-
entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds);
}
@@ -1809,6 +1812,13 @@ public class GridCacheContext<K, V> implements Externalizable {
}
/**
+ * @return Updates allowed.
+ */
+ public boolean updatesAllowed() {
+ return updatesAllowed;
+ }
+
+ /**
* Nulling references to potentially leak-prone objects.
*/
public void cleanup() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 8d3d089..3857b35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -21,6 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.cache.eviction.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.distributed.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.dr.*;
@@ -943,4 +944,9 @@ public interface GridCacheEntryEx {
* @return {@code True} if value was removed, {@code false} otherwise.
*/
public <V> boolean removeMeta(UUID name, V val);
+
+ /**
+ * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition.
+ */
+ public void onUnlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
index 9d680ef..d9d151c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java
@@ -127,7 +127,7 @@ public class GridCacheGateway<K, V> {
try {
GridCacheAdapter<K, V> cache = ctx.cache();
- GridCachePreloader<K, V> preldr = cache != null ? cache.preloader() : null;
+ GridCachePreloader preldr = cache != null ? cache.preloader() : null;
if (preldr == null)
throw new IllegalStateException("Grid is in invalid state to perform this operation. " +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index 02f16c0..eef9fde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -472,7 +472,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.miniId(),
false,
0,
- req.classError());
+ req.classError(),
+ null);
sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy());
}
@@ -488,7 +489,10 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter {
req.miniId(),
req.version(),
req.version(),
- null, null, null);
+ null,
+ null,
+ null,
+ null);
res.error(req.classError());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 92035af..4680994 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -61,6 +61,9 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
private static final byte IS_UNSWAPPED_MASK = 0x02;
/** */
+ private static final byte IS_OFFHEAP_PTR_MASK = 0x04;
+
+ /** */
public static final GridCacheAtomicVersionComparator ATOMIC_VER_COMPARATOR = new GridCacheAtomicVersionComparator();
/**
@@ -433,6 +436,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (e.offheapPointer() > 0) {
offHeapPointer(e.offheapPointer());
+ flags |= IS_OFFHEAP_PTR_MASK;
+
if (needVal) {
CacheObject val = cctx.fromOffheap(offHeapPointer(), false);
@@ -498,7 +503,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return;
}
- if (val == null && cctx.offheapTiered() && hasOffHeapPointer()) {
+ if (cctx.offheapTiered() && hasOffHeapPointer()) {
if (log.isDebugEnabled())
log.debug("Value did not change, skip write swap entry: " + this);
@@ -509,10 +514,16 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
IgniteUuid valClsLdrId = null;
+ IgniteUuid keyClsLdrId = null;
- if (val != null) {
- valClsLdrId = cctx.deploy().getClassLoaderId(
- val.value(cctx.cacheObjectContext(), false).getClass().getClassLoader());
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
+ if (val != null) {
+ valClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ }
+
+ keyClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false)));
}
IgniteBiTuple<byte[], Byte> valBytes = valueBytes0();
@@ -523,7 +534,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
ver,
ttlExtras(),
expireTime,
- cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))),
+ keyClsLdrId,
valClsLdrId);
if (log.isDebugEnabled())
@@ -3617,6 +3628,8 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return true;
}
+ else
+ evictFailed(prev);
}
}
else {
@@ -3660,8 +3673,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return true;
}
- else
+ else {
+ evictFailed(prevVal);
+
return false;
+ }
}
}
}
@@ -3680,6 +3696,27 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
return false;
}
+ /**
+ * @param prevVal Previous value.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void evictFailed(@Nullable CacheObject prevVal) throws IgniteCheckedException {
+ if (cctx.offheapTiered() && ((flags & IS_OFFHEAP_PTR_MASK) != 0)) {
+ flags &= ~IS_OFFHEAP_PTR_MASK;
+
+ if (prevVal != null) {
+ cctx.swap().removeOffheap(key());
+
+ value(prevVal);
+
+ GridCacheQueryManager qryMgr = cctx.queries();
+
+ if (qryMgr != null)
+ qryMgr.onUnswap(key, prevVal);
+ }
+ }
+ }
+
/** {@inheritDoc} */
@Override public GridCacheBatchSwapEntry evictInBatchInternal(GridCacheVersion obsoleteVer)
throws IgniteCheckedException {
@@ -3692,10 +3729,17 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (!isStartVersion() && hasValueUnlocked()) {
IgniteUuid valClsLdrId = null;
+ IgniteUuid keyClsLdrId = null;
- if (val != null)
- valClsLdrId = cctx.deploy().getClassLoaderId(
- U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ if (cctx.kernalContext().config().isPeerClassLoadingEnabled()) {
+ if (val != null) {
+ valClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(val.value(cctx.cacheObjectContext(), false)));
+ }
+
+ keyClsLdrId = cctx.deploy().getClassLoaderId(
+ U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false)));
+ }
IgniteBiTuple<byte[], Byte> valBytes = valueBytes0();
@@ -3706,7 +3750,7 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
ver,
ttlExtras(),
expireTimeExtras(),
- cctx.deploy().getClassLoaderId(U.detectObjectClassLoader(key.value(cctx.cacheObjectContext(), false))),
+ keyClsLdrId,
valClsLdrId);
}
@@ -4100,6 +4144,11 @@ public abstract class GridCacheMapEntry implements GridCacheEntryEx {
}
/** {@inheritDoc} */
+ @Override public void onUnlock() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public boolean equals(Object o) {
// Identity comparison left on purpose.
return o == this;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
index c05e4b4..c528e08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java
@@ -955,6 +955,21 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @param topVer Topology version.
+ * @return Locked keys.
+ */
+ public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) {
+ Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>();
+
+ for (FinishLockFuture fut : finishFuts) {
+ if (fut.topologyVersion().equals(topVer))
+ cands.putAll(fut.pendingLocks());
+ }
+
+ return cands;
+ }
+
+ /**
* Creates a future that will wait for all explicit locks acquired on given topology
* version to be released.
*
@@ -1041,8 +1056,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
finishFuts.add(finishFut);
finishFut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override
- public void apply(IgniteInternalFuture<?> e) {
+ @Override public void apply(IgniteInternalFuture<?> e) {
finishFuts.remove(finishFut);
// This call is required to make sure that the concurrent queue
@@ -1117,6 +1131,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter {
}
/**
+ * @return Topology version.
+ */
+ AffinityTopologyVersion topologyVersion() {
+ return topVer;
+ }
+
+ /**
+ * @return Pending locks.
+ */
+ Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> pendingLocks() {
+ return pendingLocks;
+ }
+
+ /**
* @return Filter.
*/
private IgnitePredicate<GridCacheMvccCandidate> versionFilter() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 0ecaf97..3236bb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -97,7 +97,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
private final AtomicReference<AffinityTopologyVersion> readyTopVer =
new AtomicReference<>(AffinityTopologyVersion.NONE);
-
/**
* Partition map futures.
* This set also contains already completed exchange futures to address race conditions when coordinator
@@ -150,8 +149,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
else {
DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)e;
- if (customEvt.data() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.data();
+ if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
@@ -554,7 +553,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* Partition refresh callback.
*/
void refreshPartitions() {
- ClusterNode oldest = CU.oldest(cctx);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE);
+
+ if (oldest == null) {
+ if (log.isDebugEnabled())
+ log.debug("Skip partitions refresh, there are no server nodes [loc=" + cctx.localNodeId() + ']');
+
+ return;
+ }
if (log.isDebugEnabled())
log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']');
@@ -564,7 +570,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
try {
// If this is the oldest node.
if (oldest.id().equals(cctx.localNodeId())) {
- rmts = CU.remoteNodes(cctx);
+ rmts = CU.remoteNodes(cctx, AffinityTopologyVersion.NONE);
if (log.isDebugEnabled())
log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId());
@@ -641,7 +647,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private boolean sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id)
throws IgniteCheckedException {
- GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, cctx.versions().last());
+ GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id,
+ cctx.kernalContext().clientNode(),
+ cctx.versions().last());
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal()) {
@@ -687,6 +695,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
+ * @param reqs Cache change requests.
* @return Exchange future.
*/
GridDhtPartitionsExchangeFuture exchangeFuture(GridDhtPartitionExchangeId exchId,
@@ -696,9 +705,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
GridDhtPartitionsExchangeFuture old = exchFuts.addx(
fut = new GridDhtPartitionsExchangeFuture(cctx, busyLock, exchId, reqs));
- if (old != null)
+ if (old != null) {
fut = old;
+ if (reqs != null)
+ fut.cacheChangeRequests(reqs);
+ }
+
if (discoEvt != null)
fut.onEvent(exchId, discoEvt);
@@ -827,7 +840,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @param node Node ID.
* @param msg Message.
*/
- private void processSinglePartitionUpdate(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
+ private void processSinglePartitionUpdate(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
if (!enterBusy())
return;
@@ -858,8 +871,22 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (updated)
scheduleResendPartitions();
}
- else
- exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+ else {
+ if (msg.client()) {
+ final GridDhtPartitionsExchangeFuture exchFut = exchangeFuture(msg.exchangeId(),
+ null,
+ null);
+
+ exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ // Finished future should reply only to sender client node.
+ exchFut.onReceive(node.id(), msg);
+ }
+ });
+ }
+ else
+ exchangeFuture(msg.exchangeId(), null, null).onReceive(node.id(), msg);
+ }
}
finally {
leaveBusy();
@@ -982,7 +1009,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
busy = true;
- Map<Integer, GridDhtPreloaderAssignments<K, V>> assignsMap = new HashMap<>();
+ Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
boolean dummyReassign = exchFut.dummyReassign();
boolean forcePreload = exchFut.forcePreload();
@@ -1017,7 +1044,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
changed |= cacheCtx.topology().afterExchange(exchFut);
// Preload event notification.
- if (cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
+ if (!exchFut.skipPreload() && cacheCtx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED)) {
if (!cacheCtx.isReplicated() || !startEvtFired) {
DiscoveryEvent discoEvt = exchFut.discoveryEvent();
@@ -1043,16 +1070,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
}
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- long delay = cacheCtx.config().getRebalanceDelay();
+ if (!exchFut.skipPreload()) {
+ assignsMap = new HashMap<>();
- GridDhtPreloaderAssignments<K, V> assigns = null;
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ long delay = cacheCtx.config().getRebalanceDelay();
+
+ GridDhtPreloaderAssignments assigns = null;
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if (delay == 0 || forcePreload)
- assigns = cacheCtx.preloader().assign(exchFut);
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ if (delay == 0 || forcePreload)
+ assigns = cacheCtx.preloader().assign(exchFut);
- assignsMap.put(cacheCtx.cacheId(), assigns);
+ assignsMap.put(cacheCtx.cacheId(), assigns);
+ }
}
}
finally {
@@ -1061,7 +1092,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
if (assignsMap != null) {
- for (Map.Entry<Integer, GridDhtPreloaderAssignments<K, V>> e : assignsMap.entrySet()) {
+ for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
int cacheId = e.getKey();
GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
@@ -1113,20 +1144,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** {@inheritDoc} */
@Override public void onTimeout() {
- if (!busyLock.readLock().tryLock())
- return;
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ if (!busyLock.readLock().tryLock())
+ return;
- try {
- if (started.compareAndSet(false, true))
- refreshPartitions();
- }
- finally {
- busyLock.readLock().unlock();
+ try {
+ if (started.compareAndSet(false, true))
+ refreshPartitions();
+ }
+ finally {
+ busyLock.readLock().unlock();
- cctx.time().removeTimeoutObject(this);
+ cctx.time().removeTimeoutObject(ResendTimeoutObject.this);
- pendingResend.compareAndSet(this, null);
- }
+ pendingResend.compareAndSet(ResendTimeoutObject.this, null);
+ }
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index 2e181f9..e0f6181 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -30,7 +30,7 @@ import java.util.*;
* Cache preloader that is responsible for loading cache entries either from remote
* nodes (for distributed cache) or anywhere else at cache startup.
*/
-public interface GridCachePreloader<K, V> {
+public interface GridCachePreloader {
/**
* Starts preloading.
*
@@ -78,7 +78,7 @@ public interface GridCachePreloader<K, V> {
* @param exchFut Exchange future to assign.
* @return Assignments.
*/
- public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut);
+ public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut);
/**
* Adds assignments to preloader.
@@ -86,7 +86,7 @@ public interface GridCachePreloader<K, V> {
* @param assignments Assignments to add.
* @param forcePreload Force preload flag.
*/
- public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload);
+ public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload);
/**
* @param p Preload predicate.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1652fd18/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index 80d3d6b..b4f386f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -31,9 +31,9 @@ import java.util.*;
/**
* Adapter for preloading which always assumes that preloading finished.
*/
-public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V> {
+public class GridCachePreloaderAdapter implements GridCachePreloader {
/** Cache context. */
- protected final GridCacheContext<K, V> cctx;
+ protected final GridCacheContext<?, ?> cctx;
/** Logger.*/
protected final IgniteLogger log;
@@ -50,7 +50,7 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
/**
* @param cctx Cache context.
*/
- public GridCachePreloaderAdapter(GridCacheContext<K, V> cctx) {
+ public GridCachePreloaderAdapter(GridCacheContext<?, ?> cctx) {
assert cctx != null;
this.cctx = cctx;
@@ -126,17 +126,18 @@ public class GridCachePreloaderAdapter<K, V> implements GridCachePreloader<K, V>
// No-op.
}
+ /** {@inheritDoc} */
@Override public void updateLastExchangeFuture(GridDhtPartitionsExchangeFuture lastFut) {
// No-op.
}
/** {@inheritDoc} */
- @Override public GridDhtPreloaderAssignments<K, V> assign(GridDhtPartitionsExchangeFuture exchFut) {
+ @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) {
return null;
}
/** {@inheritDoc} */
- @Override public void addAssignments(GridDhtPreloaderAssignments<K, V> assignments, boolean forcePreload) {
+ @Override public void addAssignments(GridDhtPreloaderAssignments assignments, boolean forcePreload) {
// No-op.
}
}