You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/05/16 11:10:47 UTC
ignite git commit: ignite-5075 cacheId filter for BPlusTree
Repository: ignite
Updated Branches:
refs/heads/ignite-5075 72cb4459e -> e84d4a2d1
ignite-5075 cacheId filter for BPlusTree
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e84d4a2d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e84d4a2d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e84d4a2d
Branch: refs/heads/ignite-5075
Commit: e84d4a2d14fbacf2f55aeecd3e031de0bbbec577
Parents: 72cb445
Author: sboikov <sb...@gridgain.com>
Authored: Tue May 16 14:10:41 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue May 16 14:10:41 2017 +0300
----------------------------------------------------------------------
.../cache/CacheGroupInfrastructure.java | 8 +-
.../cache/IgniteCacheOffheapManager.java | 19 +-
.../cache/IgniteCacheOffheapManagerImpl.java | 826 +++++++++++++++----
.../processors/cache/database/CacheDataRow.java | 5 -
.../cache/database/CacheDataRowAdapter.java | 31 +-
.../cache/database/CacheSearchRow.java | 5 +
.../cache/database/tree/io/PageIO.java | 24 +
.../query/h2/database/H2PkHashIndex.java | 2 +-
8 files changed, 715 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index ed4ba46..9581068 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -157,8 +157,7 @@ public class CacheGroupInfrastructure {
depEnabled = ctx.kernalContext().deploy().enabled() && !ctx.kernalContext().cacheObjects().isBinaryEnabled(ccfg);
- storeCacheId = affNode &&
- (sharedGroup() || memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED);
+ storeCacheId = affNode && memPlc.config().getPageEvictionMode() != DataPageEvictionMode.DISABLED;
log = ctx.kernalContext().log(getClass());
@@ -172,7 +171,10 @@ public class CacheGroupInfrastructure {
return rcvdFrom;
}
- public boolean storeCacheId() {
+ /**
+ * @return {@code True} if cacheId should be stored in data pages.
+ */
+ public boolean storeCacheIdInDataPage() {
return storeCacheId;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 95556f8..a3f44af 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -403,6 +403,7 @@ public interface IgniteCacheOffheapManager {
@Nullable CacheDataRow oldRow) throws IgniteCheckedException;
/**
+ * @param cctx Cache context.
* @param key Key.
* @param c Closure.
* @throws IgniteCheckedException If failed.
@@ -410,6 +411,7 @@ public interface IgniteCacheOffheapManager {
public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException;
/**
+ * @param cctx Cache context.
* @param key Key.
* @param partId Partition number.
* @throws IgniteCheckedException If failed.
@@ -417,6 +419,7 @@ public interface IgniteCacheOffheapManager {
public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException;
/**
+ * @param cctx Cache context.
* @param key Key.
* @return Data row.
* @throws IgniteCheckedException If failed.
@@ -430,12 +433,19 @@ public interface IgniteCacheOffheapManager {
public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException;
/**
+ * @return Data cursor.
+ * @throws IgniteCheckedException If failed.
+ */
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException;
+
+ /**
+ * @param cacheId Cache ID.
* @param lower Lower bound.
* @param upper Upper bound.
* @return Data cursor.
* @throws IgniteCheckedException If failed.
*/
- public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower,
+ public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException;
/**
@@ -446,6 +456,13 @@ public interface IgniteCacheOffheapManager {
public void destroy() throws IgniteCheckedException;
/**
+ * Clears all the records associated with logical cache with given ID.
+ *
+ * @throws IgniteCheckedException If failed.
+ */
+ public void clear(int cacheId) throws IgniteCheckedException;
+
+ /**
* @return Row store.
*/
public RowStore rowStore();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index e037572..471f073 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import java.util.Collections;
import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -28,7 +29,6 @@ import javax.cache.Cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageMemory;
@@ -80,6 +80,9 @@ import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
@SuppressWarnings("PublicInnerClass")
public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager {
/** */
+ private static final int UNDEFINED_CACHE_ID = 0;
+
+ /** */
protected GridCacheSharedContext ctx;
/** */
@@ -105,9 +108,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
private volatile boolean hasPendingEntries;
/** */
- private static final PendingRow START_PENDING_ROW = new PendingRow(Long.MIN_VALUE, 0);
-
- /** */
private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000);
/** */
@@ -176,7 +176,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public void stop() {
- // TODO IGNITE-5075.
+ try {
+ for (CacheDataStore store : cacheDataStores())
+ store.destroy();
+
+ if (pendingEntries != null)
+ pendingEntries.destroy();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
}
/** {@inheritDoc} */
@@ -188,18 +197,24 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
*
*/
protected void destroyCacheDataStructures(int cacheId, boolean destroy) {
- // TODO IGNITE-5075.
assert grp.affinityNode();
try {
- if (locCacheDataStore != null)
- locCacheDataStore.destroy();
+ if (grp.sharedGroup()) {
+ assert cacheId != UNDEFINED_CACHE_ID;
- if (pendingEntries != null)
- pendingEntries.destroy();
+ for (CacheDataStore store : cacheDataStores())
+ store.clear(cacheId);
- for (CacheDataStore store : partDataStores.values())
- store.destroy();
+ if (pendingEntries != null) {
+ PendingRow row = new PendingRow(cacheId);
+
+ boolean removex = pendingEntries.removex(row);
+
+ while (removex)
+ removex = pendingEntries.removex(row);
+ }
+ }
}
catch (IgniteCheckedException e) {
throw new IgniteException(e.getMessage(), e);
@@ -222,12 +237,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public long cacheEntriesCount(int cacheId) {
- if (grp.isLocal())
- return locCacheDataStore.cacheSize(cacheId);
-
long size = 0;
- for (CacheDataStore store : partDataStores.values())
+ for (CacheDataStore store : cacheDataStores())
size += store.cacheSize(cacheId);
return size;
@@ -257,27 +269,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (grp.isLocal())
return cacheEntriesCount(cacheId, 0);
else {
- ClusterNode locNode = ctx.localNode();
-
long cnt = 0;
- Set<Integer> primaryParts = grp.affinity().cachedAffinity(topVer).primaryPartitions(locNode.id());
- Set<Integer> backupParts = grp.affinity().cachedAffinity(topVer).backupPartitions(locNode.id());
+ Iterator<CacheDataStore> it = cacheData(primary, backup, topVer);
- for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
- if (primary) {
- if (primaryParts.contains(locPart.id())) {
- cnt += locPart.dataStore().cacheSize(cacheId);
-
- continue;
- }
- }
-
- if (backup) {
- if (backupParts.contains(locPart.id()))
- cnt += locPart.dataStore().cacheSize(cacheId);
- }
- }
+ while (it.hasNext())
+ cnt += it.next().cacheSize(cacheId);
return cnt;
}
@@ -285,16 +282,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override public long cacheEntriesCount(int cacheId, int part) {
- if (grp.isLocal()) {
- assert part == 0;
+ CacheDataStore store = partitionData(part);
- return locCacheDataStore.cacheSize(cacheId);
- }
- else {
- GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false);
-
- return locPart == null ? 0 : locPart.dataStore().cacheSize(cacheId);
- }
+ return store == null ? 0 : store.cacheSize(cacheId);
}
/**
@@ -307,7 +297,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert primary || backup;
if (grp.isLocal())
- return Collections.singleton(locCacheDataStore).iterator();
+ return singletonIterator(locCacheDataStore);
else {
final Iterator<GridDhtLocalPartition> it = grp.topology().currentLocalPartitions().iterator();
@@ -435,7 +425,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public void clear(GridCacheContext cctx, boolean readers) {
GridCacheVersion obsoleteVer = null;
- GridIterator<CacheDataRow> it = rowsIterator(true, true, null);
+ GridIterator<CacheDataRow> it = iterator(cctx.cacheId(), cacheDataStores().iterator());
while (it.hasNext()) {
KeyCacheObject key = it.next().key();
@@ -483,7 +473,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
final boolean backup,
final AffinityTopologyVersion topVer,
final boolean keepBinary) throws IgniteCheckedException {
- final Iterator<CacheDataRow> it = rowsIterator(primary, backup, topVer);
+ final Iterator<CacheDataRow> it = iteratorForCache(cctx.cacheId(), primary, backup, topVer);
return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() {
/** */
@@ -566,19 +556,35 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
boolean backups,
final AffinityTopologyVersion topVer)
throws IgniteCheckedException {
- // TODO IGNITE-5075.
- return rowsIterator(primary, backups, topVer);
+ return iterator(cacheId, cacheData(primary, backups, topVer));
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridIterator<CacheDataRow> iteratorForCache(int cacheId, int part) throws IgniteCheckedException {
+ CacheDataStore data = partitionData(part);
+
+ if (data == null)
+ return new GridEmptyCloseableIterator<>();
+
+ return iterator(cacheId, singletonIterator(data));
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridIterator<CacheDataRow> partitionIterator(int part) throws IgniteCheckedException {
+ CacheDataStore data = partitionData(part);
+
+ if (data == null)
+ return new GridEmptyCloseableIterator<>();
+
+ return iterator(UNDEFINED_CACHE_ID, singletonIterator(data));
}
/**
- * @param primary Primary entries flag.
- * @param backups Backup entries flag.
- * @param topVer Topology version.
- * @return Iterator.
+ * @param cacheId Cache ID.
+ * @param dataIt Data store iterator.
+ * @return Rows iterator
*/
- private GridIterator<CacheDataRow> rowsIterator(boolean primary, boolean backups, AffinityTopologyVersion topVer) {
- final Iterator<CacheDataStore> dataIt = cacheData(primary, backups, topVer);
-
+ private GridIterator<CacheDataRow> iterator(final int cacheId, final Iterator<CacheDataStore> dataIt) {
return new GridCloseableIteratorAdapter<CacheDataRow>() {
/** */
private GridCursor<? extends CacheDataRow> cur;
@@ -607,7 +613,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
CacheDataStore ds = dataIt.next();
curPart = ds.partId();
- cur = ds.cursor();
+ cur = cacheId == UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId);
}
else
break;
@@ -628,41 +634,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
};
}
- /** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> iteratorForCache(int cacheId, int part) throws IgniteCheckedException {
- // TODO IGNITE-5075.
- return partitionIterator(part);
- }
-
- /** {@inheritDoc} */
- @Override public GridIterator<CacheDataRow> partitionIterator(int part) throws IgniteCheckedException {
- CacheDataStore data = partitionData(part);
-
- if (data == null)
- return new GridEmptyCloseableIterator<>();
-
- final GridCursor<? extends CacheDataRow> cur = data.cursor();
-
- return new GridCloseableIteratorAdapter<CacheDataRow>() {
+ /**
+ * @param item Item.
+ * @return Single item iterator.
+ */
+ private <T> Iterator<T> singletonIterator(final T item) {
+ return new Iterator<T>() {
/** */
- private CacheDataRow next;
-
- @Override protected CacheDataRow onNext() {
- CacheDataRow res = next;
-
- next = null;
+ private boolean hasNext = true;
- return res;
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return hasNext;
}
- @Override protected boolean onHasNext() throws IgniteCheckedException {
- if (next != null)
- return true;
+ /** {@inheritDoc} */
+ @Override public T next() {
+ if (hasNext) {
+ hasNext = false;
- if (cur.next())
- next = cur.get();
+ return item;
+ }
- return next != null;
+ throw new NoSuchElementException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
}
};
}
@@ -824,13 +823,17 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
- // TODO IGNITE-5075 filter by cache ID if needed.
if (hasPendingEntries && pendingEntries != null) {
GridCacheVersion obsoleteVer = null;
long now = U.currentTimeMillis();
- GridCursor<PendingRow> cur = pendingEntries.find(START_PENDING_ROW, new PendingRow(now, 0));
+ GridCursor<PendingRow> cur;
+
+ if (grp.sharedGroup())
+ cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+ else
+ cur = pendingEntries.find(null, new PendingRow(UNDEFINED_CACHE_ID, now, 0));
int cleared = 0;
@@ -1030,7 +1033,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- dataTree.invoke(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY, c);
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
+ dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c);
switch (c.operationType()) {
case PUT: {
@@ -1072,7 +1077,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException
{
- int cacheId = grp.storeCacheId() ? cctx.cacheId() : 0;
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId);
@@ -1089,6 +1094,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert dataRow.link() != 0 : dataRow;
+ if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup())
+ dataRow.cacheId(cctx.cacheId());
+
return dataRow;
}
@@ -1107,7 +1115,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- int cacheId = grp.storeCacheId() ? cctx.cacheId() : 0;
+ int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
+ assert oldRow == null || oldRow.cacheId() == cacheId : oldRow;
DataRow dataRow = new DataRow(key, val, ver, p, expireTime, cacheId);
@@ -1129,6 +1139,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert dataRow.link() != 0 : dataRow;
+ if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup())
+ dataRow.cacheId(cctx.cacheId());
+
if (oldRow != null) {
old = oldRow;
@@ -1161,6 +1174,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheQueryManager qryMgr = cctx.queries();
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
if (qryMgr.enabled()) {
if (oldRow != null) {
qryMgr.store(key,
@@ -1184,14 +1199,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
assert oldRow.link() != 0 : oldRow;
if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
+ pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
if (newRow.link() != oldRow.link())
rowStore.removeRow(oldRow.link());
}
if (pendingEntries != null && expireTime != 0) {
- pendingEntries.putx(new PendingRow(expireTime, newRow.link()));
+ pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link()));
hasPendingEntries = true;
}
@@ -1205,7 +1220,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
try {
- CacheDataRow oldRow = dataTree.remove(new SearchRow(key));
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
+ CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key));
finishRemove(cctx, key, oldRow);
}
@@ -1224,10 +1241,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
GridCacheVersion ver = null;
if (oldRow != null) {
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
assert oldRow.link() != 0 : oldRow;
+ assert cacheId == UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId :
+ "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "].";
if (pendingEntries != null && oldRow.expireTime() != 0)
- pendingEntries.removex(new PendingRow(oldRow.expireTime(), oldRow.link()));
+ pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link()));
decrementSize(cctx.cacheId());
@@ -1251,7 +1272,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
key.valueBytes(cctx.cacheObjectContext());
- CacheDataRow row = dataTree.findOne(new SearchRow(key), CacheDataRowAdapter.RowData.NO_KEY);
+ int cacheId = grp.sharedGroup() ? cctx.cacheId() : UNDEFINED_CACHE_ID;
+
+ CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY);
if (row != null) {
row.key(key);
@@ -1267,17 +1290,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
return dataTree.find(null, null);
}
+ /** {@inheritDoc}
+ * @param cacheId*/
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
+ return cursor(cacheId, null, null);
+ }
+
/** {@inheritDoc} */
- @Override public GridCursor<? extends CacheDataRow> cursor(KeyCacheObject lower,
+ @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException {
- SearchRow lowerRow = null;
- SearchRow upperRow = null;
+ SearchRow lowerRow;
+ SearchRow upperRow;
- if (lower != null)
- lowerRow = new SearchRow(lower);
+ if (grp.sharedGroup()) {
+ assert cacheId != UNDEFINED_CACHE_ID;
- if (upper != null)
- upperRow = new SearchRow(upper);
+ lowerRow = lower != null ? new SearchRow(cacheId, lower) : new SearchRow(cacheId);
+ upperRow = upper != null ? new SearchRow(cacheId, upper) : new SearchRow(cacheId);
+ }
+ else {
+ lowerRow = lower != null ? new SearchRow(UNDEFINED_CACHE_ID, lower) : null;
+ upperRow = upper != null ? new SearchRow(UNDEFINED_CACHE_ID, upper) : null;
+ }
return dataTree.find(lowerRow, upperRow);
}
@@ -1309,6 +1343,41 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
+ @Override public void clear(int cacheId) throws IgniteCheckedException {
+ assert cacheId != UNDEFINED_CACHE_ID;
+
+ if (cacheSize(cacheId) == 0)
+ return;
+
+ Exception ex = null;
+
+ SearchRow row = new SearchRow(cacheId);
+
+ CacheDataRow removed = dataTree.remove(row);
+
+ while (removed != null) {
+ try {
+ rowStore.removeRow(removed.link());
+
+ decrementSize(cacheId);
+
+ removed = dataTree.remove(row);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Fail remove row [link=" + row.link() + "]");
+
+ if (ex == null)
+ ex = e;
+ else
+ ex.addSuppressed(e);
+ }
+ }
+
+ if (ex != null)
+ throw new IgniteCheckedException("Fail destroy store", ex);
+ }
+
+ /** {@inheritDoc} */
@Override public RowStore rowStore() {
return rowStore;
}
@@ -1396,13 +1465,28 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** */
private final int hash;
+ /** */
+ private final int cacheId;
+
/**
+ * @param cacheId Cache ID.
* @param key Key.
*/
- SearchRow(KeyCacheObject key) {
+ SearchRow(int cacheId, KeyCacheObject key) {
this.key = key;
+ this.hash = key.hashCode();
+ this.cacheId = cacheId;
+ }
- hash = key.hashCode();
+ /**
+ * Instantiates a new fake search row as a logic cache based bound.
+ *
+ * @param cacheId Cache ID.
+ */
+ SearchRow(int cacheId) {
+ this.key = null;
+ this.hash = 0;
+ this.cacheId = cacheId;
}
/** {@inheritDoc} */
@@ -1419,6 +1503,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public int hash() {
return hash;
}
+
+ /** {@inheritDoc} */
+ @Override public int cacheId() {
+ return cacheId;
+ }
}
/**
@@ -1488,6 +1577,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public void link(long link) {
this.link = link;
}
+
+ /**
+ * @param cacheId Cache ID.
+ */
+ void cacheId(int cacheId) {
+ this.cacheId = cacheId;
+ }
}
/**
@@ -1523,8 +1619,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
- DataInnerIO.VERSIONS,
- DataLeafIO.VERSIONS);
+ grp.sharedGroup() ? CacheIdAwareDataInnerIO.VERSIONS : DataInnerIO.VERSIONS,
+ grp.sharedGroup() ? CacheIdAwareDataLeafIO.VERSIONS : DataLeafIO.VERSIONS);
assert rowStore != null;
@@ -1535,16 +1631,39 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override protected int compare(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, CacheSearchRow row)
+ @Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
throws IgniteCheckedException {
- int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
+ RowLinkIO io = (RowLinkIO)iox;
+
+ int cmp;
+
+ if (grp.sharedGroup()) {
+ assert row.cacheId() != UNDEFINED_CACHE_ID : "Cache ID is not provided!";
+ assert io.getCacheId(pageAddr, idx) != UNDEFINED_CACHE_ID : "Cache ID is not stored!";
+
+ cmp = Integer.compare(io.getCacheId(pageAddr, idx), row.cacheId());
- int cmp = Integer.compare(hash, row.hash());
+ if (cmp != 0)
+ return cmp;
+
+ if(cmp == 0 && row.key() == null) {
+ assert row.getClass() == SearchRow.class;
+
+ // A search row with a cach ID only is used as a cache bound.
+ // The found position will be shifted until the exact cache bound is found;
+ // See for details:
+ // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound()
+ // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findUpperBound()
+ return cmp;
+ }
+ }
+
+ cmp = Integer.compare(io.getHash(pageAddr, idx), row.hash());
if (cmp != 0)
return cmp;
- long link = ((RowLinkIO)io).getLink(pageAddr, idx);
+ long link = io.getLink(pageAddr, idx);
assert row.key() != null : row;
@@ -1554,14 +1673,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** {@inheritDoc} */
@Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags)
throws IgniteCheckedException {
- int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
long link = ((RowLinkIO)io).getLink(pageAddr, idx);
+ int hash = ((RowLinkIO)io).getHash(pageAddr, idx);
+ int cacheId = ((RowLinkIO)io).getCacheId(pageAddr, idx);
CacheDataRowAdapter.RowData x = flags != null ?
(CacheDataRowAdapter.RowData)flags :
CacheDataRowAdapter.RowData.FULL;
- return rowStore.dataRow(hash, link, x);
+ return rowStore.dataRow(cacheId, hash, link, x);
}
/**
@@ -1590,7 +1710,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (data.nextLink() == 0) {
long addr = pageAddr + data.offset();
- if (grp.storeCacheId())
+ if (grp.storeCacheIdInDataPage())
addr += 4; // Skip cache id.
final int len = PageUtils.getInt(addr, 0);
@@ -1696,8 +1816,13 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param link Link.
* @return Search row.
*/
- private CacheSearchRow keySearchRow(int hash, long link) {
- return new DataRow(hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY);
+ private CacheSearchRow keySearchRow(int cacheId, int hash, long link) {
+ DataRow dataRow = new DataRow(hash, link, partId, CacheDataRowAdapter.RowData.KEY_ONLY);
+
+ if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup())
+ dataRow.cacheId(cacheId);
+
+ return dataRow;
}
/**
@@ -1706,20 +1831,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @param rowData Required row data.
* @return Data row.
*/
- private CacheDataRow dataRow(int hash, long link, CacheDataRowAdapter.RowData rowData) {
- return new DataRow(hash, link, partId, rowData);
- }
- }
+ private CacheDataRow dataRow(int cacheId, int hash, long link, CacheDataRowAdapter.RowData rowData) {
+ DataRow dataRow = new DataRow(hash, link, partId, rowData);
- /**
- * @param pageAddr Page address.
- * @param off Offset.
- * @param link Link.
- * @param hash Hash.
- */
- private static void store0(long pageAddr, int off, long link, int hash) {
- PageUtils.putLong(pageAddr, off, link);
- PageUtils.putInt(pageAddr, off + 8, hash);
+ if (dataRow.cacheId() == UNDEFINED_CACHE_ID && grp.sharedGroup())
+ dataRow.cacheId(cacheId);
+
+ return dataRow;
+ }
}
/**
@@ -1739,37 +1858,50 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @return Key hash code.
*/
public int getHash(long pageAddr, int idx);
+
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Cache ID or {@code 0} if cache ID is not defined.
+ */
+ public int getCacheId(long pageAddr, int idx);
}
/**
*
*/
- public static final class DataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO {
- /** */
- public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>(
- new DataInnerIO(1)
- );
-
+ private static abstract class AbstractDataInnerIO extends BPlusInnerIO<CacheSearchRow> implements RowLinkIO {
/**
+ * @param type Page type.
* @param ver Page format version.
+ * @param canGetRow If we can get full row from this page.
+ * @param itemSize Single item size on page.
*/
- DataInnerIO(int ver) {
- super(T_DATA_REF_INNER, ver, true, 12);
+ protected AbstractDataInnerIO(int type, int ver, boolean canGetRow, int itemSize) {
+ super(type, ver, canGetRow, itemSize);
}
/** {@inheritDoc} */
@Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) {
assert row.link() != 0;
- store0(pageAddr, off, row.link(), row.hash());
+ PageUtils.putLong(pageAddr, off, row.link());
+ PageUtils.putInt(pageAddr, off + 8, row.hash());
+
+ if (storeCacheId()) {
+ assert row.cacheId() != UNDEFINED_CACHE_ID : row;
+
+ PageUtils.putInt(pageAddr, off + 12, row.cacheId());
+ }
}
/** {@inheritDoc} */
@Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long pageAddr, int idx) {
+ int cacheId = getCacheId(pageAddr, idx);
int hash = getHash(pageAddr, idx);
long link = getLink(pageAddr, idx);
- return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link);
+ return ((CacheDataTree)tree).rowStore.keySearchRow(cacheId, hash, link);
}
/** {@inheritDoc} */
@@ -1777,8 +1909,18 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
int srcIdx) {
int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx);
long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx);
+ int off = offset(dstIdx);
+
+ PageUtils.putLong(dstPageAddr, off, link);
+ PageUtils.putInt(dstPageAddr, off + 8, hash);
+
+ if (storeCacheId()) {
+ int cacheId = ((RowLinkIO)srcIo).getCacheId(srcPageAddr, srcIdx);
- store0(dstPageAddr, offset(dstIdx), link, hash);
+ assert cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(dstPageAddr, off + 12, cacheId);
+ }
}
/** {@inheritDoc} */
@@ -1800,43 +1942,66 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
for (int i = 0; i < cnt; i++)
c.apply(new CacheDataRowAdapter(getLink(pageAddr, i)));
}
+
+ /**
+ * @return {@code True} if cache ID has to be stored.
+ */
+ protected abstract boolean storeCacheId();
}
/**
*
*/
- public static final class DataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO {
- /** */
- public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>(
- new DataLeafIO(1)
- );
-
+ private static abstract class AbstractDataLeafIO extends BPlusLeafIO<CacheSearchRow> implements RowLinkIO {
/**
+ * @param type Page type.
* @param ver Page format version.
+ * @param itemSize Single item size on page.
*/
- DataLeafIO(int ver) {
- super(T_DATA_REF_LEAF, ver, 12);
+ protected AbstractDataLeafIO(int type, int ver, int itemSize) {
+ super(type, ver, itemSize);
}
/** {@inheritDoc} */
@Override public void storeByOffset(long pageAddr, int off, CacheSearchRow row) {
assert row.link() != 0;
- store0(pageAddr, off, row.link(), row.hash());
+ PageUtils.putLong(pageAddr, off, row.link());
+ PageUtils.putInt(pageAddr, off + 8, row.hash());
+
+ if (storeCacheId()) {
+ assert row.cacheId() != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(pageAddr, off + 12, row.cacheId());
+ }
}
/** {@inheritDoc} */
@Override public void store(long dstPageAddr, int dstIdx, BPlusIO<CacheSearchRow> srcIo, long srcPageAddr,
int srcIdx) {
- store0(dstPageAddr, offset(dstIdx), getLink(srcPageAddr, srcIdx), getHash(srcPageAddr, srcIdx));
+ int hash = ((RowLinkIO)srcIo).getHash(srcPageAddr, srcIdx);
+ long link = ((RowLinkIO)srcIo).getLink(srcPageAddr, srcIdx);
+ int off = offset(dstIdx);
+
+ PageUtils.putLong(dstPageAddr, off, link);
+ PageUtils.putInt(dstPageAddr, off + 8, hash);
+
+ if (storeCacheId()) {
+ int cacheId = ((RowLinkIO)srcIo).getCacheId(srcPageAddr, srcIdx);
+
+ assert cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(dstPageAddr, off + 12, cacheId);
+ }
}
/** {@inheritDoc} */
@Override public CacheSearchRow getLookupRow(BPlusTree<CacheSearchRow, ?> tree, long buf, int idx) {
+ int cacheId = getCacheId(buf, idx);
int hash = getHash(buf, idx);
long link = getLink(buf, idx);
- return ((CacheDataTree)tree).rowStore.keySearchRow(hash, link);
+ return ((CacheDataTree)tree).rowStore.keySearchRow(cacheId, hash, link);
}
/** {@inheritDoc} */
@@ -1858,6 +2023,119 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
for (int i = 0; i < cnt; i++)
c.apply(new CacheDataRowAdapter(getLink(pageAddr, i)));
}
+
+ /**
+ * @return {@code True} if cache ID has to be stored.
+ */
+ protected abstract boolean storeCacheId();
+ }
+
+ /**
+ *
+ */
+ public static final class DataInnerIO extends AbstractDataInnerIO {
+ /** */
+ public static final IOVersions<DataInnerIO> VERSIONS = new IOVersions<>(
+ new DataInnerIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ DataInnerIO(int ver) {
+ super(T_DATA_REF_INNER, ver, true, 12);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return UNDEFINED_CACHE_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class DataLeafIO extends AbstractDataLeafIO {
+ /** */
+ public static final IOVersions<DataLeafIO> VERSIONS = new IOVersions<>(
+ new DataLeafIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ DataLeafIO(int ver) {
+ super(T_DATA_REF_LEAF, ver, 12);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return UNDEFINED_CACHE_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class CacheIdAwareDataInnerIO extends AbstractDataInnerIO {
+ /** */
+ public static final IOVersions<CacheIdAwareDataInnerIO> VERSIONS = new IOVersions<>(
+ new CacheIdAwareDataInnerIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ CacheIdAwareDataInnerIO(int ver) {
+ super(T_CACHE_ID_AWARE_DATA_REF_INNER, ver, true, 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 12);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class CacheIdAwareDataLeafIO extends AbstractDataLeafIO {
+ /** */
+ public static final IOVersions<CacheIdAwareDataLeafIO> VERSIONS = new IOVersions<>(
+ new CacheIdAwareDataLeafIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ CacheIdAwareDataLeafIO(int ver) {
+ super(T_CACHE_ID_AWARE_DATA_REF_LEAF, ver, 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 12);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return true;
+ }
}
/**
@@ -1870,33 +2148,48 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
/** Link. */
private long link;
+ /** Cache ID. */
+ private int cacheId;
+
/** */
private KeyCacheObject key;
/**
+ * Creates a new instance which represents an upper or lower bound
+ * inside a logical cache.
+ *
+ * @param cacheId Cache ID.
+ */
+ public PendingRow(int cacheId) {
+ this.cacheId = cacheId;
+ }
+
+ /**
+ * @param cacheId Cache ID.
* @param expireTime Expire time.
* @param link Link
*/
- PendingRow(long expireTime, long link) {
+ PendingRow(int cacheId, long expireTime, long link) {
assert expireTime != 0;
+ this.cacheId = cacheId;
this.expireTime = expireTime;
this.link = link;
}
/**
* @param grp Cache group.
+ * @param cacheId Cache ID.
* @param expireTime Expire time.
* @param link Link.
* @return Row.
* @throws IgniteCheckedException If failed.
*/
- static PendingRow createRowWithKey(CacheGroupInfrastructure grp, long expireTime, long link)
+ static PendingRow createRowWithKey(CacheGroupInfrastructure grp, int cacheId, long expireTime, long link)
throws IgniteCheckedException {
- PendingRow row = new PendingRow(expireTime, link);
+ PendingRow row = new PendingRow(cacheId, expireTime, link);
CacheDataRowAdapter rowData = new CacheDataRowAdapter(link);
-
rowData.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
row.key = rowData.key();
@@ -1940,8 +2233,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
- PendingEntryInnerIO.VERSIONS,
- PendingEntryLeafIO.VERSIONS);
+ grp.sharedGroup() ? CacheIdAwarePendingEntryInnerIO.VERSIONS : PendingEntryInnerIO.VERSIONS,
+ grp.sharedGroup() ? CacheIdAwarePendingEntryLeafIO.VERSIONS : PendingEntryLeafIO.VERSIONS);
this.grp = grp;
@@ -1949,11 +2242,34 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
}
/** {@inheritDoc} */
- @Override protected int compare(BPlusIO<PendingRow> io, long pageAddr, int idx, PendingRow row)
+ @Override protected int compare(BPlusIO<PendingRow> iox, long pageAddr, int idx, PendingRow row)
throws IgniteCheckedException {
- long expireTime = ((PendingRowIO)io).getExpireTime(pageAddr, idx);
+ PendingRowIO io = (PendingRowIO)iox;
- int cmp = Long.compare(expireTime, row.expireTime);
+ int cmp;
+
+ if (grp.sharedGroup()) {
+ assert row.cacheId != UNDEFINED_CACHE_ID : "Cache ID is not provided!";
+ assert io.getCacheId(pageAddr, idx) != UNDEFINED_CACHE_ID : "Cache ID is not stored!";
+
+ cmp = Integer.compare(io.getCacheId(pageAddr, idx), row.cacheId);
+
+ if (cmp != 0)
+ return cmp;
+
+ if(cmp == 0 && row.expireTime == 0 && row.link == 0) {
+ // A search row with a cach ID only is used as a cache bound.
+ // The found position will be shifted until the exact cache bound is found;
+ // See for details:
+ // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound()
+ // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findUpperBound()
+ return cmp;
+ }
+ }
+
+ long expireTime = io.getExpireTime(pageAddr, idx);
+
+ cmp = Long.compare(expireTime, row.expireTime);
if (cmp != 0)
return cmp;
@@ -1961,7 +2277,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
if (row.link == 0L)
return 0;
- long link = ((PendingRowIO)io).getLink(pageAddr, idx);
+ long link = io.getLink(pageAddr, idx);
return Long.compare(link, row.link);
}
@@ -1990,22 +2306,25 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
* @return Link.
*/
long getLink(long pageAddr, int idx);
- }
- /**
- *
- */
- public static class PendingEntryInnerIO extends BPlusInnerIO<PendingRow> implements PendingRowIO {
- /** */
- public static final IOVersions<PendingEntryInnerIO> VERSIONS = new IOVersions<>(
- new PendingEntryInnerIO(1)
- );
+ /**
+ * @param pageAddr Page address.
+ * @param idx Index.
+ * @return Cache ID or {@code 0} if Cache ID is not defined.
+ */
+ int getCacheId(long pageAddr, int idx);
+ }
+ /** */
+ private static abstract class AbstractPendingEntryInnerIO extends BPlusInnerIO<PendingRow> implements PendingRowIO {
/**
+ * @param type Page type.
* @param ver Page format version.
+ * @param canGetRow If we can get full row from this page.
+ * @param itemSize Single item size on page.
*/
- PendingEntryInnerIO(int ver) {
- super(T_PENDING_REF_INNER, ver, true, 8 + 8);
+ protected AbstractPendingEntryInnerIO(int type, int ver, boolean canGetRow, int itemSize) {
+ super(type, ver, canGetRow, itemSize);
}
/** {@inheritDoc} */
@@ -2015,6 +2334,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
PageUtils.putLong(pageAddr, off, row.expireTime);
PageUtils.putLong(pageAddr, off + 8, row.link);
+
+ if (storeCacheId()) {
+ assert row.cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(pageAddr, off + 16, row.cacheId);
+ }
}
/** {@inheritDoc} */
@@ -2030,12 +2355,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
PageUtils.putLong(dstPageAddr, dstOff, expireTime);
PageUtils.putLong(dstPageAddr, dstOff + 8, link);
+
+ if (storeCacheId()) {
+ int cacheId = ((PendingRowIO)srcIo).getCacheId(srcPageAddr, srcIdx);
+
+ assert cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(dstPageAddr, dstOff + 16, cacheId);
+ }
}
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
+ getCacheId(pageAddr, idx),
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
@@ -2049,22 +2383,22 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public long getLink(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 8);
}
- }
- /**
- *
- */
- public static class PendingEntryLeafIO extends BPlusLeafIO<PendingRow> implements PendingRowIO {
- /** */
- public static final IOVersions<PendingEntryLeafIO> VERSIONS = new IOVersions<>(
- new PendingEntryLeafIO(1)
- );
+ /**
+ * @return {@code True} if cache ID has to be stored.
+ */
+ protected abstract boolean storeCacheId();
+ }
+ /** */
+ private static abstract class AbstractPendingEntryLeafIO extends BPlusLeafIO<PendingRow> implements PendingRowIO {
/**
+ * @param type Page type.
* @param ver Page format version.
+ * @param itemSize Single item size on page.
*/
- PendingEntryLeafIO(int ver) {
- super(T_PENDING_REF_LEAF, ver, 8 + 8);
+ protected AbstractPendingEntryLeafIO(int type, int ver, int itemSize) {
+ super(type, ver, itemSize);
}
/** {@inheritDoc} */
@@ -2074,6 +2408,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
PageUtils.putLong(pageAddr, off, row.expireTime);
PageUtils.putLong(pageAddr, off + 8, row.link);
+
+ if (storeCacheId()) {
+ assert row.cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(pageAddr, off + 16, row.cacheId);
+ }
}
/** {@inheritDoc} */
@@ -2089,12 +2429,21 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
PageUtils.putLong(dstPageAddr, dstOff, expireTime);
PageUtils.putLong(dstPageAddr, dstOff + 8, link);
+
+ if (storeCacheId()) {
+ int cacheId = ((PendingRowIO)srcIo).getCacheId(srcPageAddr, srcIdx);
+
+ assert cacheId != UNDEFINED_CACHE_ID;
+
+ PageUtils.putInt(dstPageAddr, dstOff + 16, cacheId);
+ }
}
/** {@inheritDoc} */
@Override public PendingRow getLookupRow(BPlusTree<PendingRow, ?> tree, long pageAddr, int idx)
throws IgniteCheckedException {
return PendingRow.createRowWithKey(((PendingEntriesTree)tree).grp,
+ getCacheId(pageAddr, idx),
getExpireTime(pageAddr, idx),
getLink(pageAddr, idx));
}
@@ -2108,5 +2457,118 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager
@Override public long getLink(long pageAddr, int idx) {
return PageUtils.getLong(pageAddr, offset(idx) + 8);
}
+
+ /**
+ * @return {@code True} if cache ID has to be stored.
+ */
+ protected abstract boolean storeCacheId();
+ }
+
+ /**
+ *
+ */
+ public static final class PendingEntryInnerIO extends AbstractPendingEntryInnerIO {
+ /** */
+ public static final IOVersions<PendingEntryInnerIO> VERSIONS = new IOVersions<>(
+ new PendingEntryInnerIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ PendingEntryInnerIO(int ver) {
+ super(T_PENDING_REF_INNER, ver, true, 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return UNDEFINED_CACHE_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class PendingEntryLeafIO extends AbstractPendingEntryLeafIO {
+ /** */
+ public static final IOVersions<PendingEntryLeafIO> VERSIONS = new IOVersions<>(
+ new PendingEntryLeafIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ PendingEntryLeafIO(int ver) {
+ super(T_PENDING_REF_LEAF, ver, 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return UNDEFINED_CACHE_ID;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return false;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class CacheIdAwarePendingEntryInnerIO extends AbstractPendingEntryInnerIO {
+ /** */
+ public static final IOVersions<CacheIdAwarePendingEntryInnerIO> VERSIONS = new IOVersions<>(
+ new CacheIdAwarePendingEntryInnerIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ CacheIdAwarePendingEntryInnerIO(int ver) {
+ super(T_PENDING_REF_INNER, ver, true, 20);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return true;
+ }
+ }
+
+ /**
+ *
+ */
+ public static final class CacheIdAwarePendingEntryLeafIO extends AbstractPendingEntryLeafIO {
+ /** */
+ public static final IOVersions<CacheIdAwarePendingEntryLeafIO> VERSIONS = new IOVersions<>(
+ new CacheIdAwarePendingEntryLeafIO(1)
+ );
+
+ /**
+ * @param ver Page format version.
+ */
+ CacheIdAwarePendingEntryLeafIO(int ver) {
+ super(T_PENDING_REF_LEAF, ver, 20);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int getCacheId(long pageAddr, int idx) {
+ return PageUtils.getInt(pageAddr, offset(idx) + 16);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean storeCacheId() {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
index e0076d5..cc26b21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRow.java
@@ -36,11 +36,6 @@ public interface CacheDataRow extends CacheSearchRow {
public GridCacheVersion version();
/**
- * @return Cache id. Stored only if memory policy with configured per-page eviction is used.
- */
- public int cacheId();
-
- /**
* @return Expire time.
*/
public long expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index f5abd78..de9bdbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -19,14 +19,12 @@ package org.apache.ignite.internal.processors.cache.database;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.processors.cache.CacheGroupInfrastructure;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.IncompleteObject;
@@ -121,13 +119,9 @@ public class CacheDataRowAdapter implements CacheDataRow {
assert link != 0 : "link";
assert key == null : "key";
- CacheObjectContext coctx = null;
+ CacheObjectContext coctx = grp != null ? grp.cacheObjectContext() : null;
- if (grp != null) {
- cacheId = !grp.storeCacheId() ? -1 : 0; // Skip cacheId reading if it is not needed.
-
- coctx = grp.cacheObjectContext();
- }
+ boolean readCacheId = grp == null || grp.storeCacheIdInDataPage();
long nextLink = link;
IncompleteObject<?> incomplete = null;
@@ -135,6 +129,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
do {
final long pageId = pageId(nextLink);
+
+ // TODO IGNITE-5075 Here must be "physical" cache ID (aka group ID)
final long page = pageMem.acquirePage(cacheId, pageId);
try {
@@ -154,7 +150,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (first) {
if (nextLink == 0) {
// Fast path for a single page row.
- readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData);
+ readFullRow(sharedCtx, coctx, pageAddr + data.offset(), rowData, readCacheId);
return;
}
@@ -169,7 +165,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
boolean keyOnly = rowData == RowData.KEY_ONLY;
- incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, incomplete);
+ incomplete = readFragment(sharedCtx, coctx, buf, keyOnly, readCacheId, incomplete);
if (keyOnly && key != null)
return;
@@ -191,6 +187,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param coctx Cache object context.
* @param buf Buffer.
* @param keyOnly {@code true} If need to read only key object.
+ * @param readCacheId {@code true} If need to read cache ID.
* @param incomplete Incomplete object.
* @throws IgniteCheckedException If failed.
* @return Read object.
@@ -200,9 +197,10 @@ public class CacheDataRowAdapter implements CacheDataRow {
CacheObjectContext coctx,
ByteBuffer buf,
boolean keyOnly,
+ boolean readCacheId,
IncompleteObject<?> incomplete
) throws IgniteCheckedException {
- if (cacheId == 0) {
+ if (readCacheId && cacheId == 0) {
incomplete = readIncompleteCacheId(buf, incomplete);
if (cacheId == 0)
@@ -212,6 +210,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
if (coctx == null)
+ // TODO IGNITE-5075 Possible null pointer in case cacheId is not stored
coctx = sharedCtx.cacheContext(cacheId).cacheObjectContext();
// Read key.
@@ -254,17 +253,19 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param coctx Cache object context.
* @param addr Address.
* @param rowData Required row data.
+ * @param readCacheId {@code true} If need to read cache ID.
* @throws IgniteCheckedException If failed.
*/
private void readFullRow(
GridCacheSharedContext<?, ?> sharedCtx,
CacheObjectContext coctx,
long addr,
- RowData rowData)
+ RowData rowData,
+ boolean readCacheId)
throws IgniteCheckedException {
int off = 0;
- if (cacheId == 0) {
+ if (readCacheId) {
cacheId = PageUtils.getInt(addr, off);
off += 4;
@@ -328,6 +329,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
if (remaining >= size) {
cacheId = buf.getInt();
+ assert cacheId != 0;
+
return null;
}
@@ -342,6 +345,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
timeBuf.order(buf.order());
cacheId = timeBuf.getInt();
+
+ assert cacheId != 0;
}
return incomplete;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
index d51cf0e..6e429c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheSearchRow.java
@@ -37,4 +37,9 @@ public interface CacheSearchRow {
* @return Key hash code.
*/
public int hash();
+
+ /**
+ * @return Cache ID or {@code 0} if cache ID is not defined.
+ */
+ public int cacheId();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index 2586696..927446b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -157,6 +157,18 @@ public abstract class PageIO {
/** */
public static final short T_PAGE_UPDATE_TRACKING = 15;
+ /** */
+ public static final short T_CACHE_ID_AWARE_DATA_REF_INNER = 16;
+
+ /** */
+ public static final short T_CACHE_ID_AWARE_DATA_REF_LEAF = 17;
+
+ /** */
+ public static final short T_CACHE_ID_AWARE_PENDING_REF_INNER = 18;
+
+ /** */
+ public static final short T_CACHE_ID_AWARE_PENDING_REF_LEAF = 19;
+
/** Index for payload == 1. */
public static final short T_H2_EX_REF_LEAF_START = 10000;
@@ -484,6 +496,12 @@ public abstract class PageIO {
case T_DATA_REF_LEAF:
return (Q)IgniteCacheOffheapManagerImpl.DataLeafIO.VERSIONS.forVersion(ver);
+ case T_CACHE_ID_AWARE_DATA_REF_INNER:
+ return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataInnerIO.VERSIONS.forVersion(ver);
+
+ case T_CACHE_ID_AWARE_DATA_REF_LEAF:
+ return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwareDataLeafIO.VERSIONS.forVersion(ver);
+
case T_METASTORE_INNER:
return (Q)MetadataStorage.MetaStoreInnerIO.VERSIONS.forVersion(ver);
@@ -496,6 +514,12 @@ public abstract class PageIO {
case T_PENDING_REF_LEAF:
return (Q)IgniteCacheOffheapManagerImpl.PendingEntryLeafIO.VERSIONS.forVersion(ver);
+ case T_CACHE_ID_AWARE_PENDING_REF_INNER:
+ return (Q) IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryInnerIO.VERSIONS.forVersion(ver);
+
+ case T_CACHE_ID_AWARE_PENDING_REF_LEAF:
+ return (Q)IgniteCacheOffheapManagerImpl.CacheIdAwarePendingEntryLeafIO.VERSIONS.forVersion(ver);
+
default:
// For tests.
if (innerTestIO != null && innerTestIO.getType() == type && innerTestIO.getVersion() == ver)
http://git-wip-us.apache.org/repos/asf/ignite/blob/e84d4a2d/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 9292b5b..4a73dad 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -108,7 +108,7 @@ public class H2PkHashIndex extends GridH2IndexBase {
List<GridCursor<? extends CacheDataRow>> cursors = new ArrayList<>();
for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores())
- cursors.add(store.cursor(lowerObj, upperObj));
+ cursors.add(store.cursor(cctx.cacheId(), lowerObj, upperObj));
return new H2Cursor(new CompositeGridCursor<>(cursors.iterator()), p);
}