You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/22 15:13:17 UTC
[49/50] [abbrv] ignite git commit: ignite-5075
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
index 08857ee..71c8014 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheDatabaseSharedManager.java
@@ -94,6 +94,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -214,7 +215,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private volatile GridFutureAdapter<Void> enableChangeApplied;
/** */
- public ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
+ private ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** */
private long checkpointFreq;
@@ -388,25 +389,25 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!reconnect && !cctx.kernalContext().clientNode() && cctx.kernalContext().state().active()) {
Collection<String> cacheNames = new HashSet<>();
- for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
+ // TODO IGNITE-5075 group descriptors.
+ for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration()) {
if (CU.isSystemCache(ccfg.getName())) {
- storeMgr.initializeForCache(ccfg);
+ storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg);
cacheNames.add(ccfg.getName());
}
+ }
for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
if (!CU.isSystemCache(ccfg.getName())) {
- storeMgr.initializeForCache(ccfg);
+ storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg);
cacheNames.add(ccfg.getName());
}
- for (String name : cctx.pageStore().savedCacheNames()) {
- CacheConfiguration ccfg = cctx.pageStore().readConfiguration(name);
-
- if (ccfg != null && !cacheNames.contains(name))
- storeMgr.initializeForCache(ccfg);
+ for (CacheConfiguration ccfg : cctx.pageStore().readCacheConfigurations().values()) {
+ if (!cacheNames.contains(ccfg.getName()))
+ storeMgr.initializeForCache(cctx.cache().cacheDescriptors().get(ccfg.getName()).groupDescriptor(), ccfg);
}
readCheckpointAndRestoreMemory();
@@ -490,7 +491,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public void onCacheStop(GridCacheContext cctx) {
- snapshotMgr.onCacheStop(cctx);
+ snapshotMgr.onCacheStop(cctx); // TODO IGNITE-5075.
}
/** {@inheritDoc} */
@@ -527,7 +528,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- protected long[] calculateFragmentSizes(int concLvl, long cacheSize) {
+ private long[] calculateFragmentSizes(int concLvl, long cacheSize) {
if (concLvl < 2)
concLvl = Runtime.getRuntime().availableProcessors();
@@ -575,7 +576,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
FullPageId fullId,
PageMemoryEx pageMem
) throws IgniteCheckedException {
- snapshotMgr.onChangeTrackerPage(page,fullId,pageMem);
+ snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
}
},
this
@@ -693,7 +694,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) {
+ @Override public void onCacheGroupsStopped(
+ Collection<IgniteBiTuple<CacheGroupInfrastructure, Boolean>> stoppedGrps) {
try {
waitForCheckpoint("caches stop");
}
@@ -703,30 +705,30 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
- for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) {
+ for (IgniteBiTuple<CacheGroupInfrastructure, Boolean> tup : stoppedGrps) {
PageMemoryEx pageMem = (PageMemoryEx)tup.get1().memoryPolicy().pageMemory();
- Collection<Integer> cacheIds = destroyed.get(pageMem);
+ Collection<Integer> grpIds = destroyed.get(pageMem);
- if (cacheIds == null) {
- cacheIds = new HashSet<>();
+ if (grpIds == null) {
+ grpIds = new HashSet<>();
- destroyed.put(pageMem, cacheIds);
+ destroyed.put(pageMem, grpIds);
}
- cacheIds.add(tup.get1().cacheId());
+ grpIds.add(tup.get1().groupId());
- pageMem.onCacheDestroyed(tup.get1().cacheId());
+ pageMem.onCacheGroupDestroyed(tup.get1().groupId());
}
Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());
for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
- final Collection<Integer> cacheIds = entry.getValue();
+ final Collection<Integer> grpIds = entry.getValue();
clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() {
- @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
- return cacheIds.contains(cacheId);
+ @Override public boolean apply(Integer grpId, Long pageId, Integer tag) {
+ return grpIds.contains(grpId);
}
}, false));
}
@@ -740,15 +742,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
if (cctx.pageStore() != null) {
- for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) {
- GridCacheContext cacheCtx = tup.get1();
+ for (IgniteBiTuple<CacheGroupInfrastructure, Boolean> tup : stoppedGrps) {
+ CacheGroupInfrastructure grp = tup.get1();
try {
- cctx.pageStore().shutdownForCache(cacheCtx, tup.get2());
+ cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
- "[cache=" + cacheCtx.name() + "]", e);
+ "[cache=" + grp.cacheOrGroupName() + "]", e);
}
}
}
@@ -825,15 +827,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointLock.readLock().unlock();
if (checkpointer != null) {
- Collection<GridCacheContext> cacheCtxs = context().cacheContexts();
+ Collection<MemoryPolicy> memPlcs = context().database().memoryPolicies();
- for (GridCacheContext cacheCtx : cacheCtxs) {
- PageMemoryEx mem = (PageMemoryEx) cacheCtx.memoryPolicy().pageMemory();
+ if (memPlcs != null) {
+ for (MemoryPolicy memPlc : memPlcs) {
+ PageMemoryEx mem = (PageMemoryEx)memPlc.pageMemory();
- if (mem != null && !mem.safeToUpdate()) {
- checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
+ if (mem != null && !mem.safeToUpdate()) {
+ checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
- break;
+ break;
+ }
}
}
}
@@ -875,34 +879,34 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
reservedForExchange = new HashMap<>();
- for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
continue;
- for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) {
- if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= ggWalRebalanceThreshold)
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
+ if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().fullSize() <= ggWalRebalanceThreshold)
continue;
for (Long cpTs : checkpointHist.checkpoints()) {
try {
CheckpointEntry entry = checkpointHist.entry(cpTs);
- if (!entry.cacheStates.containsKey(cacheCtx.cacheId()) ||
- !entry.cacheStates.get(cacheCtx.cacheId()).partitions().containsKey(part.id()))
+ if (!entry.cacheGrpStates.containsKey(grp.groupId()) ||
+ !entry.cacheGrpStates.get(grp.groupId()).partitions().containsKey(part.id()))
continue;
- WALPointer ptr = searchPartitionCounter(cacheCtx, part.id(), entry.checkpointTimestamp());
+ WALPointer ptr = searchPartitionCounter(grp.groupId(), part.id(), entry.checkpointTimestamp());
if (ptr != null && cctx.wal().reserve(ptr)) {
- Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
+ Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(grp.groupId());
if (cacheMap == null) {
cacheMap = new HashMap<>();
- reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
+ reservedForExchange.put(grp.groupId(), cacheMap);
}
- cacheMap.put(part.id(), new T2<>(entry.partitionCounter(cacheCtx.cacheId(), part.id()), ptr));
+ cacheMap.put(part.id(), new T2<>(entry.partitionCounter(grp.groupId(), part.id()), ptr));
}
}
catch (IgniteCheckedException ex) {
@@ -947,8 +951,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) {
- WALPointer ptr = searchPartitionCounter(cctx.cacheContext(cacheId), partId, cntr);
+ @Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
+ WALPointer ptr = searchPartitionCounter(grpId, partId, cntr);
if (ptr == null)
return false;
@@ -965,7 +969,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
if (reserved)
- reservedForPreloading.put(new T2<>(cacheId, partId), new T2<>(cntr, ptr));
+ reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr));
return reserved;
}
@@ -1030,30 +1034,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * Schedules partition destroy during next checkpoint. This method must be called inside checkpoint read lock.
- *
- * @param cacheCtx Cache context.
- * @param partId Partition ID.
- */
- public void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) {
- Checkpointer cp = checkpointer;
-
- if (cp != null)
- cp.schedulePartitionDestroy(cacheCtx, partId);
- }
-
- /**
* Cancels partition destroy if it has not begun yet. Otherwise, will wait for cleanup to finish.
*
- * @param cacheCtx Cache context.
+ * @param grpId Cache group ID.
* @param partId Partition ID.
*/
- public void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId)
+ void cancelOrWaitPartitionDestroy(int grpId, int partId)
throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp != null)
- cp.cancelOrWaitPartitionDestroy(cacheCtx, partId);
+ cp.cancelOrWaitPartitionDestroy(grpId, partId);
}
@@ -1061,12 +1052,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
- * @param cacheCtx Cache context.
+ * @param grpId Cache group ID.
* @param part Partition ID.
* @param partCntrSince Partition counter.
* @return WAL pointer or {@code null} if failed to search.
*/
- public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, Long partCntrSince) {
+ WALPointer searchPartitionCounter(int grpId, int part, Long partCntrSince) {
boolean hasGap = false;
WALPointer first = null;
@@ -1074,7 +1065,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
try {
CheckpointEntry entry = checkpointHist.entry(cpTs);
- Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part);
+ Long foundCntr = entry.partitionCounter(grpId, part);
if (foundCntr != null) {
if (foundCntr <= partCntrSince) {
@@ -1276,7 +1267,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int cacheId = pageRec.fullPageId().cacheId();
long pageId = pageRec.fullPageId().pageId();
- PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId);
long page = pageMem.acquirePage(cacheId, pageId, true);
@@ -1306,7 +1297,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final int cId = destroyRec.cacheId();
final int pId = destroyRec.partitionId();
- PageMemoryEx pageMem = getPageMemoryForCacheId(cId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(cId);
pageMem.clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
@@ -1324,7 +1315,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
int cacheId = r.cacheId();
long pageId = r.pageId();
- PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId);
+ PageMemoryEx pageMem = getPageMemoryForCacheGroup(cacheId);
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
@@ -1369,17 +1360,19 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/**
* Obtains PageMemory reference from cache descriptor instead of cache context.
*
- * @param cacheId Cache id.
+ * @param grpId Cache group id.
* @return PageMemoryEx instance.
* @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor.
*/
- private PageMemoryEx getPageMemoryForCacheId(int cacheId) throws IgniteCheckedException {
+ private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
+ // TODO IGNITE-5075: group ID should not change.
+ // TODO IGNITE-5075: cache descriptor can be removed.
GridCacheSharedContext sharedCtx = context();
String memPlcName = sharedCtx
.cache()
- .cacheDescriptor(cacheId)
- .cacheConfiguration()
+ .cacheGroupDescriptors().get(grpId)
+ .config()
.getMemoryPolicyName();
return (PageMemoryEx) sharedCtx.database().memoryPolicy(memPlcName).pageMemory();
@@ -1455,35 +1448,31 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private void restorePartitionState(
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
- Collection<GridCacheContext> cacheContexts = cctx.cacheContexts();
-
- for (GridCacheContext context : cacheContexts) {
- int cacheId = context.cacheId();
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ int grpId = grp.groupId();
- GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+ PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
- PageMemoryEx pageMem = (PageMemoryEx)cacheCtx.memoryPolicy().pageMemory();
+ for (int i = 0; i < grp.affinity().partitions(); i++) {
+ if (storeMgr.exists(grpId, i)) {
+ storeMgr.ensure(grpId, i);
- for (int i = 0; i < context.affinity().partitions(); i++) {
- if (storeMgr.exists(cacheId, i)) {
- storeMgr.ensure(cacheId, i);
-
- if (storeMgr.pages(cacheId, i) <= 1)
+ if (storeMgr.pages(grpId, i) <= 1)
continue;
- long partMetaId = pageMem.partitionMetaPageId(cacheId, i);
- long partMetaPage = pageMem.acquirePage(cacheId, partMetaId);
+ long partMetaId = pageMem.partitionMetaPageId(grpId, i);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
- long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage);
+ long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
boolean changed = false;
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
- T2<Integer, Long> fromWal = partStates.get(new T2<>(cacheId, i));
+ T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
- GridDhtLocalPartition part = context.topology()
+ GridDhtLocalPartition part = grp.topology()
.localPartition(i, AffinityTopologyVersion.NONE, true);
assert part != null;
@@ -1496,7 +1485,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
changed = updateState(part, stateId);
if (stateId == GridDhtPartitionState.OWNING.ordinal()) {
- cacheCtx.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2());
+ grp.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2());
if (part.initialUpdateCounter() < fromWal.get2()) {
part.initialUpdateCounter(fromWal.get2());
@@ -1509,11 +1498,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
changed = updateState(part, (int)io.getPartitionState(pageAddr));
}
finally {
- pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, changed);
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
}
}
finally {
- pageMem.releasePage(cacheId, partMetaId, partMetaPage);
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
}
@@ -1551,6 +1540,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
case CREATE:
case UPDATE:
cacheCtx.offheap().update(
+ cacheCtx,
dataEntry.key(),
dataEntry.value(),
dataEntry.writeVersion(),
@@ -1564,7 +1554,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
break;
case DELETE:
- cacheCtx.offheap().remove(dataEntry.key(), dataEntry.partitionId(), locPart);
+ cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), dataEntry.partitionId(), locPart);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(dataEntry.partitionId(), dataEntry.partitionCounter());
@@ -1697,7 +1687,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
ch.force(true);
return type == CheckpointEntryType.START ?
- new CheckpointEntry(cpTs, ptr, cpId, rec.cacheStates()) : null;
+ new CheckpointEntry(cpTs, ptr, cpId, rec.cacheGroupStates()) : null;
}
catch (IOException e) {
throw new IgniteCheckedException(e);
@@ -1724,28 +1714,29 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
final Set<PageMemoryEx> pageMemSet = new HashSet<>();
for (PartitionDestroyRequest req : reqs) {
- Collection<Integer> partIds = filterMap.get(req.cacheId);
+ Collection<Integer> partIds = filterMap.get(req.grpId);
if (partIds == null) {
partIds = new HashSet<>();
- filterMap.put(req.cacheId, partIds);
+ filterMap.put(req.grpId, partIds);
}
partIds.add(req.partId);
- pageMemSet.add((PageMemoryEx)cctx.cacheContext(req.cacheId).memoryPolicy().pageMemory());
+ // TODO IGNITE-5075.
+ pageMemSet.add((PageMemoryEx)cctx.cache().cacheGroup(req.grpId).memoryPolicy().pageMemory());
}
for (PageMemoryEx pageMem : pageMemSet) {
IgniteInternalFuture<Void> clearFut = pageMem.clearAsync(new P3<Integer, Long, Integer>() {
- @Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
- assert cacheId != null;
+ @Override public boolean apply(Integer grpId, Long pageId, Integer tag) {
+ assert grpId != null;
assert pageId != null;
int partId = PageIdUtils.partId(pageId);
- Collection<Integer> parts = filterMap.get(cacheId);
+ Collection<Integer> parts = filterMap.get(grpId);
return parts != null && parts.contains(partId);
}
@@ -1758,7 +1749,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
assert !req.allowFastEviction;
// Tag should never grow in this case.
- cctx.pageStore().onPartitionDestroyed(req.cacheId, req.partId, 1);
+ cctx.pageStore().onPartitionDestroyed(req.grpId, req.partId, 1);
}
catch (IgniteCheckedException e) {
req.onDone(e);
@@ -1885,36 +1876,36 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * @param cacheCtx Cache context.
+ * @param grp Cache group.
* @param partId Partition ID.
*/
- private void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) {
+ private void schedulePartitionDestroy(CacheGroupInfrastructure grp, int partId) {
synchronized (this) {
- scheduledCp.destroyQueue.addDestroyRequest(cacheCtx, partId);
+ scheduledCp.destroyQueue.addDestroyRequest(grp, partId);
}
wakeupForCheckpoint(partDestroyCheckpointDelay, "partition destroy");
}
/**
- * @param cacheCtx Cache context.
+ * @param grpId Cache group ID.
* @param partId Partition ID.
*/
- private void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId)
+ private void cancelOrWaitPartitionDestroy(int grpId, int partId)
throws IgniteCheckedException {
CheckpointProgress cur = curCpProgress;
PartitionDestroyRequest req;
if (cur != null) {
- req = cur.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId);
+ req = cur.destroyQueue.cancelDestroy(grpId, partId);
if (req != null)
req.waitCompleted();
}
synchronized (this) {
- req = scheduledCp.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId);
+ req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId);
}
if (req != null)
@@ -2013,7 +2004,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (req != null) {
// Log destroy record before actual partition clear.
- lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.cacheId, req.partId));
+ lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.grpId, req.partId));
if (reqs == null)
reqs = new ArrayList<>();
@@ -2131,18 +2122,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx0);
- Collection<GridCacheContext> cacheCtxs = ((GridCacheSharedContext<Object, Object>)cctx).cacheContexts();
+ for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
+ if (grp.isLocal())
+ continue;
- for (GridCacheContext cacheCtx : cacheCtxs) {
CacheState state = new CacheState();
- if (cacheCtx.isLocal())
- continue;
+ for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
+ state.addPartitionState(part.id(), part.dataStore().fullSize(), part.lastAppliedUpdate());
- for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions())
- state.addPartitionState(part.id(), part.dataStore().size(), part.lastAppliedUpdate());
-
- cpRec.addCacheState(cacheCtx.cacheId(), state);
+ cpRec.addCacheGroupState(grp.groupId(), state);
}
if (curr.nextSnapshot)
@@ -2638,15 +2627,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
try {
entry.initIfNeeded(cctx);
- if (entry.cacheStates == null)
+ if (entry.cacheGrpStates == null)
continue;
- CacheState cacheState = entry.cacheStates.get(cacheId);
+ CacheState grpState = entry.cacheGrpStates.get(cacheId);
- if (cacheState == null)
+ if (grpState == null)
continue;
- CacheState.PartitionState partState = cacheState.partitions().get(partId);
+ CacheState.PartitionState partState = grpState.partitions().get(partId);
if (partState != null) {
if (cctx.wal().reserve(entry.checkpointMark()))
@@ -2687,7 +2676,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private UUID cpId;
/** Cache states. Initialized lazily. */
- private Map<Integer, CacheState> cacheStates;
+ private Map<Integer, CacheState> cacheGrpStates;
/** Initialization exception. */
private IgniteCheckedException initEx;
@@ -2713,13 +2702,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @param cpTs Checkpoint timestamp.
* @param cpMark Checkpoint mark pointer.
* @param cpId Checkpoint ID.
- * @param cacheStates Cache states.
+ * @param cacheGrpStates Cache groups states.
*/
- private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheStates) {
+ private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheGrpStates) {
this.cpTs = cpTs;
this.cpMark = cpMark;
this.cpId = cpId;
- this.cacheStates = cacheStates;
+ this.cacheGrpStates = cacheGrpStates;
initGuard = 1;
initLatch = new CountDownLatch(0);
@@ -2761,17 +2750,17 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param part Partition ID.
* @return Partition counter or {@code null} if not found.
*/
- private Long partitionCounter(int cacheId, int part) {
+ private Long partitionCounter(int grpId, int part) {
assert initGuard != 0;
- if (initEx != null || cacheStates == null)
+ if (initEx != null || cacheGrpStates == null)
return null;
- CacheState state = cacheStates.get(cacheId);
+ CacheState state = cacheGrpStates.get(grpId);
if (state != null) {
CacheState.PartitionState partState = state.partitions().get(part);
@@ -2794,7 +2783,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
CheckpointRecord rec = (CheckpointRecord)tup.get2();
cpId = rec.checkpointId();
- cacheStates = rec.cacheStates();
+ cacheGrpStates = rec.cacheGroupStates();
}
else
initEx = new IgniteCheckedException("Failed to find checkpoint record at " +
@@ -2827,16 +2816,16 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
new ConcurrentHashMap<>();
/**
- * @param cacheCtx Cache context.
+ * @param grp Cache group.
* @param partId Partition ID to destroy.
*/
- private void addDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) {
- PartitionDestroyRequest req = new PartitionDestroyRequest(cacheCtx, partId);
+ private void addDestroyRequest(CacheGroupInfrastructure grp, int partId) {
+ PartitionDestroyRequest req = new PartitionDestroyRequest(grp, partId);
- PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(cacheCtx.cacheId(), partId), req);
+ PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grp.groupId(), partId), req);
assert old == null : "Must wait for old destroy request to finish before adding a new one " +
- "[cacheId=" + cacheCtx.cacheId() + ", cacheName=" + cacheCtx.name() + ", partId=" + partId + ']';
+ "[grpId=" + grp.groupId() + ", name=" + grp.cacheOrGroupName() + ", partId=" + partId + ']';
}
/**
@@ -2866,10 +2855,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
*/
private static class PartitionDestroyRequest {
/** */
- private int cacheId;
+ private int grpId;
/** */
- private String cacheName;
+ private String name;
/** */
private int partId;
@@ -2884,13 +2873,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
private GridFutureAdapter<Void> destroyFut;
/**
- * @param cacheCtx Cache context.
+ * @param grp Cache group.
* @param partId Partition ID.
*/
- private PartitionDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) {
- cacheId = cacheCtx.cacheId();
- cacheName = cacheCtx.name();
- allowFastEviction = cacheCtx.allowFastEviction();
+ private PartitionDestroyRequest(CacheGroupInfrastructure grp, int partId) {
+ grpId = grp.groupId();
+ name = grp.cacheOrGroupName();
+ allowFastEviction = grp.allowFastEviction();
this.partId = partId;
}
@@ -2958,7 +2947,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** {@inheritDoc} */
@Override public String toString() {
- return "PartitionDestroyRequest [cacheId=" + cacheId + ", cacheName=" + cacheName +
+ return "PartitionDestroyRequest [grpId=" + grpId + ", name=" + name +
", partId=" + partId + ']';
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
index 5253818..f8f91df 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/GridCacheOffheapManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.database;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSna
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
@@ -80,60 +82,71 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
RootPage reuseListRoot = metas.reuseListRoot;
- reuseList = new ReuseListImpl(cctx.cacheId(),
- cctx.name(),
- cctx.memoryPolicy().pageMemory(),
- cctx.shared().wal(),
+ reuseList = new ReuseListImpl(grp.groupId(),
+ grp.cacheOrGroupName(),
+ grp.memoryPolicy().pageMemory(),
+ ctx.wal(),
reuseListRoot.pageId().pageId(),
reuseListRoot.isAllocated());
RootPage metastoreRoot = metas.treeRoot;
- metaStore = new MetadataStorage(cctx.memoryPolicy().pageMemory(),
- cctx.shared().wal(),
+ metaStore = new MetadataStorage(grp.memoryPolicy().pageMemory(),
+ ctx.wal(),
globalRemoveId(),
- cctx.cacheId(),
+ grp.groupId(),
PageIdAllocator.INDEX_PARTITION,
PageIdAllocator.FLAG_IDX,
reuseList,
metastoreRoot.pageId().pageId(),
metastoreRoot.isAllocated());
- if (cctx.shared().ttl().eagerTtlEnabled()) {
- final String name = "PendingEntries";
+ ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
+ }
- RootPage pendingRootPage = metaStore.getOrAllocateForTree(name);
+ /** {@inheritDoc} */
+ @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException {
+ if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) {
+ ctx.database().checkpointReadLock();
- pendingEntries = new PendingEntriesTree(
- cctx,
- name,
- cctx.memoryPolicy().pageMemory(),
- pendingRootPage.pageId().pageId(),
- reuseList,
- pendingRootPage.isAllocated()
- );
+ try {
+ final String name = "PendingEntries";
+
+ // TODO IGNITE-5075: per cache?
+ RootPage pendingRootPage = metaStore.getOrAllocateForTree(name);
+
+ pendingEntries = new PendingEntriesTree(
+ grp,
+ name,
+ grp.memoryPolicy().pageMemory(),
+ pendingRootPage.pageId().pageId(),
+ reuseList,
+ pendingRootPage.isAllocated()
+ );
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
}
-
- ((GridCacheDatabaseSharedManager)cctx.shared().database()).addCheckpointListener(this);
}
/** {@inheritDoc} */
@Override protected CacheDataStore createCacheDataStore0(final int p)
throws IgniteCheckedException {
- GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.shared().database();
+ GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ctx.database();
- if (!cctx.allowFastEviction())
- dbMgr.cancelOrWaitPartitionDestroy(cctx, p);
+ if (!grp.allowFastEviction())
+ dbMgr.cancelOrWaitPartitionDestroy(grp.groupId(), p);
- boolean exists = cctx.shared().pageStore() != null
- && cctx.shared().pageStore().exists(cctx.cacheId(), p);
+ boolean exists = ctx.pageStore() != null
+ && ctx.pageStore().exists(grp.groupId(), p);
return new GridCacheDataStore(p, exists);
}
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
- assert cctx.memoryPolicy().pageMemory() instanceof PageMemoryEx;
+ assert grp.memoryPolicy().pageMemory() instanceof PageMemoryEx;
reuseList.saveMetadata();
@@ -166,12 +179,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
freeList.saveMetadata();
+ // TODO IGNITE-5075.
long updCntr = store.updateCounter();
- int size = store.size();
+ int size = store.fullSize();
long rmvId = globalRemoveId().get();
- PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory();
- IgniteWriteAheadLogManager wal = cctx.shared().wal();
+ PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
+ IgniteWriteAheadLogManager wal = this.ctx.wal();
if (size > 0 || updCntr > 0) {
int state = -1;
@@ -180,7 +194,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
state = GridDhtPartitionState.EVICTED.ordinal();
else {
// localPartition will not acquire writeLock here because create=false.
- GridDhtLocalPartition part = cctx.topology().localPartition(store.partId(),
+ GridDhtLocalPartition part = grp.topology().localPartition(store.partId(),
AffinityTopologyVersion.NONE, false);
if (part != null && part.state() != GridDhtPartitionState.EVICTED)
@@ -191,12 +205,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (state == -1)
return false;
- int cacheId = cctx.cacheId();
- long partMetaId = pageMem.partitionMetaPageId(cacheId, store.partId());
- long partMetaPage = pageMem.acquirePage(cacheId, partMetaId);
+ int grpId = grp.groupId();
+ long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
- long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage);
+ long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
try {
PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
@@ -210,52 +224,52 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
int pageCount;
if (beforeSnapshot) {
- pageCount = cctx.shared().pageStore().pages(cctx.cacheId(), store.partId());
+ pageCount = this.ctx.pageStore().pages(grpId, store.partId());
io.setCandidatePageCount(pageAddr, pageCount);
if (saveMeta) {
- long metaPageId = pageMem.metaPageId(cctx.cacheId());
- long metaPage = pageMem.acquirePage(cctx.cacheId(), metaPageId);
+ long metaPageId = pageMem.metaPageId(grpId);
+ long metaPage = pageMem.acquirePage(grpId, metaPageId);
try {
- long metaPageAddr = pageMem.writeLock(cctx.cacheId(), metaPageId, metaPage);
+ long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage);
try {
long nextSnapshotTag = io.getNextSnapshotTag(metaPageAddr);
io.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1);
- if (PageHandler.isWalDeltaRecordNeeded(pageMem, cctx.cacheId(), metaPageId,
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId,
metaPage, wal, null))
- wal.log(new MetaPageUpdateNextSnapshotId(cctx.cacheId(), metaPageId,
+ wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId,
nextSnapshotTag + 1));
- addPartition(ctx.partitionStatMap(), metaPageAddr, io, cctx.cacheId(), PageIdAllocator.INDEX_PARTITION,
- cctx.kernalContext().cache().context().pageStore().pages(cacheId, PageIdAllocator.INDEX_PARTITION));
+ addPartition(ctx.partitionStatMap(), metaPageAddr, io, grpId, PageIdAllocator.INDEX_PARTITION,
+ this.ctx.kernalContext().cache().context().pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION));
}
finally {
- pageMem.writeUnlock(cctx.cacheId(), metaPageId, metaPage, null, true);
+ pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true);
}
}
finally {
- pageMem.releasePage(cctx.cacheId(), metaPageId, metaPage);
+ pageMem.releasePage(grpId, metaPageId, metaPage);
}
wasSaveToMeta = true;
}
- GridDhtPartitionMap partMap = cctx.topology().localPartitionMap();
+ GridDhtPartitionMap partMap = grp.topology().localPartitionMap();
if (partMap.containsKey(store.partId()) &&
partMap.get(store.partId()) == GridDhtPartitionState.OWNING)
- addPartition(ctx.partitionStatMap(), pageAddr, io, cctx.cacheId(), store.partId(),
- cctx.kernalContext().cache().context().pageStore().pages(cctx.cacheId(), store.partId()));
+ addPartition(ctx.partitionStatMap(), pageAddr, io, grpId, store.partId(),
+ this.ctx.pageStore().pages(grpId, store.partId()));
}
else
pageCount = io.getCandidatePageCount(pageAddr);
- if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null))
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
wal.log(new MetaPageUpdatePartitionDataRecord(
- cacheId,
+ grpId,
partMetaId,
updCntr,
rmvId,
@@ -265,11 +279,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
));
}
finally {
- pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, true);
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, true);
}
}
finally {
- pageMem.releasePage(cacheId, partMetaId, partMetaPage);
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
}
@@ -304,23 +318,23 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException {
- cctx.shared().database().checkpointReadLock();
+ ctx.database().checkpointReadLock();
try {
int p = store.partId();
saveStoreMetadata(store, null, false, true);
- PageMemoryEx pageMemory = (PageMemoryEx)cctx.memoryPolicy().pageMemory();
+ PageMemoryEx pageMemory = (PageMemoryEx)grp.memoryPolicy().pageMemory();
- int tag = pageMemory.invalidate(cctx.cacheId(), p);
+ int tag = pageMemory.invalidate(grp.groupId(), p);
- cctx.shared().wal().log(new PartitionDestroyRecord(cctx.cacheId(), p));
+ ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), p));
- cctx.shared().pageStore().onPartitionDestroyed(cctx.cacheId(), p, tag);
+ ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
}
finally {
- cctx.shared().database().checkpointReadUnlock();
+ ctx.database().checkpointReadUnlock();
}
}
@@ -355,11 +369,13 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public RootPage rootPageForIndex(String idxName) throws IgniteCheckedException {
+ // TODO IGNITE-5075: per cache?
return metaStore.getOrAllocateForTree(idxName);
}
/** {@inheritDoc} */
@Override public void dropRootPageForIndex(String idxName) throws IgniteCheckedException {
+ // TODO IGNITE-5075: per cache?
metaStore.dropRootPage(idxName);
}
@@ -369,10 +385,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override protected void destroyCacheDataStructures() {
- assert cctx.affinityNode();
-
- ((GridCacheDatabaseSharedManager)cctx.shared().database()).removeCheckpointListener(this);
+ @Override public void stop() {
+ if (grp.affinityNode())
+ ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this);
}
/**
@@ -380,14 +395,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
* @throws IgniteCheckedException If failed.
*/
private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
- PageMemoryEx pageMem = (PageMemoryEx) cctx.memoryPolicy().pageMemory();
- IgniteWriteAheadLogManager wal = cctx.shared().wal();
+ PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
+ IgniteWriteAheadLogManager wal = ctx.wal();
+
+ int grpId = grp.groupId();
+ long metaId = pageMem.metaPageId(grpId);
+ long metaPage = pageMem.acquirePage(grpId, metaId);
- int cacheId = cctx.cacheId();
- long metaId = pageMem.metaPageId(cacheId);
- long metaPage = pageMem.acquirePage(cacheId, metaId);
try {
- final long pageAddr = pageMem.writeLock(cacheId, metaId, metaPage);
+ final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage);
boolean allocated = false;
@@ -399,15 +415,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
pageIO.initNewPage(pageAddr, metaId, pageMem.pageSize());
- metastoreRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
- reuseListRoot = pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
+ metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
+ reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
pageIO.setTreeRoot(pageAddr, metastoreRoot);
pageIO.setReuseListRoot(pageAddr, reuseListRoot);
- if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, metaId, metaPage, wal, null))
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null))
wal.log(new MetaPageInitRecord(
- cacheId,
+ grpId,
metaId,
pageIO.getType(),
pageIO.getVersion(),
@@ -427,15 +443,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
return new Metas(
- new RootPage(new FullPageId(metastoreRoot, cacheId), allocated),
- new RootPage(new FullPageId(reuseListRoot, cacheId), allocated));
+ new RootPage(new FullPageId(metastoreRoot, grpId), allocated),
+ new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
}
finally {
- pageMem.writeUnlock(cacheId, metaId, metaPage, null, allocated);
+ pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
}
}
finally {
- pageMem.releasePage(cacheId, metaId, metaPage);
+ pageMem.releasePage(grpId, metaId, metaPage);
}
}
@@ -445,10 +461,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
if (partCntrSince == null)
return super.rebalanceIterator(part, topVer, partCntrSince);
- GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)cctx.shared().database();
+ GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)ctx.database();
try {
- WALPointer startPtr = database.searchPartitionCounter(cctx, part, partCntrSince);
+ WALPointer startPtr = database.searchPartitionCounter(grp.groupId(), part, partCntrSince);
if (startPtr == null) {
assert false : "partCntr=" + partCntrSince + ", reservations=" + S.toString(Map.class, database.reservedForPreloading());
@@ -456,9 +472,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return super.rebalanceIterator(part, topVer, partCntrSince);
}
- WALIterator it = cctx.shared().wal().replay(startPtr);
+ WALIterator it = ctx.wal().replay(startPtr);
- return new RebalanceIteratorAdapter(cctx, it, part);
+ return new RebalanceIteratorAdapter(grp, it, part);
}
catch (IgniteCheckedException e) {
U.warn(log, "Failed to create WAL-based rebalance iterator (a full partition will transferred to a " +
@@ -472,14 +488,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
*
*/
private static class RebalanceIteratorAdapter implements IgniteRebalanceIterator {
- /** Cache context. */
- private GridCacheContext cctx;
+ /** Cache group caches. */
+ private final Set<Integer> cacheGrpCaches;
/** WAL iterator. */
- private WALIterator walIt;
+ private final WALIterator walIt;
/** Partition to scan. */
- private int part;
+ private final int part;
/** */
private Iterator<DataEntry> entryIt;
@@ -488,12 +504,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
private CacheDataRow next;
/**
- * @param cctx Cache context.
+ * @param grp Cache group.
* @param walIt WAL iterator.
* @param part Partition ID.
*/
- private RebalanceIteratorAdapter(GridCacheContext cctx, WALIterator walIt, int part) {
- this.cctx = cctx;
+ private RebalanceIteratorAdapter(CacheGroupInfrastructure grp, WALIterator walIt, int part) {
+ this.cacheGrpCaches = grp.cacheIds();
this.walIt = walIt;
this.part = part;
@@ -568,8 +584,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
while (entryIt.hasNext()) {
DataEntry entry = entryIt.next();
- if (entry.cacheId() == cctx.cacheId() &&
- entry.partitionId() == part) {
+ if (entry.partitionId() == part && cacheGrpCaches.contains(entry.cacheId())) {
next = new DataEntryRow(entry);
@@ -742,7 +757,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
return null;
}
- IgniteCacheDatabaseSharedManager dbMgr = cctx.shared().database();
+ IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
dbMgr.checkpointReadLock();
@@ -753,12 +768,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
RootPage reuseRoot = metas.reuseListRoot;
freeList = new FreeListImpl(
- cctx.cacheId(),
- cctx.name() + "-" + partId,
- (MemoryMetricsImpl)cctx.memoryPolicy().memoryMetrics(),
- cctx.memoryPolicy(),
+ grp.groupId(),
+ grp.cacheOrGroupName() + "-" + partId,
+ grp.memoryPolicy().memoryMetrics(),
+ grp.memoryPolicy(),
null,
- cctx.shared().wal(),
+ ctx.wal(),
reuseRoot.pageId().pageId(),
reuseRoot.isAllocated()) {
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
@@ -766,15 +781,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
};
- CacheDataRowStore rowStore = new CacheDataRowStore(cctx, freeList, partId);
+ CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId);
RootPage treeRoot = metas.treeRoot;
CacheDataTree dataTree = new CacheDataTree(
+ grp,
name,
freeList,
rowStore,
- cctx,
treeRoot.pageId().pageId(),
treeRoot.isAllocated()) {
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
@@ -782,15 +797,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
};
- PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory();
+ PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree);
- int cacheId = cctx.cacheId();
- long partMetaId = pageMem.partitionMetaPageId(cacheId, partId);
- long partMetaPage = pageMem.acquirePage(cacheId, partMetaId);
+ int grpId = grp.groupId();
+ long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
- long pageAddr = pageMem.readLock(cacheId, partMetaId, partMetaPage);
+ long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage);
try {
if (PageIO.getType(pageAddr) != 0) {
@@ -798,15 +813,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr));
- cctx.offheap().globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
+ globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
}
}
finally {
- pageMem.readUnlock(cacheId, partMetaId, partMetaPage);
+ pageMem.readUnlock(grpId, partMetaId, partMetaPage);
}
}
finally {
- pageMem.releasePage(cacheId, partMetaId, partMetaPage);
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
delegate = delegate0;
@@ -835,15 +850,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
* @return Partition metas.
*/
private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException {
- PageMemoryEx pageMem = (PageMemoryEx)cctx.memoryPolicy().pageMemory();
- IgniteWriteAheadLogManager wal = cctx.shared().wal();
+ PageMemoryEx pageMem = (PageMemoryEx)grp.memoryPolicy().pageMemory();
+ IgniteWriteAheadLogManager wal = ctx.wal();
- int cacheId = cctx.cacheId();
- long partMetaId = pageMem.partitionMetaPageId(cacheId, partId);
- long partMetaPage = pageMem.acquirePage(cacheId, partMetaId);
+ int grpId = grp.groupId();
+ long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
+ long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
boolean allocated = false;
- long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage);
+ long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
try {
long treeRoot, reuseListRoot;
@@ -854,8 +869,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
io.initNewPage(pageAddr, partMetaId, pageMem.pageSize());
- treeRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA);
- reuseListRoot = pageMem.allocatePage(cacheId, partId, PageMemory.FLAG_DATA);
+ treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+ reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
@@ -863,9 +878,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
io.setTreeRoot(pageAddr, treeRoot);
io.setReuseListRoot(pageAddr, reuseListRoot);
- if (PageHandler.isWalDeltaRecordNeeded(pageMem, cacheId, partMetaId, partMetaPage, wal, null))
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
wal.log(new MetaPageInitRecord(
- cctx.cacheId(),
+ grpId,
partMetaId,
io.getType(),
io.getVersion(),
@@ -882,21 +897,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
reuseListRoot = io.getReuseListRoot(pageAddr);
assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
- U.hexLong(treeRoot) + ", part=" + partId + ", cacheId=" + cacheId;
+ U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId;
assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
- U.hexLong(reuseListRoot) + ", part=" + partId + ", cacheId=" + cacheId;
+ U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId;
}
return new Metas(
- new RootPage(new FullPageId(treeRoot, cacheId), allocated),
- new RootPage(new FullPageId(reuseListRoot, cacheId), allocated));
+ new RootPage(new FullPageId(treeRoot, grpId), allocated),
+ new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
}
finally {
- pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, allocated);
+ pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated);
}
}
finally {
- pageMem.releasePage(cacheId, partMetaId, partMetaPage);
+ pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
@@ -918,11 +933,11 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public int size() {
+ @Override public int fullSize() {
try {
CacheDataStore delegate0 = init0(true);
- return delegate0 == null ? 0 : delegate0.size();
+ return delegate0 == null ? 0 : delegate0.fullSize();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -998,6 +1013,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/** {@inheritDoc} */
@Override public void update(
+ GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
@@ -1006,47 +1022,51 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- delegate.update(key, val, ver, expireTime, oldRow);
+ delegate.update(cctx, key, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
- @Override public void updateIndexes(KeyCacheObject key) throws IgniteCheckedException {
+ @Override public void updateIndexes(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- delegate.updateIndexes(key);
+ delegate.updateIndexes(cctx, key);
}
/** {@inheritDoc} */
- @Override public CacheDataRow createRow(KeyCacheObject key,
+ @Override public CacheDataRow createRow(
+ GridCacheContext cctx,
+ KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- return delegate.createRow(key, val, ver, expireTime, oldRow);
+ return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
- @Override public void invoke(KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException {
+ @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
+ throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- delegate.invoke(key, c);
+ delegate.invoke(cctx, key, c);
}
/** {@inheritDoc} */
- @Override public void remove(KeyCacheObject key, int partId) throws IgniteCheckedException {
+ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
+ throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
- delegate.remove(key, partId);
+ delegate.remove(cctx, key, partId);
}
/** {@inheritDoc} */
- @Override public CacheDataRow find(KeyCacheObject key) throws IgniteCheckedException {
+ @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
- return delegate.find(key);
+ return delegate.find(cctx, key);
return null;
}
@@ -1062,12 +1082,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower,
+ @Override public GridCursor<? extends CacheDataRow> cursor(
+ int cacheId,
+ KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
- return delegate.cursor(lower, upper);
+ return delegate.cursor(cacheId, lower, upper);
return EMPTY_CURSOR;
}
@@ -1076,6 +1098,29 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public void destroy() throws IgniteCheckedException {
// No need to destroy delegate.
}
+
+ /** {@inheritDoc} */
+ @Override public int cacheSize(int cacheId) {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(true);
+
+ if (delegate != null)
+ return delegate.cursor(cacheId);
+
+ return EMPTY_CURSOR;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear(int cacheId) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(true);
+
+ if (delegate != null)
+ delegate.clear(cacheId);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java
index 1bb83d2..576e58a 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/file/FilePageStoreManager.java
@@ -29,7 +29,7 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,10 +43,10 @@ import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
+import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.database.IgniteCacheSnapshotManager;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
@@ -72,6 +72,9 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
public static final String CACHE_DIR_PREFIX = "cache-";
/** */
+ public static final String CACHE_GRP_DIR_PREFIX = "cacheGroup-";
+
+ /** */
public static final String CACHE_CONF_FILENAME = "conf.dat";
/** Marshaller. */
@@ -93,7 +96,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
private final long metaPageId = PageIdUtils.pageId(-1, PageMemory.FLAG_IDX, 0);
/** */
- private final Set<Integer> cachesWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+ private final Set<Integer> grpsWithoutIdx = Collections.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
/**
* @param ctx Kernal context.
@@ -179,25 +182,57 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
- @Override public void initializeForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
- int cacheId = CU.cacheId(ccfg.getName());
+ @Override public void initializeForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg)
+ throws IgniteCheckedException {
+ int grpId = grpDesc.groupId();
- if (!idxCacheStores.containsKey(cacheId)) {
- CacheStoreHolder holder = initForCache(ccfg);
+ if (!idxCacheStores.containsKey(grpId)) {
+ CacheStoreHolder holder = initForCache(grpDesc, ccfg);
- CacheStoreHolder old = idxCacheStores.put(cacheId, holder);
+ CacheStoreHolder old = idxCacheStores.put(grpId, holder);
assert old == null : "Non-null old store holder for cache: " + ccfg.getName();
}
+
+ storeCacheConfiguration(grpDesc, ccfg);
+ }
+
+ /**
+ * @param grpDesc Cache group descriptor.
+ * @param ccfg Cache configuration.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void storeCacheConfiguration(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg)
+ throws IgniteCheckedException {
+ File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg);
+ File file;
+
+ if (grpDesc.sharedGroup())
+ file = new File(cacheWorkDir, ccfg.getName() + CACHE_CONF_FILENAME);
+ else
+ file = new File(cacheWorkDir, CACHE_CONF_FILENAME);
+
+ if (!file.exists() || file.length() == 0) {
+ try {
+ file.createNewFile();
+
+ try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) {
+ marshaller.marshal(ccfg, stream);
+ }
+ }
+ catch (IOException ex) {
+ throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex);
+ }
+ }
}
/** {@inheritDoc} */
- @Override public void shutdownForCache(GridCacheContext cacheCtx, boolean destroy) throws IgniteCheckedException {
- cachesWithoutIdx.remove(cacheCtx.cacheId());
+ @Override public void shutdownForCacheGroup(CacheGroupInfrastructure grp, boolean destroy) throws IgniteCheckedException {
+ grpsWithoutIdx.remove(grp.groupId());
- CacheStoreHolder old = idxCacheStores.remove(cacheCtx.cacheId());
+ CacheStoreHolder old = idxCacheStores.remove(grp.groupId());
- assert old != null : "Missing cache store holder [cache=" + cacheCtx.name() +
+ assert old != null : "Missing cache store holder [cache=" + grp.cacheOrGroupName() +
", locNodeId=" + cctx.localNodeId() + ", gridName=" + cctx.igniteInstanceName() + ']';
IgniteCheckedException ex = shutdown(old, /*clean files if destroy*/destroy, null);
@@ -207,17 +242,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
- @Override public void onPartitionCreated(int cacheId, int partId) throws IgniteCheckedException {
+ @Override public void onPartitionCreated(int grpId, int partId) throws IgniteCheckedException {
// No-op.
}
/** {@inheritDoc} */
- @Override public void onPartitionDestroyed(int cacheId, int partId, int tag) throws IgniteCheckedException {
+ @Override public void onPartitionDestroyed(int grpId, int partId, int tag) throws IgniteCheckedException {
assert partId <= PageIdAllocator.MAX_PARTITION_ID;
- PageStore store = getStore(cacheId, partId);
+ PageStore store = getStore(grpId, partId);
- assert store instanceof FilePageStore;
+ assert store instanceof FilePageStore : store;
((FilePageStore)store).truncate(tag);
}
@@ -286,12 +321,31 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/**
+ * @param grpDesc Cache group descriptor.
+ * @param ccfg Cache configuration.
+ * @return Cache work directory.
+ */
+ private File cacheWorkDirectory(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) {
+ String dirName;
+
+ if (grpDesc.sharedGroup())
+ dirName = CACHE_GRP_DIR_PREFIX + ccfg.getGroupName();
+ else
+ dirName = CACHE_DIR_PREFIX + ccfg.getName();
+
+ return new File(storeWorkDir, dirName);
+ }
+
+ /**
+ * @param grpDesc Cache group descriptor.
* @param ccfg Cache configuration.
* @return Cache store holder.
* @throws IgniteCheckedException If failed.
*/
- private CacheStoreHolder initForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
- File cacheWorkDir = new File(storeWorkDir, CACHE_DIR_PREFIX + ccfg.getName());
+ private CacheStoreHolder initForCache(CacheGroupDescriptor grpDesc, CacheConfiguration ccfg) throws IgniteCheckedException {
+ assert !grpDesc.sharedGroup() || ccfg.getGroupName() != null : ccfg.getName();
+
+ File cacheWorkDir = cacheWorkDirectory(grpDesc, ccfg);
boolean dirExisted = false;
@@ -348,32 +402,17 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
dirExisted = true;
}
- File file = new File(cacheWorkDir, CACHE_CONF_FILENAME);
-
- if (!file.exists() || file.length() == 0) {
- try {
- file.createNewFile();
-
- try (OutputStream stream = new BufferedOutputStream(new FileOutputStream(file))) {
- marshaller.marshal(ccfg, stream);
- }
- }
- catch (IOException ex) {
- throw new IgniteCheckedException("Failed to persist cache configuration: " + ccfg.getName(), ex);
- }
- }
-
File idxFile = new File(cacheWorkDir, INDEX_FILE_NAME);
if (dirExisted && !idxFile.exists())
- cachesWithoutIdx.add(CU.cacheId(ccfg.getName()));
+ grpsWithoutIdx.add(grpDesc.groupId());
FilePageStore idxStore = new FilePageStore(
PageMemory.FLAG_IDX,
idxFile,
cctx.kernalContext().config().getMemoryConfiguration());
- FilePageStore[] partStores = new FilePageStore[ccfg.getAffinity().partitions()];
+ FilePageStore[] partStores = new FilePageStore[grpDesc.config().getAffinity().partitions()];
for (int partId = 0; partId < partStores.length; partId++) {
FilePageStore partStore = new FilePageStore(
@@ -422,52 +461,74 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
- @Override public Set<String> savedCacheNames() {
+ @Override public Map<String, CacheConfiguration> readCacheConfigurations() throws IgniteCheckedException {
if (cctx.kernalContext().clientNode())
- return Collections.emptySet();
+ return Collections.emptyMap();
File[] files = storeWorkDir.listFiles();
if (files == null)
- return Collections.emptySet();
+ return Collections.emptyMap();
- Set<String> cacheNames = new HashSet<>();
+ Map<String, CacheConfiguration> ccfgs = new HashMap<>();
for (File file : files) {
- if (file.isDirectory() && file.getName().startsWith(CACHE_DIR_PREFIX)) {
- File conf = new File(file, CACHE_CONF_FILENAME);
- if (conf.exists() && conf.length() > 0) {
- String name = file.getName().substring(CACHE_DIR_PREFIX.length());
+ if (file.isDirectory()) {
+ if (file.getName().startsWith(CACHE_DIR_PREFIX)) {
+ File conf = new File(file, CACHE_CONF_FILENAME);
- // TODO remove when fixed null cache names.
- if ("null".equals(name))
- name = null;
+ if (conf.exists() && conf.length() > 0) {
+ CacheConfiguration ccfg = readCacheConfig(conf);
- cacheNames.add(name);
+ ccfgs.put(ccfg.getName(), ccfg);
+ }
}
+ else if (file.getName().startsWith(CACHE_GRP_DIR_PREFIX))
+ readCacheGroupCaches(file, ccfgs);
}
}
- return cacheNames;
+ return ccfgs;
}
- /** {@inheritDoc} */
- @Override public CacheConfiguration readConfiguration(String cacheName) {
- File file = new File(storeWorkDir, CACHE_DIR_PREFIX + cacheName);
+ /**
+ * @param grpDir Group directory.
+ * @param ccfgs Cache configurations.
+ * @throws IgniteCheckedException If failed.
+ */
+ private void readCacheGroupCaches(File grpDir, Map<String, CacheConfiguration> ccfgs) throws IgniteCheckedException {
+ File[] files = grpDir.listFiles();
+
+ if (files == null)
+ return;
+
+ for (File file : files) {
+ if (!file.isDirectory() && file.getName().endsWith(CACHE_CONF_FILENAME) && file.length() > 0) {
+ CacheConfiguration ccfg = readCacheConfig(file);
- assert file.exists() && file.isDirectory();
+ ccfgs.put(ccfg.getName(), ccfg);
+ }
+ }
+ }
- try (InputStream stream = new BufferedInputStream(new FileInputStream(new File(file, CACHE_CONF_FILENAME)))) {
+ /**
+ * @param conf File with stored configuration.
+ * @return Cache configuration.
+ * @throws IgniteCheckedException If failed.
+ */
+ private CacheConfiguration readCacheConfig(File conf) throws IgniteCheckedException {
+ try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
return marshaller.unmarshal(stream, U.resolveClassLoader(igniteCfg));
}
- catch (IOException | IgniteCheckedException e) {
- throw new IllegalStateException("Failed to read cache configuration from disk for cache: " + cacheName, e);
+ catch (IOException e) {
+ throw new IgniteCheckedException("Failed to read cache configuration from disk for cache: " +
+ conf.getAbsolutePath(), e);
}
}
/** {@inheritDoc} */
- @Override public boolean hasIndexStore(int cacheId) {
- return !cachesWithoutIdx.contains(cacheId);
+ @Override public boolean hasIndexStore(int grpId) {
+ return !grpsWithoutIdx.contains(grpId);
}
/**
@@ -537,19 +598,19 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/**
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
* @param partId Partition ID.
* @return Page store for the corresponding parameters.
* @throws IgniteCheckedException If cache or partition with the given ID was not created.
*
* Note: visible for testing.
*/
- public PageStore getStore(int cacheId, int partId) throws IgniteCheckedException {
- CacheStoreHolder holder = idxCacheStores.get(cacheId);
+ public PageStore getStore(int grpId, int partId) throws IgniteCheckedException {
+ CacheStoreHolder holder = idxCacheStores.get(grpId);
if (holder == null)
throw new IgniteCheckedException("Failed to get page store for the given cache ID " +
- "(cache has not been started): " + cacheId);
+ "(cache has not been started): " + grpId);
if (partId == PageIdAllocator.INDEX_PARTITION)
return holder.idxStore;
@@ -561,7 +622,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (store == null)
throw new IgniteCheckedException("Failed to get page store for the given partition ID " +
- "(partition has not been created) [cacheId=" + cacheId + ", partId=" + partId + ']');
+ "(partition has not been created) [grpId=" + grpId + ", partId=" + partId + ']');
return store;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
index e8ae554..ef84d83 100644
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryEx.java
@@ -131,16 +131,16 @@ public interface PageMemoryEx extends PageMemory {
public int invalidate(int cacheId, int partId);
/**
- * Clears internal metadata of destroyed cache.
+ * Clears internal metadata of destroyed cache group.
*
- * @param cacheId Cache ID.
+ * @param grpId Cache group ID.
*/
- public void onCacheDestroyed(int cacheId);
+ public void onCacheGroupDestroyed(int grpId);
/**
* Asynchronously clears pages satisfying the given predicate.
*
- * @param pred Predicate for cacheId, pageId and partition tag.
+ * @param pred Predicate for cache group id, pageId and partition tag.
* @param cleanDirty Flag indicating that dirty pages collection should be cleaned.
* @return Future that will be completed when all pages are cleared.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/7ae45b4b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
index 21ebcbf..a791ec9 100755
--- a/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
+++ b/modules/pds/src/main/java/org/apache/ignite/internal/processors/cache/database/pagemem/PageMemoryImpl.java
@@ -970,12 +970,12 @@ public class PageMemoryImpl implements PageMemoryEx {
}
/** {@inheritDoc} */
- @Override public void onCacheDestroyed(int cacheId) {
+ @Override public void onCacheGroupDestroyed(int grpId) {
for (Segment seg : segments) {
seg.writeLock().lock();
try {
- seg.resetPartTags(cacheId);
+ seg.resetPartTags(grpId);
}
finally {
seg.writeLock().unlock();
@@ -1935,7 +1935,7 @@ public class PageMemoryImpl implements PageMemoryEx {
}
}
- private void resetPartTags(int cacheId) {
+ private void resetPartTags(int grpId) {
assert getWriteHoldCount() > 0;
Iterator<T2<Integer, Integer>> iter = partitionTagMap.keySet().iterator();
@@ -1943,7 +1943,7 @@ public class PageMemoryImpl implements PageMemoryEx {
while (iter.hasNext()) {
T2<Integer, Integer> t = iter.next();
- if (t.get1() == cacheId)
+ if (t.get1() == grpId)
iter.remove();
}
}