You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/05 14:47:43 UTC
[3/4] ignite git commit: ignite-5075
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 650f65e..8ddea9c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.internal.NodeStoppingException;
@@ -81,7 +82,16 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
*
*/
@SuppressWarnings("PublicInnerClass")
-public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter implements IgniteCacheOffheapManager {
+public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
+ /** */
+ private GridCacheSharedContext ctx;
+
+ /** */
+ private CacheGroupInfrastructure grp;
+
+ /** */
+ private IgniteLogger log;
+
/** */
// TODO GG-11208 need restore size after restart.
private CacheDataStore locCacheDataStore;
@@ -116,25 +126,24 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override protected void start0() throws IgniteCheckedException {
- super.start0();
+ @Override public void start(GridCacheSharedContext ctx, CacheGroupInfrastructure grp) throws IgniteCheckedException {
+ this.ctx = ctx;
+ this.grp = grp;
+ this.log = ctx.logger(getClass());
- updateValSizeThreshold = cctx.shared().database().pageSize() / 2;
+ updateValSizeThreshold = ctx.database().pageSize() / 2;
- if (cctx.affinityNode()) {
- cctx.shared().database().checkpointReadLock();
+ if (grp.affinityNode()) {
+ ctx.database().checkpointReadLock();
try {
initDataStructures();
- if (cctx.isLocal()) {
- assert cctx.cache() instanceof GridLocalCache : cctx.cache();
-
+ if (grp.isLocal())
locCacheDataStore = createCacheDataStore(0);
- }
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
}
@@ -143,32 +152,29 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
protected void initDataStructures() throws IgniteCheckedException {
- if (cctx.shared().ttl().eagerTtlEnabled()) {
+ if (ctx.ttl().eagerTtlEnabled()) {
String name = "PendingEntries";
long rootPage = allocateForTree();
- pendingEntries = new PendingEntriesTree(cctx,
+ pendingEntries = new PendingEntriesTree(
+ grp,
name,
- cctx.memoryPolicy().pageMemory(),
+ grp.memoryPolicy().pageMemory(),
rootPage,
- cctx.reuseList(),
+ grp.reuseList(),
true);
}
}
/** {@inheritDoc} */
- @Override protected void stop0(final boolean cancel, final boolean destroy) {
- super.stop0(cancel, destroy);
-
- if (destroy && cctx.affinityNode())
+ @Override public void stop(final boolean destroy) {
+ if (destroy && grp.affinityNode())
destroyCacheDataStructures(destroy);
}
/** {@inheritDoc} */
- @Override protected void onKernalStop0(boolean cancel) {
- super.onKernalStop0(cancel);
-
+ @Override public void onKernalStop() {
busyLock.block();
}
@@ -176,7 +182,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
*
*/
protected void destroyCacheDataStructures(boolean destroy) {
- assert cctx.affinityNode();
+ assert grp.affinityNode();
try {
if (locCacheDataStore != null)
@@ -198,7 +204,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return Data store for given entry.
*/
public CacheDataStore dataStore(GridDhtLocalPartition part) {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore;
else {
assert part != null;
@@ -209,7 +215,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public long entriesCount() {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore.size();
long size = 0;
@@ -225,10 +231,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return Partition data.
*/
@Nullable private CacheDataStore partitionData(int p) {
- if (cctx.isLocal())
+ if (grp.isLocal())
return locCacheDataStore;
else {
- GridDhtLocalPartition part = cctx.topology().localPartition(p, AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition part = grp.topology().localPartition(p, AffinityTopologyVersion.NONE, false);
return part != null ? part.dataStore() : null;
}
@@ -240,16 +246,19 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
boolean backup,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
- if (cctx.isLocal())
+ if (grp.isLocal())
return entriesCount(0);
else {
- ClusterNode locNode = cctx.localNode();
+ ClusterNode locNode = ctx.localNode();
long cnt = 0;
- for (GridDhtLocalPartition locPart : cctx.topology().currentLocalPartitions()) {
+ Set<Integer> primaryParts = grp.affinity().cachedAffinity(topVer).primaryPartitions(locNode.id());
+ Set<Integer> backupParts = grp.affinity().cachedAffinity(topVer).backupPartitions(locNode.id());
+
+ for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
if (primary) {
- if (cctx.affinity().primaryByPartition(locNode, locPart.id(), topVer)) {
+ if (primaryParts.contains(locPart.id())) {
cnt += locPart.dataStore().size();
continue;
@@ -257,7 +266,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
if (backup) {
- if (cctx.affinity().backupByPartition(locNode, locPart.id(), topVer))
+ if (backupParts.contains(locPart.id()))
cnt += locPart.dataStore().size();
}
}
@@ -268,13 +277,13 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public long entriesCount(int part) {
- if (cctx.isLocal()) {
+ if (grp.isLocal()) {
assert part == 0;
return locCacheDataStore.size();
}
else {
- GridDhtLocalPartition locPart = cctx.topology().localPartition(part, AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false);
return locPart == null ? 0 : locPart.dataStore().size();
}
@@ -289,10 +298,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, AffinityTopologyVersion topVer) {
assert primary || backup;
- if (cctx.isLocal())
+ if (grp.isLocal())
return Collections.singleton(locCacheDataStore).iterator();
else {
- final Iterator<GridDhtLocalPartition> it = cctx.topology().currentLocalPartitions().iterator();
+ final Iterator<GridDhtLocalPartition> it = grp.topology().currentLocalPartitions().iterator();
if (primary && backup) {
return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() {
@@ -302,8 +311,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}, true);
}
- final Set<Integer> parts = primary ? cctx.affinity().primaryPartitions(cctx.localNodeId(), topVer) :
- cctx.affinity().backupPartitions(cctx.localNodeId(), topVer);
+ final Set<Integer> parts = primary ? grp.affinity().primaryPartitions(ctx.localNodeId(), topVer) :
+ grp.affinity().backupPartitions(ctx.localNodeId(), topVer);
return F.iterator(it, new IgniteClosure<GridDhtLocalPartition, CacheDataStore>() {
@Override public CacheDataStore apply(GridDhtLocalPartition part) {
@@ -319,15 +328,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void invoke(KeyCacheObject key,
+ @Override public void invoke(
+ GridCacheContext cctx,
+ KeyCacheObject key,
GridDhtLocalPartition part,
OffheapInvokeClosure c)
throws IgniteCheckedException {
- dataStore(part).invoke(key, c);
+ dataStore(part).invoke(cctx, key, c);
}
/** {@inheritDoc} */
@Override public void update(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -338,16 +350,17 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
) throws IgniteCheckedException {
assert expireTime >= 0;
- dataStore(part).update(key, partId, val, ver, expireTime, oldRow);
+ dataStore(part).update(cctx, key, partId, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
@Override public void remove(
+ GridCacheContext cctx,
KeyCacheObject key,
int partId,
GridDhtLocalPartition part
) throws IgniteCheckedException {
- dataStore(part).remove(key, partId);
+ dataStore(part).remove(cctx, key, partId);
}
/** {@inheritDoc} */
@@ -356,9 +369,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throws IgniteCheckedException {
KeyCacheObject key = entry.key();
- assert cctx.isLocal() || entry.localPartition() != null : entry;
+ assert grp.isLocal() || entry.localPartition() != null : entry;
- return dataStore(entry.localPartition()).find(key);
+ return dataStore(entry.localPartition()).find(entry.context(), key);
}
/** {@inheritDoc} */
@@ -394,7 +407,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param readers {@code True} to clear readers.
*/
@SuppressWarnings("unchecked")
- @Override public void clear(boolean readers) {
+ @Override public void clear(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
GridIterator<CacheDataRow> it = rowsIterator(true, true, null);
@@ -404,7 +417,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
try {
if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
+ obsoleteVer = ctx.versions().next();
GridCacheEntryEx entry = cctx.cache().entryEx(key);
@@ -439,7 +452,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("unchecked")
- @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(final boolean primary,
+ @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> entriesIterator(
+ final GridCacheContext cctx,
+ final boolean primary,
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary) throws IgniteCheckedException {
@@ -622,12 +637,12 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
private long allocateForTree() throws IgniteCheckedException {
- ReuseList reuseList = cctx.reuseList();
+ ReuseList reuseList = grp.reuseList();
long pageId;
if (reuseList == null || (pageId = reuseList.takeRecycledPage()) == 0L)
- pageId = cctx.memoryPolicy().pageMemory().allocatePage(cctx.cacheId(), INDEX_PARTITION, FLAG_IDX);
+ pageId = grp.memoryPolicy().pageMemory().allocatePage(grp.groupId(), INDEX_PARTITION, FLAG_IDX);
return pageId;
}
@@ -636,7 +651,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
@Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException {
long pageId = allocateForTree();
- return new RootPage(new FullPageId(pageId, cctx.cacheId()), true);
+ return new RootPage(new FullPageId(pageId, grp.groupId()), true);
}
/** {@inheritDoc} */
@@ -646,7 +661,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public ReuseList reuseListForIndex(String idxName) {
- return cctx.reuseList();
+ return grp.reuseList();
}
/** {@inheritDoc} */
@@ -721,14 +736,15 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throws IgniteCheckedException {
final long rootPage = allocateForTree();
- CacheDataRowStore rowStore = new CacheDataRowStore(cctx, cctx.freeList(), p);
+ CacheDataRowStore rowStore = new CacheDataRowStore(grp, grp.freeList(), p);
String idxName = treeName(p);
- CacheDataTree dataTree = new CacheDataTree(idxName,
- cctx.reuseList(),
+ CacheDataTree dataTree = new CacheDataTree(
+ grp,
+ idxName,
+ grp.reuseList(),
rowStore,
- cctx,
rootPage,
true);
@@ -737,7 +753,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public Iterable<CacheDataStore> cacheDataStores() {
- if (cctx.isLocal())
+ if (grp.isLocal())
return Collections.singleton(locCacheDataStore);
return new Iterable<CacheDataStore>() {
@@ -786,18 +802,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (amount != -1 && cleared > amount)
return true;
-
- if (row.key.partition() == -1)
- row.key.partition(cctx.affinity().partition(row.key));
-
- assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
-
- if (pendingEntries.remove(row) != null) {
- if (obsoleteVer == null)
- obsoleteVer = cctx.versions().next();
-
- c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
- }
+// TODO: IGNITE-5075.
+// if (row.key.partition() == -1)
+// row.key.partition(cctx.affinity().partition(row.key));
+//
+// assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+//
+// if (pendingEntries.remove(row) != null) {
+// if (obsoleteVer == null)
+// obsoleteVer = ctx.versions().next();
+//
+// c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
+// }
cleared++;
}
@@ -895,7 +911,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @return {@code True} if it is possible to update old row data.
* @throws IgniteCheckedException If failed.
*/
- private boolean canUpdateOldRow(@Nullable CacheDataRow oldRow, DataRow dataRow)
+ private boolean canUpdateOldRow(GridCacheContext cctx, @Nullable CacheDataRow oldRow, DataRow dataRow)
throws IgniteCheckedException {
if (oldRow == null || cctx.queries().enabled())
return false;
@@ -916,7 +932,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c)
+ @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
@@ -930,7 +946,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheDataRow oldRow = c.oldRow();
- finishUpdate(c.newRow(), oldRow);
+ finishUpdate(cctx, c.newRow(), oldRow);
break;
}
@@ -938,7 +954,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
case REMOVE: {
CacheDataRow oldRow = c.oldRow();
- finishRemove(key, oldRow);
+ finishRemove(cctx, key, oldRow);
break;
}
@@ -956,18 +972,20 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public CacheDataRow createRow(KeyCacheObject key,
+ @Override public CacheDataRow createRow(
+ GridCacheContext cctx,
+ KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException
{
- int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
+ int cacheId = grp.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
0 : cctx.cacheId();
DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId);
- if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
+ if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow))
dataRow.link(oldRow.link());
else {
CacheObjectContext coCtx = cctx.cacheObjectContext();
@@ -984,7 +1002,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/** {@inheritDoc} */
- @Override public void update(KeyCacheObject key,
+ @Override public void update(
+ GridCacheContext cctx,
+ KeyCacheObject key,
int p,
CacheObject val,
GridCacheVersion ver,
@@ -996,7 +1016,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- int cacheId = cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ?
+ int cacheId = grp.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED ?
cctx.cacheId() : 0;
DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId);
@@ -1009,7 +1029,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
CacheDataRow old;
- if (canUpdateOldRow(oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
+ if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) {
old = oldRow;
dataRow.link(oldRow.link());
@@ -1028,7 +1048,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
old = dataTree.put(dataRow);
}
- finishUpdate(dataRow, old);
+ finishUpdate(cctx, dataRow, old);
}
finally {
busyLock.leaveBusy();
@@ -1040,7 +1060,8 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param oldRow Old row if available.
* @throws IgniteCheckedException If failed.
*/
- private void finishUpdate(CacheDataRow newRow, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow)
+ throws IgniteCheckedException {
if (oldRow == null)
storageSize.incrementAndGet();
@@ -1085,18 +1106,18 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
hasPendingEntries = true;
}
- updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), newRow.value());
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), newRow.value());
}
/** {@inheritDoc} */
- @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException {
+ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException {
if (!busyLock.enterBusy())
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
- finishRemove(key, oldRow);
+ finishRemove(cctx, key, oldRow);
}
finally {
busyLock.leaveBusy();
@@ -1108,7 +1129,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param oldRow Removed row.
* @throws IgniteCheckedException If failed.
*/
- private void finishRemove(KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+ private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
CacheObject val = null;
GridCacheVersion ver = null;
@@ -1133,11 +1154,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (oldRow != null)
rowStore.removeRow(oldRow.link());
- updateIgfsMetrics(key, (oldRow != null ? oldRow.value() : null), null);
+ updateIgfsMetrics(cctx, key, (oldRow != null ? oldRow.value() : null), null);
}
/** {@inheritDoc} */
- @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException {
+ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY);
@@ -1145,7 +1166,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (row != null) {
row.key(key);
- cctx.memoryPolicy().evictionTracker().touchPage(row.link());
+ grp.memoryPolicy().evictionTracker().touchPage(row.link());
}
return row;
@@ -1235,6 +1256,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param newVal New value.
*/
private void updateIgfsMetrics(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject oldVal,
CacheObject newVal
@@ -1242,11 +1264,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
// In case we deal with IGFS cache, count updated data
if (cctx.cache().isIgfsDataCache() &&
!cctx.isNear() &&
- cctx.kernalContext()
+ ctx.kernalContext()
.igfsHelper()
.isIgfsBlockKey(key.value(cctx.cacheObjectContext(), false))) {
- int oldSize = valueLength(oldVal);
- int newSize = valueLength(newVal);
+ int oldSize = valueLength(cctx, oldVal);
+ int newSize = valueLength(cctx, newVal);
int delta = newSize - oldSize;
@@ -1261,7 +1283,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @param val Value.
* @return Length of value.
*/
- private int valueLength(@Nullable CacheObject val) {
+ private int valueLength(GridCacheContext cctx, @Nullable CacheObject val) {
if (val == null)
return 0;
@@ -1333,7 +1355,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
try {
// We can not init data row lazily because underlying buffer can be concurrently cleared.
- initFromLink(cctx, rowData);
+ initFromLink(grp, rowData);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1386,30 +1408,29 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private final CacheDataRowStore rowStore;
/** */
- private final GridCacheContext cctx;
+ private final CacheGroupInfrastructure grp;
/**
* @param name Tree name.
* @param reuseList Reuse list.
* @param rowStore Row store.
- * @param cctx Context.
* @param metaPageId Meta page ID.
* @param initNew Initialize new index.
* @throws IgniteCheckedException If failed.
*/
public CacheDataTree(
+ CacheGroupInfrastructure grp,
String name,
ReuseList reuseList,
CacheDataRowStore rowStore,
- GridCacheContext cctx,
long metaPageId,
boolean initNew
) throws IgniteCheckedException {
super(name,
- cctx.cacheId(),
- cctx.memoryPolicy().pageMemory(),
- cctx.shared().wal(),
- cctx.offheap().globalRemoveId(),
+ grp.groupId(),
+ grp.memoryPolicy().pageMemory(),
+ grp.shared().wal(),
+ grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
DataInnerIO.VERSIONS,
@@ -1418,7 +1439,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
assert rowStore != null;
this.rowStore = rowStore;
- this.cctx = cctx;
+ this.grp = grp;
initTree(initNew);
}
@@ -1460,7 +1481,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException {
- byte[] bytes = key.valueBytes(cctx.cacheObjectContext());
+ byte[] bytes = key.valueBytes(grp.cacheObjectContext());
final long pageId = pageId(link);
final long page = acquirePage(pageId);
@@ -1479,7 +1500,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
if (data.nextLink() == 0) {
long addr = pageAddr + data.offset();
- if (cctx.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED)
+ if (grp.memoryPolicy().config().getPageEvictionMode() != DataPageEvictionMode.DISABLED)
addr += 4; // Skip cache id.
final int len = PageUtils.getInt(addr, 0);
@@ -1526,10 +1547,10 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
// TODO GG-11768.
CacheDataRowAdapter other = new CacheDataRowAdapter(link);
- other.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY);
+ other.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
- byte[] bytes1 = other.key().valueBytes(cctx.cacheObjectContext());
- byte[] bytes2 = key.valueBytes(cctx.cacheObjectContext());
+ byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext());
+ byte[] bytes2 = key.valueBytes(grp.cacheObjectContext());
int lenCmp = Integer.compare(bytes1.length, bytes2.length);
@@ -1571,11 +1592,11 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
private final int partId;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param freeList Free list.
*/
- public CacheDataRowStore(GridCacheContext<?, ?> cctx, FreeList freeList, int partId) {
- super(cctx, freeList);
+ public CacheDataRowStore(CacheGroupInfrastructure grp, FreeList freeList, int partId) {
+ super(grp, freeList);
this.partId = partId;
}
@@ -1774,19 +1795,19 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
}
/**
- * @param cctx Context.
+ * @param grp Cache group.
* @param expireTime Expire time.
* @param link Link.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
- static PendingRow createRowWithKey(GridCacheContext cctx, long expireTime, long link)
+ static PendingRow createRowWithKey(CacheGroupInfrastructure grp, long expireTime, long link)
throws IgniteCheckedException {
PendingRow row = new PendingRow(expireTime, link);
CacheDataRowAdapter rowData = new CacheDataRowAdapter(link);
- rowData.initFromLink(cctx, CacheDataRowAdapter.RowData.KEY_ONLY);
+ rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
row.key = rowData.key();
@@ -1804,10 +1825,9 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
*/
protected static class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> {
/** */
- private final GridCacheContext cctx;
+ private final CacheGroupInfrastructure grp;
/**
- * @param cctx Cache context.
* @param name Tree name.
* @param pageMem Page memory.
* @param metaPageId Meta page ID.
@@ -1816,7 +1836,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
* @throws IgniteCheckedException If failed.
*/
public PendingEntriesTree(
- GridCacheContext cctx,
+ CacheGroupInfrastructure grp,
String name,
PageMemory pageMem,
long metaPageId,
@@ -1824,16 +1844,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
boolean initNew)
throws IgniteCheckedException {
super(name,
- cctx.cacheId(),
+ grp.groupId(),
pageMem,
- cctx.shared().wal(),
- cctx.offheap().globalRemoveId(),
+ grp.shared().wal(),
+ grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
PendingEntryInnerIO.VERSIONS,
PendingEntryLeafIO.VERSIONS);
- this.cctx = cctx;
+ this.grp = grp;
initTree(initNew);
}
@@ -1925,7 +1945,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
@@ -1984,7 +2004,7 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
- return PendingRow.createRowWithKey(((PendingEntriesTree)tree).cctx,
+ return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index afeada5..0ce0a0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -23,6 +23,7 @@ import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -94,25 +95,25 @@ public class CacheDataRowAdapter implements CacheDataRow {
/**
* Read row from data pages.
*
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param rowData Required row data.
* @throws IgniteCheckedException If failed.
*/
- public final void initFromLink(GridCacheContext<?, ?> cctx, RowData rowData) throws IgniteCheckedException {
- initFromLink(cctx, cctx.shared(), cctx.memoryPolicy().pageMemory(), rowData);
+ public final void initFromLink(CacheGroupInfrastructure grp, RowData rowData) throws IgniteCheckedException {
+ initFromLink(grp, grp.shared(), grp.memoryPolicy().pageMemory(), rowData);
}
/**
* Read row from data pages.
* Can be called with cctx == null, if cache instance is unknown, but its ID is stored in the data row.
*
- * @param cctx Cctx.
+ * @param grp Cache group.
* @param sharedCtx Shared context.
* @param pageMem Page memory.
* @param rowData Row data.
*/
public final void initFromLink(
- @Nullable GridCacheContext<?, ?> cctx,
+ @Nullable CacheGroupInfrastructure grp,
GridCacheSharedContext<?, ?> sharedCtx,
PageMemory pageMem,
RowData rowData)
@@ -122,11 +123,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
CacheObjectContext coctx = null;
- if (cctx != null) {
- cacheId = cctx.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
- cctx.cacheId() : 0; // Force cacheId reading for evictable memory policies.
+ if (grp != null) {
+ cacheId = grp.memoryPolicy().config().getPageEvictionMode() == DataPageEvictionMode.DISABLED ?
+ -1 : 0; // Force cacheId reading for evictable memory policies.
- coctx = cctx.cacheObjectContext();
+ coctx = grp.cacheObjectContext();
}
long nextLink = link;
@@ -401,7 +402,6 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param buf Buffer.
* @param incomplete Incomplete object.
* @return Incomplete object.
- * @throws IgniteCheckedException If failed.
*/
private IncompleteObject<?> readIncompleteExpireTime(
ByteBuffer buf,
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
index 1c4c89e..233fa55 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/RowStore.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.processors.cache.database;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
/**
@@ -34,24 +34,20 @@ public class RowStore {
protected final PageMemory pageMem;
/** */
- protected final GridCacheContext<?,?> cctx;
-
- /** */
protected final CacheObjectContext coctx;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param freeList Free list.
*/
- public RowStore(GridCacheContext<?,?> cctx, FreeList freeList) {
- assert cctx != null;
+ public RowStore(CacheGroupInfrastructure grp, FreeList freeList) {
+ assert grp != null;
assert freeList != null;
- this.cctx = cctx;
this.freeList = freeList;
- coctx = cctx.cacheObjectContext();
- pageMem = cctx.memoryPolicy().pageMemory();
+ coctx = grp.cacheObjectContext();
+ pageMem = grp.memoryPolicy().pageMemory();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index 357bf89..1fa909b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -25,6 +25,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -39,14 +40,13 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap {
/** Context. */
- private final GridCacheContext ctx;
+ private final CacheGroupInfrastructure grp;
/**
- * Constructor.
- * @param ctx Context.
+ * @param grp Cache group.
*/
- public GridCachePartitionedConcurrentMap(GridCacheContext ctx) {
- this.ctx = ctx;
+ GridCachePartitionedConcurrentMap(CacheGroupInfrastructure grp) {
+ this.grp = grp;
}
/**
@@ -56,6 +56,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
* @return Local partition.
*/
@Nullable private GridDhtLocalPartition localPartition(
+ GridCacheContext cctx,
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean create
@@ -63,31 +64,31 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
int p = key.partition();
if (p == -1)
- p = ctx.affinity().partition(key);
+ p = cctx.affinity().partition(key);
- return ctx.topology().localPartition(p, topVer, create);
+ return grp.topology().localPartition(p, topVer, create);
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheMapEntry getEntry(KeyCacheObject key) {
- GridDhtLocalPartition part = localPartition(key, AffinityTopologyVersion.NONE, false);
+ @Nullable @Override public GridCacheMapEntry getEntry(GridCacheContext ctx, KeyCacheObject key) {
+ GridDhtLocalPartition part = localPartition(ctx, key, AffinityTopologyVersion.NONE, false);
if (part == null)
return null;
- return part.getEntry(key);
+ return part.getEntry(ctx, key);
}
/** {@inheritDoc} */
- @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
+ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(GridCacheContext ctx, AffinityTopologyVersion topVer, KeyCacheObject key,
@Nullable CacheObject val, boolean create, boolean touch) {
while (true) {
- GridDhtLocalPartition part = localPartition(key, topVer, create);
+ GridDhtLocalPartition part = localPartition(ctx, key, topVer, create);
if (part == null)
return null;
- GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch);
+ GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(ctx, topVer, key, val, create, touch);
if (res != null || !create)
return res;
@@ -100,7 +101,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int size() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += part.size();
return size;
@@ -110,7 +111,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int publicSize() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += part.publicSize();
return size;
@@ -118,17 +119,17 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
/** {@inheritDoc} */
@Override public void incrementPublicSize(GridCacheEntryEx e) {
- localPartition(e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
+ localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).incrementPublicSize(e);
}
/** {@inheritDoc} */
@Override public void decrementPublicSize(GridCacheEntryEx e) {
- localPartition(e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
+ localPartition(e.context(), e.key(), AffinityTopologyVersion.NONE, true).decrementPublicSize(e);
}
/** {@inheritDoc} */
@Override public boolean removeEntry(GridCacheEntryEx entry) {
- GridDhtLocalPartition part = localPartition(entry.key(), AffinityTopologyVersion.NONE, false);
+ GridDhtLocalPartition part = localPartition(entry.context(), entry.key(), AffinityTopologyVersion.NONE, false);
if (part == null)
return false;
@@ -185,7 +186,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
*/
private abstract class PartitionedIterator<T> implements Iterator<T> {
/** Partitions iterator. */
- private Iterator<GridDhtLocalPartition> partsIter = ctx.topology().currentLocalPartitions().iterator();
+ private Iterator<GridDhtLocalPartition> partsIter = grp.topology().currentLocalPartitions().iterator();
/** Current partition iterator. */
private Iterator<T> currIter = partsIter.hasNext() ? iterator(partsIter.next()) :
@@ -249,7 +250,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
@Override public int size() {
int size = 0;
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions())
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
size += set(part).size();
return size;
@@ -257,7 +258,7 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap
/** {@inheritDoc} */
@Override public boolean contains(Object o) {
- for (GridDhtLocalPartition part : ctx.topology().currentLocalPartitions()) {
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
if (set(part).contains(o))
return true;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index f3c3a1b..ef7221f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -71,7 +71,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
private GridCacheSharedContext cctx;
/** Cache ID. */
- private int cacheId;
+ private int grpId;
/** Logger. */
private final IgniteLogger log;
@@ -111,18 +111,18 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/**
* @param cctx Context.
- * @param cacheId Cache ID.
+ * @param grpId Group ID.
* @param exchFut Exchange ID.
* @param similarAffKey Key to find caches with similar affinity.
*/
public GridClientPartitionTopology(
GridCacheSharedContext cctx,
- int cacheId,
+ int grpId,
GridDhtPartitionsExchangeFuture exchFut,
Object similarAffKey
) {
this.cctx = cctx;
- this.cacheId = cacheId;
+ this.grpId = grpId;
this.similarAffKey = similarAffKey;
topVer = exchFut.topologyVersion();
@@ -166,8 +166,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public int cacheId() {
- return cacheId;
+ @Override public int groupId() {
+ return grpId;
}
/** {@inheritDoc} */
@@ -281,7 +281,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node.
- if (oldest.id().equals(loc.id()) || exchFut.isCacheAdded(cacheId, exchId.topologyVersion())) {
+ if (oldest.id().equals(loc.id())) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
@@ -353,8 +353,8 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
}
/** {@inheritDoc} */
- @Override public GridDhtLocalPartition localPartition(Object key, boolean create) {
- return localPartition(1, AffinityTopologyVersion.NONE, create);
+ @Override public GridDhtLocalPartition localPartition(int p) {
+ return localPartition(p, AffinityTopologyVersion.NONE, false);
}
/** {@inheritDoc} */
@@ -997,7 +997,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
- ", cacheId=" + cacheId + ']');
+ ", grpId=" + grpId + ']');
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 2ee6f83..36b501b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -96,9 +96,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** */
private static final long serialVersionUID = 0L;
- /** Preloader. */
- protected GridCachePreloader preldr;
-
/** Multi tx future holder. */
private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>();
@@ -157,7 +154,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
* @param ctx Context.
*/
protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) {
- this(ctx, new GridCachePartitionedConcurrentMap(ctx));
+ this(ctx, new GridCachePartitionedConcurrentMap(ctx.group()));
}
/**
@@ -182,43 +179,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
}
/** {@inheritDoc} */
- @Override public void stop() {
- super.stop();
-
- if (preldr != null)
- preldr.stop();
-
- // Clean up to help GC.
- preldr = null;
- }
-
- /** {@inheritDoc} */
@Override public void onReconnected() {
super.onReconnected();
ctx.affinity().onReconnected();
// TODO IGNITE-5075.
- //top.onReconnected();
-
- if (preldr != null)
- preldr.onReconnected();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- super.onKernalStart();
-
- if (preldr != null)
- preldr.onKernalStart();
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop() {
- super.onKernalStop();
-
- if (preldr != null)
- preldr.onKernalStop();
+// top.onReconnected();
+//
+// if (preldr != null)
+// preldr.onReconnected();
}
/** {@inheritDoc} */
@@ -259,16 +229,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
/** {@inheritDoc} */
@Override public GridCachePreloader preloader() {
- return preldr;
- }
-
- /**
- * @return DHT preloader.
- */
- public GridDhtPreloader dhtPreloader() {
- assert preldr instanceof GridDhtPreloader;
-
- return (GridDhtPreloader)preldr;
+ return ctx.group().preloader();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 7bc17a1..8031c8f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -166,7 +166,7 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
* Initializes future.
*/
void init() {
- GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
+ GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer);
if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
@@ -292,9 +292,11 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col
*/
private boolean map(KeyCacheObject key) {
try {
+ int keyPart = cctx.affinity().partition(key);
+
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
- cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
- cache().topology().localPartition(key, false);
+ cache().topology().localPartition(keyPart, topVer, true) :
+ cache().topology().localPartition(keyPart);
if (part == null)
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
index 9cc69b5..8860b5e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -197,8 +197,9 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
*
*/
private void map() {
- if (cctx.dht().dhtPreloader().needForceKeys()) {
- GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+ if (cctx.group().preloader().needForceKeys()) {
+ GridDhtFuture<Object> fut = cctx.group().preloader().request(
+ cctx,
Collections.singleton(key),
topVer);
@@ -268,9 +269,11 @@ public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCa
* @return {@code True} if mapped.
*/
private boolean map(KeyCacheObject key) {
+ int keyPart = cctx.affinity().partition(key);
+
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
- cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
- cache().topology().localPartition(key, false);
+ cache().topology().localPartition(keyPart, topVer, true) :
+ cache().topology().localPartition(keyPart);
if (part == null)
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 5425954..62fe24f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -30,16 +31,19 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
@@ -101,7 +105,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
private final GridFutureAdapter<?> rent;
/** Context. */
- private final GridCacheContext cctx;
+ private final GridCacheSharedContext ctx;
+
+ /** */
+ private final CacheGroupInfrastructure grp;
/** Create time. */
@GridToStringExclude
@@ -133,18 +140,22 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
private volatile boolean shouldBeRenting;
/**
- * @param cctx Context.
+ * @param ctx Context.
* @param id Partition ID.
* @param entryFactory Entry factory.
*/
@SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor")
- GridDhtLocalPartition(GridCacheContext cctx, int id, GridCacheMapEntryFactory entryFactory) {
- super(cctx, entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / cctx.affinity().partitions()));
+ GridDhtLocalPartition(GridCacheSharedContext ctx,
+ CacheGroupInfrastructure grp,
+ int id,
+ GridCacheMapEntryFactory entryFactory) {
+ super(entryFactory, Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()));
this.id = id;
- this.cctx = cctx;
+ this.ctx = ctx;
+ this.grp = grp;
- log = U.logger(cctx.kernalContext(), logRef, this);
+ log = U.logger(ctx.kernalContext(), logRef, this);
rent = new GridFutureAdapter<Object>() {
@Override public String toString() {
@@ -152,15 +163,16 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
};
- int delQueueSize = CU.isSystemCache(cctx.name()) ? 100 :
- Math.max(MAX_DELETE_QUEUE_SIZE / cctx.affinity().partitions(), 20);
+ // TODO IGNITE-5075.
+ int delQueueSize = CU.isSystemCache(grp.name()) ? 100 :
+ Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
rmvQueueMaxSize = U.ceilPow2(delQueueSize);
rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
try {
- store = cctx.offheap().createCacheDataStore(id);
+ store = grp.offheap().createCacheDataStore(id);
}
catch (IgniteCheckedException e) {
// TODO ignite-db
@@ -235,7 +247,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if partition is empty.
*/
public boolean isEmpty() {
- if (cctx.allowFastEviction())
+ if (grp.allowFastEviction())
return size() == 0;
return store.size() == 0 && size() == 0;
@@ -307,6 +319,17 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
}
/**
+ * @param key Key.
+ * @param ver Version.
+ */
+ private void removeVersionedEntry(KeyCacheObject key, GridCacheVersion ver) {
+ GridCacheMapEntry entry = getEntry(null, key);
+
+ if (entry != null && entry.markObsoleteVersion(ver))
+ removeEntry(entry);
+ }
+
+ /**
*
*/
public void cleanupRemoveQueue() {
@@ -314,10 +337,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
RemovedEntryHolder item = rmvQueue.pollFirst();
if (item != null)
- cctx.dht().removeVersionedEntry(item.key(), item.version());
+ removeVersionedEntry(item.key(), item.version());
}
- if (!cctx.isDrEnabled()) {
+ if (!grp.isDrEnabled()) {
RemovedEntryHolder item = rmvQueue.peekFirst();
while (item != null && item.expireTime() < U.currentTimeMillis()) {
@@ -326,7 +349,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (item == null)
break;
- cctx.dht().removeVersionedEntry(item.key(), item.version());
+ removeVersionedEntry(item.key(), item.version());
item = rmvQueue.peekFirst();
}
@@ -486,13 +509,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code true} if cas succeeds.
*/
private boolean casState(long state, GridDhtPartitionState toState) {
- if (cctx.shared().database().persistenceEnabled()) {
+ if (ctx.database().persistenceEnabled()) {
synchronized (this) {
boolean update = this.state.compareAndSet(state, setPartState(state, toState));
if (update)
try {
- cctx.shared().wal().log(new PartitionMetaStateRecord(cctx.cacheId(), id, toState, updateCounter()));
+ // TODO IGNITE-5075.
+ ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, updateCounter()));
}
catch (IgniteCheckedException e) {
log.error("Error while writing to log", e);
@@ -610,13 +634,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @param updateSeq Update sequence.
*/
void tryEvictAsync(boolean updateSeq) {
- assert cctx.kernalContext().state().active();
+ assert ctx.kernalContext().state().active();
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
- if (isEmpty() && !QueryUtils.isEnabled(cctx.config()) && getSize(state) == 0 &&
+ if (isEmpty() && !grp.queriesEnabled() && getSize(state) == 0 &&
partState == RENTING && getReservations(state) == 0 && !groupReserved() &&
casState(state, EVICTED)) {
if (log.isDebugEnabled())
@@ -626,7 +650,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
finishDestroy(updateSeq);
}
else if (partState == RENTING || shouldBeRenting())
- cctx.preloader().evictPartitionAsync(this);
+ grp.preloader().evictPartitionAsync(this);
}
/**
@@ -702,18 +726,19 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
assert state() == EVICTED : this;
assert evictGuard.get() == -1;
- if (cctx.isDrEnabled())
- cctx.dr().partitionEvicted(id);
-
- cctx.continuousQueries().onPartitionEvicted(id);
-
- cctx.dataStructures().onPartitionEvicted(id);
+// TODO IGNITE-5075.
+// if (cctx.isDrEnabled())
+// cctx.dr().partitionEvicted(id);
+//
+// cctx.continuousQueries().onPartitionEvicted(id);
+//
+// cctx.dataStructures().onPartitionEvicted(id);
destroyCacheDataStore();
rent.onDone();
- ((GridDhtPreloader)cctx.preloader()).onPartitionEvicted(this, updateSeq);
+ ((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeq);
clearDeferredDeletes();
}
@@ -753,7 +778,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
try {
CacheDataStore store = dataStore();
- cctx.offheap().destroyCacheDataStore(id, store);
+ grp.offheap().destroyCacheDataStore(id, store);
}
catch (IgniteCheckedException e) {
log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
@@ -777,7 +802,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if local node is primary for this partition.
*/
public boolean primary(AffinityTopologyVersion topVer) {
- return cctx.affinity().primaryByPartition(cctx.localNode(), id, topVer);
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+ return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
}
/**
@@ -785,7 +812,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @return {@code True} if local node is backup for this partition.
*/
public boolean backup(AffinityTopologyVersion topVer) {
- return cctx.affinity().backupByPartition(cctx.localNode(), id, topVer);
+ List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
+
+ return nodes.indexOf(ctx.localNode()) > 0;
}
/**
@@ -829,9 +858,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
* @throws NodeStoppingException If node stopping.
*/
public void clearAll() throws NodeStoppingException {
- GridCacheVersion clearVer = cctx.versions().next();
+ GridCacheVersion clearVer = ctx.versions().next();
- boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
+ // TODO IGNITE-5075.
+ boolean rec = grp.shared().gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
Iterator<GridCacheMapEntry> it = allEntries().iterator();
@@ -840,7 +870,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
while (it.hasNext()) {
GridCacheMapEntry cached = null;
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
cached = it.next();
@@ -850,20 +880,21 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
if (!cached.isInternal()) {
if (rec) {
- cctx.events().addEvent(cached.partition(),
- cached.key(),
- cctx.localNodeId(),
- (IgniteUuid)null,
- null,
- EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
- null,
- false,
- cached.rawGet(),
- cached.hasValue(),
- null,
- null,
- null,
- false);
+ // TODO IGNITE-5075.
+// cctx.events().addEvent(cached.partition(),
+// cached.key(),
+// ctx.localNodeId(),
+// (IgniteUuid)null,
+// null,
+// EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+// null,
+// false,
+// cached.rawGet(),
+// cached.hasValue(),
+// null,
+// null,
+// null,
+// false);
}
}
}
@@ -885,28 +916,34 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
- if (!cctx.allowFastEviction()) {
+ if (!grp.allowFastEviction()) {
+ GridCacheContext cctx = grp.cacheContext();
+
try {
- GridIterator<CacheDataRow> it0 = cctx.offheap().iterator(id);
+ GridIterator<CacheDataRow> it0 = grp.offheap().iterator(id);
while (it0.hasNext()) {
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
CacheDataRow row = it0.next();
- GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(),
- row.key(), null, true, false);
+ GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx,
+ grp.affinity().lastVersion(),
+ row.key(),
+ null,
+ true,
+ false);
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
if (rec) {
cctx.events().addEvent(cached.partition(),
cached.key(),
- cctx.localNodeId(),
+ ctx.localNodeId(),
(IgniteUuid)null,
null,
EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
@@ -927,7 +964,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
break; // Partition is already concurrently cleared and evicted.
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
}
@@ -950,7 +987,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements
*/
private void clearDeferredDeletes() {
for (RemovedEntryHolder e : rmvQueue)
- cctx.dht().removeVersionedEntry(e.key(), e.version());
+ removeVersionedEntry(e.key(), e.version());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/0096266b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index cf12986..9617a0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -140,7 +140,7 @@ public interface GridDhtPartitionTopology {
* @throws GridDhtInvalidPartitionException If partition is evicted or absent and
* does not belong to this node.
*/
- @Nullable public GridDhtLocalPartition localPartition(Object key, boolean create)
+ @Nullable public GridDhtLocalPartition localPartition(int part)
throws GridDhtInvalidPartitionException;
/**