You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/10/06 15:20:07 UTC
[3/4] ignite git commit: IGNITE-5849 Introduced meta store
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
new file mode 100644
index 0000000..88d19e8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -0,0 +1,591 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.persistence.Storable;
+import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ */
+public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList {
+ /** Number of buckets in this free list. */
+ private static final int BUCKETS = 256; // Must be power of 2.
+
+ /** Index of the last bucket, which {@link #isReuseBucket(int)} reports as the reuse bucket. */
+ private static final int REUSE_BUCKET = BUCKETS - 1;
+
+ /** Value returned by the write handler once the written size equals the row size. */
+ private static final Integer COMPLETE = Integer.MAX_VALUE;
+
+ /** Failure result passed to {@code write(...)} when inserting a row. */
+ private static final Integer FAIL_I = Integer.MIN_VALUE;
+
+ /** Failure result passed to {@code write(...)} when removing a row. */
+ private static final Long FAIL_L = Long.MAX_VALUE;
+
+ /** A page is put into a bucket only when its free space is strictly above this value. */
+ private static final int MIN_PAGE_FREE_SPACE = 8;
+
+ /** Right shift applied to a free-space value to obtain its bucket index. */
+ private final int shift;
+
+ /** Stripes of every bucket, indexed by bucket number. */
+ private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);
+
+ /** Page size minus {@code AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD}. */
+ private final int MIN_SIZE_FOR_DATA_PAGE;
+
+ /** Bucket index computed for {@code MIN_SIZE_FOR_DATA_PAGE} bytes of free space. */
+ private final int emptyDataPagesBucket;
+
+ /** Handler used by {@link #updateDataRow} to update a row in place. */
+ private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler();
+
+ /** Metrics whose large-entries page counter is adjusted by multi-page inserts and removals. */
+ private final MemoryMetricsImpl memMetrics;
+
+ /** Eviction tracker obtained from the memory policy. */
+ private final PageEvictionTracker evictionTracker;
+
+ /**
+ * Updates a row in place; if the update succeeded and a WAL delta record is needed, logs the new payload.
+ */
+ private final class UpdateRowHandler extends PageHandler<T, Boolean> {
+ @Override public Boolean run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ T row,
+ int itemId)
+ throws IgniteCheckedException {
+ AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+ int rowSize = io.getRowSize(row);
+
+ boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
+
+ evictionTracker.touchPage(pageId);
+
+ if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
+ // TODO This record must contain only a reference to a logical WAL record with the actual data.
+ byte[] payload = new byte[rowSize];
+
+ DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
+
+ assert data.payloadSize() == rowSize;
+
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+ wal.log(new DataPageUpdateRecord(
+ cacheId,
+ pageId,
+ itemId,
+ payload));
+ }
+
+ return updated;
+ }
+ }
+
+ /** Handler used by {@link #insertDataRow} to write a row, or a fragment of it, into a page. */
+ private final PageHandler<T, Integer> writeRow = new WriteRowHandler();
+
+ /**
+ * Writes the whole row when nothing is written yet and it fits, otherwise a fragment; then re-buckets the page.
+ */
+ private final class WriteRowHandler extends PageHandler<T, Integer> {
+ @Override public Integer run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ T row,
+ int written)
+ throws IgniteCheckedException {
+ AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+ int rowSize = io.getRowSize(row);
+ int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+ assert oldFreeSpace > 0 : oldFreeSpace;
+
+ // If the full row does not fit into this page write only a fragment.
+ written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
+ addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
+
+ // Reread free space after update.
+ int newFreeSpace = io.getFreeSpace(pageAddr);
+
+ if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+ int bucket = bucket(newFreeSpace, false);
+
+ put(null, pageId, page, pageAddr, bucket);
+ }
+
+ if (written == rowSize)
+ evictionTracker.touchPage(pageId);
+
+ // Avoid boxing with garbage generation for usual case.
+ return written == rowSize ? COMPLETE : written;
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param pageAddr Page address.
+ * @param io IO.
+ * @param row Row.
+ * @param rowSize Row size.
+ * @return Written size which is always equal to row size here.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int addRow(
+ long pageId,
+ long page,
+ long pageAddr,
+ AbstractDataPageIO<T> io,
+ T row,
+ int rowSize
+ ) throws IgniteCheckedException {
+ io.addRow(pageAddr, row, rowSize, pageSize());
+
+ if (needWalDeltaRecord(pageId, page, null)) {
+ // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
+ byte[] payload = new byte[rowSize];
+
+ DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+ assert data.payloadSize() == rowSize;
+
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+ wal.log(new DataPageInsertRecord(
+ grpId,
+ pageId,
+ payload));
+ }
+
+ return rowSize;
+ }
+
+ /**
+ * @param pageId Page ID.
+ * @param page Page pointer.
+ * @param pageAddr Page address.
+ * @param io IO.
+ * @param row Row.
+ * @param written Size already written before this call.
+ * @param rowSize Row size.
+ * @return Updated written size: {@code written} plus the size of the fragment added by this call.
+ * @throws IgniteCheckedException If failed.
+ */
+ private int addRowFragment(
+ long pageId,
+ long page,
+ long pageAddr,
+ AbstractDataPageIO<T> io,
+ T row,
+ int written,
+ int rowSize
+ ) throws IgniteCheckedException {
+ // Read last link before the fragment write, because it will be updated there.
+ long lastLink = row.link();
+
+ int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
+
+ assert payloadSize > 0 : payloadSize;
+
+ if (needWalDeltaRecord(pageId, page, null)) {
+ // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
+ byte[] payload = new byte[payloadSize];
+
+ DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
+
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
+
+ wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink));
+ }
+
+ return written + payloadSize;
+ }
+ }
+
+ /** Handler used by {@link #removeDataRowByLink} to remove one row item from a page. */
+ private final PageHandler<Void, Long> rmvRow = new RemoveRowHandler();
+
+ /**
+ * Removes a row item from its page, re-buckets the page by its new free space and returns the next link.
+ */
+ private final class RemoveRowHandler extends PageHandler<Void, Long> {
+ @Override public Long run(
+ int cacheId,
+ long pageId,
+ long page,
+ long pageAddr,
+ PageIO iox,
+ Boolean walPlc,
+ Void ignored,
+ int itemId)
+ throws IgniteCheckedException {
+ AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
+
+ int oldFreeSpace = io.getFreeSpace(pageAddr);
+
+ assert oldFreeSpace >= 0 : oldFreeSpace;
+
+ long nextLink = io.removeRow(pageAddr, itemId, pageSize());
+
+ if (needWalDeltaRecord(pageId, page, walPlc))
+ wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));
+
+ int newFreeSpace = io.getFreeSpace(pageAddr);
+
+ if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
+ int newBucket = bucket(newFreeSpace, false);
+
+ if (oldFreeSpace > MIN_PAGE_FREE_SPACE) {
+ int oldBucket = bucket(oldFreeSpace, false);
+
+ if (oldBucket != newBucket) {
+ // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
+ if (removeDataPage(pageId, page, pageAddr, io, oldBucket))
+ put(null, pageId, page, pageAddr, newBucket);
+ }
+ }
+ else
+ put(null, pageId, page, pageAddr, newBucket);
+
+ if (io.isEmpty(pageAddr))
+ evictionTracker.forgetPage(pageId);
+ }
+
+ // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
+ return nextLink;
+ }
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param name Name (for debug purpose).
+ * @param memMetrics Memory metrics.
+ * @param memPlc Memory policy; supplies the page memory and the eviction tracker.
+ * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
+ * @param wal Write ahead log manager.
+ * @param metaPageId Metadata page ID.
+ * @param initNew {@code True} if new metadata should be initialized.
+ * @throws IgniteCheckedException If failed.
+ */
+ public AbstractFreeList(
+ int cacheId,
+ String name,
+ MemoryMetricsImpl memMetrics,
+ MemoryPolicy memPlc,
+ ReuseList reuseList,
+ IgniteWriteAheadLogManager wal,
+ long metaPageId,
+ boolean initNew) throws IgniteCheckedException {
+ super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId);
+ this.evictionTracker = memPlc.evictionTracker();
+ this.reuseList = reuseList == null ? this : reuseList;
+ int pageSize = pageMem.pageSize();
+
+ assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
+ assert U.isPow2(BUCKETS);
+ assert BUCKETS <= pageSize : pageSize;
+
+ // TODO this constant is used because currently we cannot reuse data pages as index pages
+ // TODO and vice-versa. It should be removed when data storage format is finalized.
+ MIN_SIZE_FOR_DATA_PAGE = pageSize - AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD;
+
+ int shift = 0;
+
+ while (pageSize > BUCKETS) {
+ shift++;
+ pageSize >>>= 1;
+ }
+
+ this.shift = shift;
+
+ this.memMetrics = memMetrics;
+
+ emptyDataPagesBucket = bucket(MIN_SIZE_FOR_DATA_PAGE, false);
+
+ init(metaPageId, initNew);
+ }
+
+ /**
+ * Calculates average fill factor over this free list instance.
+ *
+ * @return Tuple (numerator, denominator); both are {@code 0} when the counted buckets hold no pages.
+ */
+ public T2<Long, Long> fillFactor() {
+ long pageSize = pageSize();
+
+ long totalSize = 0;
+ long loadSize = 0;
+
+ for (int b = BUCKETS - 2; b > 0; b--) {
+ long bsize = pageSize - ((REUSE_BUCKET - b) << shift);
+
+ long pages = bucketsSize[b].longValue();
+
+ loadSize += pages * (pageSize - bsize);
+
+ totalSize += pages * pageSize;
+ }
+
+ return totalSize == 0 ? new T2<>(0L, 0L) : new T2<>(loadSize, totalSize);
+ }
+
+ /** {@inheritDoc} Per-bucket lines are logged only if the local {@code dumpBucketsInfo} flag, hard-coded to false, is set. */
+ @Override public void dumpStatistics(IgniteLogger log) {
+ long dataPages = 0;
+
+ final boolean dumpBucketsInfo = false;
+
+ for (int b = 0; b < BUCKETS; b++) {
+ long size = bucketsSize[b].longValue();
+
+ if (!isReuseBucket(b))
+ dataPages += size;
+
+ if (dumpBucketsInfo) {
+ Stripe[] stripes = getBucket(b);
+
+ boolean empty = true;
+
+ if (stripes != null) {
+ for (Stripe stripe : stripes) {
+ if (!stripe.empty) {
+ empty = false;
+
+ break;
+ }
+ }
+ }
+
+ if (log.isInfoEnabled())
+ log.info("Bucket [b=" + b +
+ ", size=" + size +
+ ", stripes=" + (stripes != null ? stripes.length : 0) +
+ ", stripesEmpty=" + empty + ']');
+ }
+ }
+
+ if (dataPages > 0) {
+ if (log.isInfoEnabled())
+ log.info("FreeList [name=" + name +
+ ", buckets=" + BUCKETS +
+ ", dataPages=" + dataPages +
+ ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
+ }
+ }
+
+ /**
+ * @param freeSpace Page free space; asserted to be positive.
+ * @param allowReuse {@code True} if it is allowed to get reuse bucket.
+ * @return Bucket index; the reuse bucket is replaced by the one below it unless {@code allowReuse} is set.
+ */
+ private int bucket(int freeSpace, boolean allowReuse) {
+ assert freeSpace > 0 : freeSpace;
+
+ int bucket = freeSpace >>> shift;
+
+ assert bucket >= 0 && bucket < BUCKETS : bucket;
+
+ if (!allowReuse && isReuseBucket(bucket))
+ bucket--;
+
+ return bucket;
+ }
+
+ /**
+ * @param part Partition ID; asserted to be at most the maximum partition ID and not the index partition.
+ * @return ID of the page allocated from page memory with the data flag.
+ * @throws IgniteCheckedException If failed.
+ */
+ private long allocateDataPage(int part) throws IgniteCheckedException {
+ assert part <= PageIdAllocator.MAX_PARTITION_ID;
+ assert part != PageIdAllocator.INDEX_PARTITION;
+
+ return pageMem.allocatePage(grpId, part, PageIdAllocator.FLAG_DATA);
+ }
+
+ /** {@inheritDoc} Loops until the write handler reports completion, taking or allocating one page per iteration. */
+ @Override public void insertDataRow(T row) throws IgniteCheckedException {
+ int rowSize = ioVersions().latest().getRowSize(row);
+
+ int written = 0;
+
+ do {
+ if (written != 0)
+ memMetrics.incrementLargeEntriesPages();
+
+ int freeSpace = Math.min(MIN_SIZE_FOR_DATA_PAGE, rowSize - written);
+
+ long pageId = 0L;
+
+ if (freeSpace == MIN_SIZE_FOR_DATA_PAGE)
+ pageId = takeEmptyPage(emptyDataPagesBucket, ioVersions());
+
+ boolean reuseBucket = false;
+
+ // TODO: properly handle reuse bucket.
+ if (pageId == 0L) {
+ for (int b = bucket(freeSpace, false) + 1; b < BUCKETS - 1; b++) {
+ pageId = takeEmptyPage(b, ioVersions());
+
+ if (pageId != 0L) {
+ reuseBucket = isReuseBucket(b);
+
+ break;
+ }
+ }
+ }
+
+ boolean allocated = pageId == 0L;
+
+ if (allocated)
+ pageId = allocateDataPage(row.partition());
+
+ AbstractDataPageIO<T> init = reuseBucket || allocated ? ioVersions().latest() : null;
+
+ written = write(pageId, writeRow, init, row, written, FAIL_I);
+
+ assert written != FAIL_I; // We can't fail here.
+ }
+ while (written != COMPLETE);
+ }
+
+ /** {@inheritDoc} The page ID and item ID to update are decoded from {@code link}, which must be non-zero. */
+ @Override public boolean updateDataRow(long link, T row) throws IgniteCheckedException {
+ assert link != 0;
+
+ long pageId = PageIdUtils.pageId(link);
+ int itemId = PageIdUtils.itemId(link);
+
+ Boolean updated = write(pageId, updateRow, row, itemId, null);
+
+ assert updated != null; // Can't fail here.
+
+ return updated;
+ }
+
+ /** {@inheritDoc} Keeps removing for as long as the remove handler returns a non-zero next link. */
+ @Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
+ assert link != 0;
+
+ long pageId = PageIdUtils.pageId(link);
+ int itemId = PageIdUtils.itemId(link);
+
+ long nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+ assert nextLink != FAIL_L; // Can't fail here.
+
+ while (nextLink != 0L) {
+ memMetrics.decrementLargeEntriesPages();
+
+ itemId = PageIdUtils.itemId(nextLink);
+ pageId = PageIdUtils.pageId(nextLink);
+
+ nextLink = write(pageId, rmvRow, itemId, FAIL_L);
+
+ assert nextLink != FAIL_L; // Can't fail here.
+ }
+ }
+
+ /** {@inheritDoc} Reads the stripes stored for the bucket in the {@code buckets} array. */
+ @Override protected Stripe[] getBucket(int bucket) {
+ return buckets.get(bucket);
+ }
+
+ /** {@inheritDoc} Delegates to {@code compareAndSet} on the {@code buckets} array. */
+ @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
+ return buckets.compareAndSet(bucket, exp, upd);
+ }
+
+ /** {@inheritDoc} Only the bucket with index {@code REUSE_BUCKET} qualifies. */
+ @Override protected boolean isReuseBucket(int bucket) {
+ return bucket == REUSE_BUCKET;
+ }
+
+ /**
+ * @return Number of empty data pages in free list, read as an int from the empty-data-pages bucket size.
+ */
+ public int emptyDataPages() {
+ return bucketsSize[emptyDataPagesBucket].intValue();
+ }
+
+ /** {@inheritDoc} Asserts that this free list is its own reuse list, then puts the bag into the reuse bucket. */
+ @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
+ assert reuseList == this : "not allowed to be a reuse list";
+
+ put(bag, 0, 0, 0L, REUSE_BUCKET);
+ }
+
+ /** {@inheritDoc} Asserts that this free list is its own reuse list, then takes a page from the reuse bucket. */
+ @Override public long takeRecycledPage() throws IgniteCheckedException {
+ assert reuseList == this : "not allowed to be a reuse list";
+
+ return takeEmptyPage(REUSE_BUCKET, null);
+ }
+
+ /** {@inheritDoc} Asserts that this free list is its own reuse list, then counts pages stored in the reuse bucket. */
+ @Override public long recycledPagesCount() throws IgniteCheckedException {
+ assert reuseList == this : "not allowed to be a reuse list";
+
+ return storedPagesCount(REUSE_BUCKET);
+ }
+
+ /**
+ * @return IO versions for the data pages of this free list; the latest one is used to compute row sizes.
+ */
+ public abstract IOVersions<? extends AbstractDataPageIO<T>> ioVersions();
+
+ /** {@inheritDoc} The result contains only the free list name. */
+ @Override public String toString() {
+ return "FreeList [name=" + name + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
new file mode 100644
index 0000000..d0a48d5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/CacheFreeListImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.freelist;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+
+/**
+ * FreeList implementation for cache.
+ */
+public class CacheFreeListImpl extends AbstractFreeList<CacheDataRow> {
+ /** Passes every argument unchanged to the {@link AbstractFreeList} constructor. */
+ public CacheFreeListImpl(int cacheId, String name, MemoryMetricsImpl memMetrics, MemoryPolicy memPlc,
+ ReuseList reuseList,
+ IgniteWriteAheadLogManager wal, long metaPageId, boolean initNew) throws IgniteCheckedException {
+ super(cacheId, name, memMetrics, memPlc, reuseList, wal, metaPageId, initNew);
+ }
+
+ /** {@inheritDoc} Always returns {@code DataPageIO.VERSIONS}. */
+ @Override public IOVersions<? extends AbstractDataPageIO<CacheDataRow>> ioVersions() {
+ return DataPageIO.VERSIONS;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "FreeList [name=" + name + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
index d2f0099..bdca21c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
@@ -19,16 +19,16 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
+import org.apache.ignite.internal.processors.cache.persistence.Storable;
/**
*/
-public interface FreeList {
+public interface FreeList<T extends Storable> {
/**
* @param row Row.
* @throws IgniteCheckedException If failed.
*/
- public void insertDataRow(CacheDataRow row) throws IgniteCheckedException;
+ public void insertDataRow(T row) throws IgniteCheckedException;
/**
* @param link Row link.
@@ -36,7 +36,7 @@ public interface FreeList {
* @return {@code True} if was able to update row.
* @throws IgniteCheckedException If failed.
*/
- public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException;
+ public boolean updateDataRow(long link, T row) throws IgniteCheckedException;
/**
* @param link Row link.
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
deleted file mode 100644
index 3eb62ae..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeListImpl.java
+++ /dev/null
@@ -1,605 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.freelist;
-
-import java.util.concurrent.atomic.AtomicReferenceArray;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
-import org.apache.ignite.internal.pagemem.PageIdUtils;
-import org.apache.ignite.internal.pagemem.PageUtils;
-import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
-import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
-import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
-import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
-import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- */
-public class FreeListImpl extends PagesList implements FreeList, ReuseList {
- /** */
- private static final int BUCKETS = 256; // Must be power of 2.
-
- /** */
- private static final int REUSE_BUCKET = BUCKETS - 1;
-
- /** */
- private static final Integer COMPLETE = Integer.MAX_VALUE;
-
- /** */
- private static final Integer FAIL_I = Integer.MIN_VALUE;
-
- /** */
- private static final Long FAIL_L = Long.MAX_VALUE;
-
- /** */
- private static final int MIN_PAGE_FREE_SPACE = 8;
-
- /** */
- private final int shift;
-
- /** */
- private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);
-
- /** */
- private final int MIN_SIZE_FOR_DATA_PAGE;
-
- /** */
- private final int emptyDataPagesBucket;
-
- /** */
- private final PageHandler<CacheDataRow, Boolean> updateRow = new UpdateRowHandler();
-
- /** */
- private final MemoryMetricsImpl memMetrics;
-
- /** */
- private final PageEvictionTracker evictionTracker;
-
- /**
- *
- */
- private final class UpdateRowHandler extends PageHandler<CacheDataRow, Boolean> {
- @Override public Boolean run(
- int cacheId,
- long pageId,
- long page,
- long pageAddr,
- PageIO iox,
- Boolean walPlc,
- CacheDataRow row,
- int itemId)
- throws IgniteCheckedException {
- DataPageIO io = (DataPageIO)iox;
-
- int rowSize = getRowSize(row, row.cacheId() != 0);
-
- boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
-
- evictionTracker.touchPage(pageId);
-
- if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
- // TODO This record must contain only a reference to a logical WAL record with the actual data.
- byte[] payload = new byte[rowSize];
-
- DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
-
- assert data.payloadSize() == rowSize;
-
- PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
-
- wal.log(new DataPageUpdateRecord(
- cacheId,
- pageId,
- itemId,
- payload));
- }
-
- return updated;
- }
- }
-
- /** */
- private final PageHandler<CacheDataRow, Integer> writeRow = new WriteRowHandler();
-
- /**
- *
- */
- private final class WriteRowHandler extends PageHandler<CacheDataRow, Integer> {
- @Override public Integer run(
- int cacheId,
- long pageId,
- long page,
- long pageAddr,
- PageIO iox,
- Boolean walPlc,
- CacheDataRow row,
- int written)
- throws IgniteCheckedException {
- DataPageIO io = (DataPageIO)iox;
-
- int rowSize = getRowSize(row, row.cacheId() != 0);
- int oldFreeSpace = io.getFreeSpace(pageAddr);
-
- assert oldFreeSpace > 0 : oldFreeSpace;
-
- // If the full row does not fit into this page write only a fragment.
- written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize):
- addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);
-
- // Reread free space after update.
- int newFreeSpace = io.getFreeSpace(pageAddr);
-
- if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
- int bucket = bucket(newFreeSpace, false);
-
- put(null, pageId, page, pageAddr, bucket);
- }
-
- if (written == rowSize)
- evictionTracker.touchPage(pageId);
-
- // Avoid boxing with garbage generation for usual case.
- return written == rowSize ? COMPLETE : written;
- }
-
- /**
- * @param pageId Page ID.
- * @param page Page pointer.
- * @param pageAddr Page address.
- * @param io IO.
- * @param row Row.
- * @param rowSize Row size.
- * @return Written size which is always equal to row size here.
- * @throws IgniteCheckedException If failed.
- */
- private int addRow(
- long pageId,
- long page,
- long pageAddr,
- DataPageIO io,
- CacheDataRow row,
- int rowSize
- ) throws IgniteCheckedException {
- io.addRow(pageAddr, row, rowSize, pageSize());
-
- if (needWalDeltaRecord(pageId, page, null)) {
- // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
- byte[] payload = new byte[rowSize];
-
- DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
-
- assert data.payloadSize() == rowSize;
-
- PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
-
- wal.log(new DataPageInsertRecord(
- grpId,
- pageId,
- payload));
- }
-
- return rowSize;
- }
-
- /**
- * @param pageId Page ID.
- * @param page Page pointer.
- * @param pageAddr Page address.
- * @param io IO.
- * @param row Row.
- * @param written Written size.
- * @param rowSize Row size.
- * @return Updated written size.
- * @throws IgniteCheckedException If failed.
- */
- private int addRowFragment(
- long pageId,
- long page,
- long pageAddr,
- DataPageIO io,
- CacheDataRow row,
- int written,
- int rowSize
- ) throws IgniteCheckedException {
- // Read last link before the fragment write, because it will be updated there.
- long lastLink = row.link();
-
- int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
-
- assert payloadSize > 0 : payloadSize;
-
- if (needWalDeltaRecord(pageId, page, null)) {
- // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
- byte[] payload = new byte[payloadSize];
-
- DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
-
- PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
-
- wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink));
- }
-
- return written + payloadSize;
- }
- }
-
-
- /** Handler instance passed to {@code write(...)} by {@code removeDataRowByLink} to remove one item from a data page. */
- private final PageHandler<Void, Long> rmvRow = new RemoveRowHandler();
-
- /**
- * Page handler that removes a single item from a data page and then re-files the page in the free list
- * according to its new free space.
- * <p>
- * {@code run(...)} performs, in order:
- * <ol>
- * <li>reads the page free space before the removal;</li>
- * <li>removes the item through {@code DataPageIO.removeRow(...)} and keeps the returned next link;</li>
- * <li>logs a {@code DataPageRemoveRecord} when {@code needWalDeltaRecord(...)} says so;</li>
- * <li>if the new free space exceeds {@code MIN_PAGE_FREE_SPACE}, moves the page from its old bucket to the
- * new one (or just puts it into the new bucket when the old free space did not exceed the threshold) and
- * tells the eviction tracker to forget the page if it became empty.</li>
- * </ol>
- * The returned value is the next link reported by {@code removeRow(...)}; the caller keeps removing while it is
- * non-zero.
- */
- private final class RemoveRowHandler extends PageHandler<Void, Long> {
- @Override public Long run(
- int cacheId,
- long pageId,
- long page,
- long pageAddr,
- PageIO iox,
- Boolean walPlc,
- Void ignored,
- int itemId)
- throws IgniteCheckedException {
- DataPageIO io = (DataPageIO)iox;
-
- int oldFreeSpace = io.getFreeSpace(pageAddr); // Sampled before the removal to find the bucket the page was filed under.
-
- assert oldFreeSpace >= 0: oldFreeSpace;
-
- long nextLink = io.removeRow(pageAddr, itemId, pageSize());
-
- if (needWalDeltaRecord(pageId, page, walPlc))
- wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));
-
- int newFreeSpace = io.getFreeSpace(pageAddr);
-
- if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
- int newBucket = bucket(newFreeSpace, false);
-
- if (oldFreeSpace > MIN_PAGE_FREE_SPACE) {
- int oldBucket = bucket(oldFreeSpace, false);
-
- if (oldBucket != newBucket) {
- // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
- if (removeDataPage(pageId, page, pageAddr, io, oldBucket))
- put(null, pageId, page, pageAddr, newBucket);
- }
- }
- else // Old free space was at or below the threshold: no removal from an old bucket is attempted.
- put(null, pageId, page, pageAddr, newBucket);
-
- if (io.isEmpty(pageAddr))
- evictionTracker.forgetPage(pageId);
- }
-
- // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
- return nextLink;
- }
- }
-
- /**
- * Creates the free list on top of the page memory of the given memory policy.
- * <p>
- * Besides storing its arguments, the constructor derives three values from the page size:
- * {@code MIN_SIZE_FOR_DATA_PAGE} (page size minus {@code DataPageIO.MIN_DATA_PAGE_OVERHEAD}), {@code shift}
- * (how many times the page size must be halved to become no greater than {@code BUCKETS}) and
- * {@code emptyDataPagesBucket} (the bucket that {@code MIN_SIZE_FOR_DATA_PAGE} maps to). It finishes by
- * calling {@code init(metaPageId, initNew)}.
- *
- * @param cacheId Cache ID.
- * @param name Name (for debug purpose).
- * @param memMetrics Memory metrics.
- * @param memPlc Memory policy.
- * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
- * @param wal Write ahead log manager.
- * @param metaPageId Metadata page ID.
- * @param initNew {@code True} if new metadata should be initialized.
- * @throws IgniteCheckedException If failed.
- */
- public FreeListImpl(
- int cacheId,
- String name,
- MemoryMetricsImpl memMetrics,
- MemoryPolicy memPlc,
- ReuseList reuseList,
- IgniteWriteAheadLogManager wal,
- long metaPageId,
- boolean initNew) throws IgniteCheckedException {
- super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId);
- this.evictionTracker = memPlc.evictionTracker();
- this.reuseList = reuseList == null ? this : reuseList;
- int pageSize = pageMem.pageSize();
-
- assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
- assert U.isPow2(BUCKETS);
- assert BUCKETS <= pageSize : pageSize;
-
- // TODO this constant is used because currently we cannot reuse data pages as index pages
- // TODO and vice-versa. It should be removed when data storage format is finalized.
- MIN_SIZE_FOR_DATA_PAGE = pageSize - DataPageIO.MIN_DATA_PAGE_OVERHEAD;
-
- int shift = 0;
-
- while (pageSize > BUCKETS) {
- shift++;
- pageSize >>>= 1; // Consumes the local copy only; MIN_SIZE_FOR_DATA_PAGE was already computed from the real page size.
- }
-
- this.shift = shift;
-
- this.memMetrics = memMetrics;
-
- emptyDataPagesBucket = bucket(MIN_SIZE_FOR_DATA_PAGE, false); // Requires this.shift to be assigned first.
-
- init(metaPageId, initNew);
- }
-
- /**
- * Calculates average fill factor over FreeListImpl instance.
- * <p>
- * Visits buckets {@code BUCKETS - 2} down to {@code 1} (bucket {@code 0} and bucket {@code BUCKETS - 1} are
- * not visited). For each bucket the page count is read from {@code bucketsSize} and contributes
- * {@code pages * pageSize} to the denominator and {@code pages * (pageSize - bsize)} to the numerator.
- *
- * @return Tuple (numerator, denominator); {@code (0, 0)} when the visited buckets hold no pages.
- */
- public T2<Long, Long> fillFactor() {
- long pageSize = pageSize();
-
- long totalSize = 0;
- long loadSize = 0;
-
- for (int b = BUCKETS - 2; b > 0; b--) {
- long bsize = pageSize - ((REUSE_BUCKET - b) << shift); // NOTE(review): presumably the free-space estimate for pages in bucket b - confirm.
-
- long pages = bucketsSize[b].longValue();
-
- loadSize += pages * (pageSize - bsize);
-
- totalSize += pages * pageSize;
- }
-
- return totalSize == 0 ? new T2<>(0L, 0L) : new T2<>(loadSize, totalSize);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Sums the sizes of all non-reuse buckets and, when that sum is positive and info logging is enabled, logs one
- * summary line with the list name, bucket count, data page count and reuse bucket size. Per-bucket details are
- * guarded by a local constant that is currently {@code false}.
- */
- @Override public void dumpStatistics(IgniteLogger log) {
- long dataPages = 0;
-
- final boolean dumpBucketsInfo = false; // Hard-coded off: the per-bucket logging below runs only if this is edited.
-
- for (int b = 0; b < BUCKETS; b++) {
- long size = bucketsSize[b].longValue();
-
- if (!isReuseBucket(b))
- dataPages += size;
-
- if (dumpBucketsInfo) {
- Stripe[] stripes = getBucket(b);
-
- boolean empty = true; // Stays true when there are no stripes or every stripe is flagged empty.
-
- if (stripes != null) {
- for (Stripe stripe : stripes) {
- if (!stripe.empty) {
- empty = false;
-
- break;
- }
- }
- }
-
- if (log.isInfoEnabled())
- log.info("Bucket [b=" + b +
- ", size=" + size +
- ", stripes=" + (stripes != null ? stripes.length : 0) +
- ", stripesEmpty=" + empty + ']');
- }
- }
-
- if (dataPages > 0) {
- if (log.isInfoEnabled())
- log.info("FreeList [name=" + name +
- ", buckets=" + BUCKETS +
- ", dataPages=" + dataPages +
- ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
- }
- }
-
- /**
- * Maps a free space amount to a bucket index by shifting it right by {@code shift} bits. When the result is
- * the reuse bucket and {@code allowReuse} is {@code false}, the previous bucket index is returned instead.
- *
- * @param freeSpace Page free space, must be positive.
- * @param allowReuse {@code True} if it is allowed to get reuse bucket.
- * @return Bucket.
- */
- private int bucket(int freeSpace, boolean allowReuse) {
- assert freeSpace > 0 : freeSpace;
-
- int bucket = freeSpace >>> shift;
-
- assert bucket >= 0 && bucket < BUCKETS : bucket;
-
- if (!allowReuse && isReuseBucket(bucket))
- bucket--;
-
- return bucket;
- }
-
- /**
- * Allocates a new page with {@code PageIdAllocator.FLAG_DATA} in the given partition of group {@code grpId}.
- * The partition must not exceed {@code PageIdAllocator.MAX_PARTITION_ID} and must not be
- * {@code PageIdAllocator.INDEX_PARTITION} (both checked by assertions only).
- *
- * @param part Partition.
- * @return Page ID.
- * @throws IgniteCheckedException If failed.
- */
- private long allocateDataPage(int part) throws IgniteCheckedException {
- assert part <= PageIdAllocator.MAX_PARTITION_ID;
- assert part != PageIdAllocator.INDEX_PARTITION;
-
- return pageMem.allocatePage(grpId, part, PageIdAllocator.FLAG_DATA);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Writes the row in one or more passes until the write handler reports {@code COMPLETE}. Each pass picks a
- * page: an empty data page when a whole page worth of space is needed, otherwise the first page found in the
- * buckets above the one matching the required space, otherwise a newly allocated page. Every pass after the
- * first one increments the large-entries page metric.
- */
- @Override public void insertDataRow(CacheDataRow row) throws IgniteCheckedException {
- int rowSize = getRowSize(row, row.cacheId() != 0);
-
- int written = 0;
-
- do {
- if (written != 0)
- memMetrics.incrementLargeEntriesPages();
-
- int freeSpace = Math.min(MIN_SIZE_FOR_DATA_PAGE, rowSize - written); // Space wanted for this pass, capped at one data page.
-
- long pageId = 0L;
-
- if (freeSpace == MIN_SIZE_FOR_DATA_PAGE)
- pageId = takeEmptyPage(emptyDataPagesBucket, DataPageIO.VERSIONS);
-
- boolean reuseBucket = false;
-
- // TODO: properly handle reuse bucket.
- if (pageId == 0L) {
- for (int b = bucket(freeSpace, false) + 1; b < BUCKETS - 1; b++) { // Starts one bucket above the matching one; index BUCKETS - 1 is not visited.
- pageId = takeEmptyPage(b, DataPageIO.VERSIONS);
-
- if (pageId != 0L) {
- reuseBucket = isReuseBucket(b);
-
- break;
- }
- }
- }
-
- boolean allocated = pageId == 0L;
-
- if (allocated)
- pageId = allocateDataPage(row.partition());
-
- DataPageIO init = reuseBucket || allocated ? DataPageIO.VERSIONS.latest() : null; // An init IO is passed only for reuse-bucket or freshly allocated pages.
-
- written = write(pageId, writeRow, init, row, written, FAIL_I);
-
- assert written != FAIL_I; // We can't fail here.
- }
- while (written != COMPLETE);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Splits the link into page ID and item ID and runs the {@code updateRow} handler on that page. The handler
- * result is returned as is; a {@code null} result is treated as impossible (assertion).
- */
- @Override public boolean updateDataRow(long link, CacheDataRow row) throws IgniteCheckedException {
- assert link != 0;
-
- long pageId = PageIdUtils.pageId(link);
- int itemId = PageIdUtils.itemId(link);
-
- Boolean updated = write(pageId, updateRow, row, itemId, null);
-
- assert updated != null; // Can't fail here.
-
- return updated;
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Runs the {@code rmvRow} handler on the page addressed by the link and keeps following the next link the
- * handler returns until it is {@code 0}. Each followed link decrements the large-entries page metric.
- */
- @Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
- assert link != 0;
-
- long pageId = PageIdUtils.pageId(link);
- int itemId = PageIdUtils.itemId(link);
-
- long nextLink = write(pageId, rmvRow, itemId, FAIL_L);
-
- assert nextLink != FAIL_L; // Can't fail here.
-
- while (nextLink != 0L) {
- memMetrics.decrementLargeEntriesPages();
-
- itemId = PageIdUtils.itemId(nextLink);
- pageId = PageIdUtils.pageId(nextLink);
-
- nextLink = write(pageId, rmvRow, itemId, FAIL_L);
-
- assert nextLink != FAIL_L; // Can't fail here.
- }
- }
-
- /** {@inheritDoc} Reads the current stripes of the bucket from the {@code buckets} holder. */
- @Override protected Stripe[] getBucket(int bucket) {
- return buckets.get(bucket);
- }
-
- /** {@inheritDoc} Delegates to {@code buckets.compareAndSet(bucket, exp, upd)} and returns its result. */
- @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
- return buckets.compareAndSet(bucket, exp, upd);
- }
-
- /** {@inheritDoc} A bucket is the reuse bucket exactly when its index equals {@code REUSE_BUCKET}. */
- @Override protected boolean isReuseBucket(int bucket) {
- return bucket == REUSE_BUCKET;
- }
-
- /**
- * Reads the size counter of {@code emptyDataPagesBucket} and narrows it to {@code int}.
- *
- * @return Number of empty data pages in free list.
- */
- public int emptyDataPages() {
- return bucketsSize[emptyDataPagesBucket].intValue();
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Puts the pages of the bag into {@code REUSE_BUCKET}. Only valid when this free list acts as its own reuse
- * list (checked by an assertion).
- */
- @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
- assert reuseList == this: "not allowed to be a reuse list";
-
- put(bag, 0, 0, 0L, REUSE_BUCKET);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Takes a page from {@code REUSE_BUCKET} without passing IO versions. Only valid when this free list acts as
- * its own reuse list (checked by an assertion).
- */
- @Override public long takeRecycledPage() throws IgniteCheckedException {
- assert reuseList == this: "not allowed to be a reuse list";
-
- return takeEmptyPage(REUSE_BUCKET, null);
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * Returns {@code storedPagesCount(REUSE_BUCKET)}. Only valid when this free list acts as its own reuse list
- * (checked by an assertion).
- */
- @Override public long recycledPagesCount() throws IgniteCheckedException {
- assert reuseList == this: "not allowed to be a reuse list";
-
- return storedPagesCount(REUSE_BUCKET);
- }
-
- /**
- * Computes the on-page size of a row as the sum of the key bytes length, the value bytes length, the version
- * size reported by {@code CacheVersionIO.size(version, false)}, a fixed 8 bytes and, when
- * {@code withCacheId} is set, 4 more bytes.
- *
- * @param row Row.
- * @param withCacheId If {@code true} adds cache ID size.
- * @return Entry size on page.
- * @throws IgniteCheckedException If failed.
- */
- public static int getRowSize(CacheDataRow row, boolean withCacheId) throws IgniteCheckedException {
- KeyCacheObject key = row.key();
- CacheObject val = row.value();
-
- int keyLen = key.valueBytesLength(null);
- int valLen = val.valueBytesLength(null);
-
- return keyLen + valLen + CacheVersionIO.size(row.version(), false) + 8 + (withCacheId ? 4 : 0); // NOTE(review): the constant 8 is presumably a long-sized field such as expire time - confirm.
- }
-
- /** {@inheritDoc} Includes only the free list name. */
- @Override public String toString() {
- return "FreeList [name=" + name + ']';
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
index 8a540a0..d79aa81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/PagesList.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.pagemem.wal.record.delta.PagesListSetPreviousR
import org.apache.ignite.internal.processors.cache.persistence.DataStructure;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListNodeIO;
-import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
@@ -698,7 +698,7 @@ public abstract class PagesList extends DataStructure {
if (needWalDeltaRecord(pageId, page, null))
wal.log(new PagesListAddPageRecord(grpId, pageId, dataId));
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataAddr);
+ AbstractDataPageIO dataIO = PageIO.getPageIO(dataAddr);
dataIO.setFreeListPageId(dataAddr, pageId);
if (needWalDeltaRecord(dataId, dataPage, null))
@@ -729,7 +729,7 @@ public abstract class PagesList extends DataStructure {
final long dataAddr,
int bucket
) throws IgniteCheckedException {
- DataPageIO dataIO = DataPageIO.VERSIONS.forPage(dataAddr);
+ AbstractDataPageIO dataIO = PageIO.getPageIO(dataAddr);
// Attempt to add page failed: the node page is full.
if (isReuseBucket(bucket)) {
@@ -1152,7 +1152,7 @@ public abstract class PagesList extends DataStructure {
final long dataId,
final long dataPage,
final long dataAddr,
- DataPageIO dataIO,
+ AbstractDataPageIO dataIO,
int bucket)
throws IgniteCheckedException {
final long pageId = dataIO.getFreeListPageId(dataAddr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
new file mode 100644
index 0000000..cb588f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetaStorage.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
+import org.apache.ignite.internal.processors.cache.IncompleteObject;
+import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.MemoryPolicy;
+import org.apache.ignite.internal.processors.cache.persistence.RootPage;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.SimpleDataPageIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
+import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+
+/**
+ * General purpose key-value local-only storage.
+ */
+public class MetaStorage implements DbCheckpointListener {
+ /** Name from which {@link #METASTORAGE_CACHE_ID} is derived. */
+ public static final String METASTORAGE_CACHE_NAME = "MetaStorage";
+ /** ID used for all page memory, free list and tree calls made by this storage. */
+ public static final int METASTORAGE_CACHE_ID = CU.cacheId(METASTORAGE_CACHE_NAME);
+
+ /** Write-ahead log manager handed to the free list and the tree and used to log the meta page init record. */
+ private final IgniteWriteAheadLogManager wal;
+ /** Memory policy whose page memory backs this storage. */
+ private final MemoryPolicy memPlc;
+ /** Tree of stored rows, created in {@code init(...)}. */
+ private MetastorageTree tree;
+ /** Counter passed to the {@code MetastorageTree} constructor. */
+ private AtomicLong rmvId = new AtomicLong();
+ /** Memory metrics passed to the free list. */
+ private MemoryMetricsImpl memMetrics;
+ /** When {@code true}, {@code putData} and {@code removeData} do nothing and a missing meta page is an error. */
+ private boolean readOnly;
+ /** Root page of the tree, resolved by {@code getOrAllocateMetas()}. */
+ private RootPage treeRoot;
+ /** Root page of the reuse list, resolved by {@code getOrAllocateMetas()}. */
+ private RootPage reuseListRoot;
+ /** Free list holding the row data, created in {@code init(...)}. */
+ private FreeListImpl freeList;
+
+ /**
+ * Stores the collaborators; no pages are touched until {@code init(...)} is called.
+ *
+ * @param wal Write-ahead log manager.
+ * @param memPlc Memory policy providing the page memory.
+ * @param memMetrics Memory metrics.
+ * @param readOnly {@code True} to make {@code putData} and {@code removeData} no-ops and to fail on a missing meta page.
+ */
+ public MetaStorage(IgniteWriteAheadLogManager wal, MemoryPolicy memPlc, MemoryMetricsImpl memMetrics,
+ boolean readOnly) {
+ this.wal = wal;
+ this.memPlc = memPlc;
+ this.memMetrics = memMetrics;
+ this.readOnly = readOnly;
+ }
+
+ /**
+ * Creates a writable storage: delegates to the four-argument constructor with {@code readOnly = false}.
+ *
+ * @param wal Write-ahead log manager.
+ * @param memPlc Memory policy providing the page memory.
+ * @param memMetrics Memory metrics.
+ */
+ public MetaStorage(IgniteWriteAheadLogManager wal, MemoryPolicy memPlc, MemoryMetricsImpl memMetrics) {
+ this(wal, memPlc, memMetrics, false);
+ }
+
+ /**
+ * Resolves (or allocates) the root pages, then builds the free list, the row store and the tree on top of
+ * them, and finally registers this storage as a checkpoint listener.
+ *
+ * @param db Database manager; it is cast to {@code GridCacheDatabaseSharedManager} to register the listener.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void init(IgniteCacheDatabaseSharedManager db) throws IgniteCheckedException {
+ getOrAllocateMetas();
+
+ freeList = new FreeListImpl(METASTORAGE_CACHE_ID, "metastorage",
+ memMetrics, memPlc, null, wal, reuseListRoot.pageId().pageId(),
+ reuseListRoot.isAllocated());
+
+ MetastorageRowStore rowStore = new MetastorageRowStore(freeList, db);
+
+ tree = new MetastorageTree(METASTORAGE_CACHE_ID, memPlc.pageMemory(), wal, rmvId,
+ freeList, rowStore, treeRoot.pageId().pageId(), treeRoot.isAllocated());
+
+ ((GridCacheDatabaseSharedManager)db).addCheckpointListener(this);
+ }
+
+ /**
+ * Stores {@code data} under {@code key}, replacing a previously stored row with the same key. The whole
+ * replace sequence runs under this object's monitor. Does nothing when the storage is read-only.
+ *
+ * @param key Key.
+ * @param data Value bytes.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void putData(String key, byte[] data) throws IgniteCheckedException {
+ if (!readOnly) {
+ synchronized (this) {
+ MetastorageDataRow oldRow = tree.findOne(new MetastorageDataRow(key, null));
+
+ if (oldRow != null) {
+ tree.removex(oldRow);
+ tree.rowStore().removeRow(oldRow.link());
+ }
+
+ MetastorageDataRow row = new MetastorageDataRow(key, data);
+ tree.rowStore().addRow(row); // Row data goes to the free list first - presumably this is what assigns the row link; confirm.
+ tree.put(row);
+ }
+ }
+ }
+
+ /**
+ * Looks the key up in the tree. Unlike the modifying methods, this one takes no monitor and ignores the
+ * read-only flag.
+ *
+ * @param key Key.
+ * @return Whatever {@code tree.findOne(...)} returns for a search row built from the key.
+ * @throws IgniteCheckedException If failed.
+ */
+ public MetastorageDataRow getData(String key) throws IgniteCheckedException {
+ MetastorageDataRow row = tree.findOne(new MetastorageDataRow(key, null));
+
+ return row;
+ }
+
+ /**
+ * Removes the row stored under {@code key}, if one is found: first from the tree, then from the row store.
+ * Runs under this object's monitor. Does nothing when the storage is read-only.
+ *
+ * @param key Key.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void removeData(String key) throws IgniteCheckedException {
+ if (!readOnly)
+ synchronized (this) {
+ MetastorageDataRow row = new MetastorageDataRow(key, null);
+ MetastorageDataRow oldRow = tree.findOne(row);
+
+ if (oldRow != null) {
+ tree.removex(oldRow);
+ tree.rowStore().removeRow(oldRow.link());
+ }
+ }
+ }
+
+ /**
+ * Resolves the tree root and reuse list root pages through the partition meta page of partition {@code 0}
+ * and stores them in {@code treeRoot} and {@code reuseListRoot}.
+ * <p>
+ * The meta page is acquired and write-locked. If its type is not {@code PageIO.T_PART_META} it is
+ * initialized: two {@code FLAG_DATA} pages are allocated for the roots, written into the meta page and,
+ * when a WAL delta record is needed, logged as a {@code MetaPageInitRecord}. Otherwise the roots are read
+ * from the existing meta page. Both resulting {@code RootPage} objects carry the same {@code allocated}
+ * flag.
+ *
+ * @throws IgniteCheckedException If the storage is read-only and the meta page is not initialized, or on
+ * any failure of the underlying calls.
+ */
+ private void getOrAllocateMetas() throws IgniteCheckedException {
+ PageMemoryEx pageMem = (PageMemoryEx)memPlc.pageMemory();
+
+ int partId = 0;
+
+ long partMetaId = pageMem.partitionMetaPageId(METASTORAGE_CACHE_ID, partId);
+ long partMetaPage = pageMem.acquirePage(METASTORAGE_CACHE_ID, partMetaId);
+ try {
+ boolean allocated = false;
+ long pageAddr = pageMem.writeLock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);
+
+ try {
+ long treeRoot, reuseListRoot; // Locals shadow the fields of the same name; the fields are assigned below via 'this'.
+
+ if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
+ // Initialize new page.
+ if (readOnly)
+ throw new IgniteCheckedException("metastorage is not initialized");
+
+ PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest();
+
+ io.initNewPage(pageAddr, partMetaId, pageMem.pageSize());
+
+ treeRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
+ reuseListRoot = pageMem.allocatePage(METASTORAGE_CACHE_ID, partId, PageMemory.FLAG_DATA);
+
+ assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
+ assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
+
+ io.setTreeRoot(pageAddr, treeRoot);
+ io.setReuseListRoot(pageAddr, reuseListRoot);
+
+ if (PageHandler.isWalDeltaRecordNeeded(pageMem, METASTORAGE_CACHE_ID, partMetaId, partMetaPage, wal, null))
+ wal.log(new MetaPageInitRecord(
+ METASTORAGE_CACHE_ID,
+ partMetaId,
+ io.getType(),
+ io.getVersion(),
+ treeRoot,
+ reuseListRoot
+ ));
+
+ allocated = true;
+ }
+ else {
+ PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
+
+ treeRoot = io.getTreeRoot(pageAddr);
+ reuseListRoot = io.getReuseListRoot(pageAddr);
+
+ assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA :
+ U.hexLong(treeRoot) + ", part=" + partId + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID;
+ assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA :
+ U.hexLong(reuseListRoot) + ", part=" + partId + ", METASTORAGE_CACHE_ID=" + METASTORAGE_CACHE_ID;
+ }
+
+ this.treeRoot = new RootPage(new FullPageId(treeRoot, METASTORAGE_CACHE_ID), allocated);
+ this.reuseListRoot = new RootPage(new FullPageId(reuseListRoot, METASTORAGE_CACHE_ID), allocated);
+ }
+ finally {
+ pageMem.writeUnlock(METASTORAGE_CACHE_ID, partMetaId, partMetaPage, null, allocated); // Last argument is true only when the page was initialized here - presumably the dirty flag; confirm.
+ }
+ }
+ finally {
+ pageMem.releasePage(METASTORAGE_CACHE_ID, partMetaId, partMetaPage);
+ }
+ }
+
+ /**
+ * Returns the page memory of the memory policy this storage was created with.
+ *
+ * @return Page memory.
+ */
+ public PageMemory pageMemory() {
+ return memPlc.pageMemory();
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Saves the free list metadata, then saves the metadata of the free list behind the tree's row store.
+ * NOTE(review): {@code init(...)} builds the row store from the very same {@code freeList}, so the second
+ * save appears to repeat the first - confirm whether that is intended.
+ */
+ @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
+ freeList.saveMetadata();
+
+ MetastorageRowStore rowStore = tree.rowStore();
+
+ saveStoreMetadata(rowStore, ctx);
+ }
+
+ /**
+ * Saves the metadata of the free list that backs the given row store. The free list is cast to
+ * {@code FreeListImpl}.
+ *
+ * @param rowStore Store to save metadata.
+ * @param ctx Checkpoint context (not used by this method).
+ * @throws IgniteCheckedException If failed.
+ */
+ private void saveStoreMetadata(MetastorageRowStore rowStore, Context ctx) throws IgniteCheckedException {
+ FreeListImpl freeList = (FreeListImpl)rowStore.freeList();
+
+ freeList.saveMetadata();
+ }
+
+ /**
+ * Free list for {@code MetastorageDataRow} values. It uses {@code SimpleDataPageIO} as its data page IO and
+ * adds the ability to read a row back from its link.
+ */
+ public static class FreeListImpl extends AbstractFreeList<MetastorageDataRow> {
+ /**
+ * Passes all arguments through to the {@code AbstractFreeList} constructor.
+ *
+ * @param cacheId Cache ID.
+ * @param name Name.
+ * @param memMetrics Memory metrics.
+ * @param memPlc Memory policy.
+ * @param reuseList Reuse list.
+ * @param wal Write-ahead log manager.
+ * @param metaPageId Metadata page ID.
+ * @param initNew Whether new metadata should be initialized.
+ * @throws IgniteCheckedException If failed.
+ */
+ FreeListImpl(int cacheId, String name, MemoryMetricsImpl memMetrics, MemoryPolicy memPlc,
+ ReuseList reuseList,
+ IgniteWriteAheadLogManager wal, long metaPageId, boolean initNew) throws IgniteCheckedException {
+ super(cacheId, name, memMetrics, memPlc, reuseList, wal, metaPageId, initNew);
+ }
+
+ /** {@inheritDoc} Always {@code SimpleDataPageIO.VERSIONS}. */
+ @Override public IOVersions<? extends AbstractDataPageIO<MetastorageDataRow>> ioVersions() {
+ return SimpleDataPageIO.VERSIONS;
+ }
+
+ /**
+ * Read row from data pages.
+ * <p>
+ * Follows the chain of links starting at {@code link}; each page is acquired and read-locked only while
+ * its fragment is copied. A row that fits one page (no next link on the first page) is decoded directly by
+ * {@code SimpleDataPageIO.readPayload(...)}. Otherwise the fragments are treated as one byte stream that
+ * starts with a 2-byte length followed by that many value bytes; the length prefix itself may be split
+ * between two fragments.
+ *
+ * @param key Key to put into the returned row; it is not read from the pages.
+ * @param link Link of the row, must not be {@code 0}.
+ * @return Row built from {@code link}, {@code key} and the value bytes read.
+ * @throws IgniteCheckedException If failed.
+ */
+ final MetastorageDataRow readRow(String key, long link)
+ throws IgniteCheckedException {
+ assert link != 0 : "link";
+
+ long nextLink = link;
+ IncompleteObject incomplete = null;
+ int size = 0; // 0 means "length prefix not decoded yet". NOTE(review): a stored length of 0 would look the same - confirm it cannot occur here.
+
+ boolean first = true;
+
+ do {
+ final long pageId = pageId(nextLink);
+
+ final long page = pageMem.acquirePage(grpId, pageId);
+
+ try {
+ long pageAddr = pageMem.readLock(grpId, pageId, page); // Non-empty data page must not be recycled.
+
+ assert pageAddr != 0L : nextLink;
+
+ try {
+ SimpleDataPageIO io = (SimpleDataPageIO)ioVersions().forPage(pageAddr);
+
+ DataPagePayload data = io.readPayload(pageAddr, itemId(nextLink), pageMem.pageSize());
+
+ nextLink = data.nextLink();
+
+ if (first) {
+ if (nextLink == 0) {
+ // Fast path for a single page row.
+ return new MetastorageDataRow(link, key, SimpleDataPageIO.readPayload(pageAddr + data.offset()));
+ }
+
+ first = false;
+ }
+
+ ByteBuffer buf = pageMem.pageBuffer(pageAddr);
+
+ buf.position(data.offset()); // Restrict the buffer window to this fragment's payload.
+ buf.limit(data.offset() + data.payloadSize());
+
+ if (size == 0) {
+ if (buf.remaining() >= 2 && incomplete == null) {
+ // Just read size.
+ size = buf.getShort(); // NOTE(review): signed 16-bit read - a stored length above 32767 would come back negative; confirm the writer's limit.
+ incomplete = new IncompleteObject(new byte[size]);
+ }
+ else {
+ if (incomplete == null)
+ incomplete = new IncompleteObject(new byte[2]); // Length prefix is split across fragments: collect its 2 bytes first.
+
+ incomplete.readData(buf);
+
+ if (incomplete.isReady()) {
+ size = ByteBuffer.wrap(incomplete.data()).order(buf.order()).getShort();
+ incomplete = new IncompleteObject(new byte[size]); // From here on 'incomplete' accumulates the value bytes.
+ }
+ }
+ }
+
+ if (size != 0 && buf.remaining() > 0)
+ incomplete.readData(buf);
+ }
+ finally {
+ pageMem.readUnlock(grpId, pageId, page);
+ }
+ }
+ finally {
+ pageMem.releasePage(grpId, pageId, page);
+ }
+ }
+ while (nextLink != 0);
+
+ assert incomplete.isReady();
+
+ return new MetastorageDataRow(link, key, incomplete.data());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
new file mode 100644
index 0000000..dde30d7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+import org.apache.ignite.internal.processors.cache.persistence.Storable;
+
+/**
+ *
+ */
+public class MetastorageDataRow implements MetastorageSearchRow, Storable {
+
+ /* **/
+ private long link;
+
+ /* **/
+ private String key;
+
+ /* **/
+ private byte[] value;
+
+ /* **/
+ public MetastorageDataRow(long link, String key, byte[] value) {
+ this.link = link;
+ this.key = key;
+ this.value = value;
+ }
+
+ /* **/
+ public MetastorageDataRow(String key, byte[] value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ /**
+ * @return Key.
+ */
+ public String key() {
+ return key;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int hash() {
+ return key.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int partition() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void link(long link) {
+ this.link = link;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long link() {
+ return link;
+ }
+
+ /**
+ * @return Value.
+ */
+ public byte[] value() {
+ return value;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return "key=" + key;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStore.java
new file mode 100644
index 0000000..0806b30
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageRowStore.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
+
+/**
+ * Thin wrapper around a free list that adds, removes, updates and reads metastorage rows. Adding and removing
+ * are performed under the checkpoint read lock of the database manager.
+ */
+public class MetastorageRowStore {
+
+ /** Free list that stores the row data. */
+ private final FreeList freeList;
+
+ /** Database manager used to take and release the checkpoint read lock. */
+ protected final IgniteCacheDatabaseSharedManager db;
+
+ /**
+ * @param freeList Free list that stores the row data.
+ * @param db Database manager.
+ */
+ public MetastorageRowStore(FreeList freeList, IgniteCacheDatabaseSharedManager db) {
+ this.freeList = freeList;
+ this.db = db;
+ }
+
+ /**
+ * Reads a row through {@code MetaStorage.FreeListImpl.readRow(...)}; the free list is cast to that class.
+ *
+ * @param key Key to put into the returned row.
+ * @param link Row link.
+ * @return Data row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public MetastorageDataRow dataRow(String key, long link) throws IgniteCheckedException {
+ return ((MetaStorage.FreeListImpl)freeList).readRow(key, link);
+ }
+
+ /**
+ * Removes the row from the free list while holding the checkpoint read lock.
+ *
+ * @param link Row link.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void removeRow(long link) throws IgniteCheckedException {
+ assert link != 0;
+ db.checkpointReadLock();
+
+ try {
+ freeList.removeDataRowByLink(link);
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Inserts the row into the free list while holding the checkpoint read lock.
+ *
+ * @param row Row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void addRow(MetastorageDataRow row) throws IgniteCheckedException {
+ db.checkpointReadLock();
+
+ try {
+ freeList.insertDataRow(row);
+ }
+ finally {
+ db.checkpointReadUnlock();
+ }
+ }
+
+ /**
+ * Delegates to {@code FreeList.updateDataRow(...)}. Unlike {@code addRow} and {@code removeRow}, this method
+ * does not take the checkpoint read lock itself.
+ *
+ * @param link Row link.
+ * @param row New row data.
+ * @return {@code True} if was able to update row.
+ * @throws IgniteCheckedException If failed.
+ */
+ public boolean updateRow(long link, MetastorageDataRow row) throws IgniteCheckedException {
+ return freeList.updateDataRow(link, row);
+ }
+
+ /**
+ * @return Free list.
+ */
+ public FreeList freeList() {
+ return freeList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageSearchRow.java
new file mode 100644
index 0000000..601fbc1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageSearchRow.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+/**
+ * Search row of the metastorage tree: exposes the key, the row link and the key hash.
+ */
+public interface MetastorageSearchRow {
+ /**
+ * @return Key.
+ */
+ public String key();
+
+ /**
+ * @return Link for this row.
+ */
+ public long link();
+
+ /**
+ * @return Key hash code.
+ */
+ public int hash();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
new file mode 100644
index 0000000..445522b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+
+/**
+ * B+ tree over {@link MetastorageSearchRow}s ordered by their string key.
+ * <p>
+ * Every item stored on an inner or leaf page occupies {@code 10 + MAX_KEY_LEN} bytes and is laid out as:
+ * <ul>
+ *     <li>bytes {@code [0, 8)}: row link;</li>
+ *     <li>bytes {@code [8, 10)}: key length in bytes;</li>
+ *     <li>bytes {@code [10, 10 + keyLen)}: key bytes.</li>
+ * </ul>
+ * Keys are always encoded and decoded as UTF-8 so that persisted pages do not depend on the
+ * default charset of the JVM that wrote or reads them.
+ */
+public class MetastorageTree extends BPlusTree<MetastorageSearchRow, MetastorageDataRow> {
+    /** Max key length (bytes num) */
+    public static final int MAX_KEY_LEN = 64;
+
+    /** Offset of the key length field relative to the item start (right after the 8-byte link). */
+    private static final int KEY_LEN_OFF = 8;
+
+    /** Offset of the key bytes relative to the item start (right after the 2-byte key length). */
+    private static final int KEY_OFF = KEY_LEN_OFF + 2;
+
+    /** Fixed size of a single item on a page. */
+    private static final int ITEM_SIZE = KEY_OFF + MAX_KEY_LEN;
+
+    /** Row store used to materialize data rows from a key and a link. */
+    private final MetastorageRowStore rowStore;
+
+    /**
+     * @param cacheId Cache ID passed to the base tree.
+     * @param pageMem Page memory.
+     * @param wal Write ahead log manager.
+     * @param globalRmvId Global remove ID.
+     * @param reuseList Reuse list.
+     * @param rowStore Row store used by {@link #getRow} to build data rows.
+     * @param metaPageId Meta page ID.
+     * @param initNew Flag passed to {@code initTree}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public MetastorageTree(int cacheId,
+        PageMemory pageMem,
+        IgniteWriteAheadLogManager wal,
+        AtomicLong globalRmvId,
+        ReuseList reuseList,
+        MetastorageRowStore rowStore,
+        long metaPageId,
+        boolean initNew) throws IgniteCheckedException {
+        super("Metastorage", cacheId, pageMem, wal,
+            globalRmvId, metaPageId, reuseList, MetastorageInnerIO.VERSIONS, MetastoreLeafIO.VERSIONS);
+
+        this.rowStore = rowStore;
+
+        initTree(initNew);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int compare(BPlusIO<MetastorageSearchRow> io, long pageAddr, int idx,
+        MetastorageSearchRow row) throws IgniteCheckedException {
+        // Ordering is the natural String ordering of the stored key against the searched key.
+        String key = ((DataLinkIO)io).getKey(pageAddr, idx);
+
+        return key.compareTo(row.key());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected MetastorageDataRow getRow(BPlusIO<MetastorageSearchRow> io, long pageAddr, int idx,
+        Object x) throws IgniteCheckedException {
+        DataLinkIO linkIo = (DataLinkIO)io;
+
+        long link = linkIo.getLink(pageAddr, idx);
+        String key = linkIo.getKey(pageAddr, idx);
+
+        return rowStore.dataRow(key, link);
+    }
+
+    /**
+     * @return RowStore.
+     */
+    public MetastorageRowStore rowStore() {
+        return rowStore;
+    }
+
+    /**
+     * Writes the link, the key length and the UTF-8 key bytes of the given row at the given item offset.
+     * The key length is validated before anything is written, so a rejected row leaves the page untouched.
+     *
+     * @param pageAddr Page address.
+     * @param off Offset of the item on the page.
+     * @param row Row to store.
+     * @throws IgniteCheckedException If the encoded key is longer than {@link #MAX_KEY_LEN} bytes.
+     */
+    private static void writeItem(long pageAddr, int off, MetastorageSearchRow row) throws IgniteCheckedException {
+        assert row.link() != 0;
+
+        byte[] keyBytes = row.key().getBytes(StandardCharsets.UTF_8);
+
+        // An explicit check (not an assert): with assertions disabled an over-long key would spill
+        // past the fixed-size item slot and overwrite the neighbouring items.
+        if (keyBytes.length > MAX_KEY_LEN) {
+            throw new IgniteCheckedException("Metastorage key is too long [key=" + row.key() +
+                ", len=" + keyBytes.length + ", maxLen=" + MAX_KEY_LEN + ']');
+        }
+
+        PageUtils.putLong(pageAddr, off, row.link());
+        PageUtils.putShort(pageAddr, off + KEY_LEN_OFF, (short)keyBytes.length);
+        PageUtils.putBytes(pageAddr, off + KEY_OFF, keyBytes);
+    }
+
+    /**
+     * Copies one item (link, key length and raw key bytes) from a source page to a destination page.
+     *
+     * @param srcIo Source IO used to read the link and the key size.
+     * @param srcPageAddr Source page address.
+     * @param srcIdx Source item index.
+     * @param srcOff Offset of the source item on the source page.
+     * @param dstPageAddr Destination page address.
+     * @param dstOff Offset of the destination item on the destination page.
+     */
+    private static void copyItem(DataLinkIO srcIo, long srcPageAddr, int srcIdx, int srcOff,
+        long dstPageAddr, int dstOff) {
+        long link = srcIo.getLink(srcPageAddr, srcIdx);
+        short len = srcIo.getKeySize(srcPageAddr, srcIdx);
+
+        byte[] payload = PageUtils.getBytes(srcPageAddr, srcOff + KEY_OFF, len);
+
+        PageUtils.putLong(dstPageAddr, dstOff, link);
+        PageUtils.putShort(dstPageAddr, dstOff + KEY_LEN_OFF, len);
+        PageUtils.putBytes(dstPageAddr, dstOff + KEY_OFF, payload);
+    }
+
+    /**
+     * Reads the key stored in the item at the given offset and decodes it as UTF-8.
+     *
+     * @param pageAddr Page address.
+     * @param off Offset of the item on the page.
+     * @return Key.
+     */
+    private static String readKey(long pageAddr, int off) {
+        int len = PageUtils.getShort(pageAddr, off + KEY_LEN_OFF);
+
+        byte[] keyBytes = PageUtils.getBytes(pageAddr, off + KEY_OFF, len);
+
+        return new String(keyBytes, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Common accessors for the items stored by both the inner and the leaf IO of this tree.
+     */
+    private interface DataLinkIO {
+        /**
+         * @param pageAddr Page address.
+         * @param idx Index.
+         * @return Row link.
+         */
+        public long getLink(long pageAddr, int idx);
+
+        /**
+         * @param pageAddr Page address.
+         * @param idx Index.
+         * @return Key size in bytes.
+         */
+        public short getKeySize(long pageAddr, int idx);
+
+        /**
+         * @param pageAddr Page address.
+         * @param idx Index.
+         * @return Key.
+         */
+        public String getKey(long pageAddr, int idx);
+    }
+
+    /**
+     * Inner page IO of the metastorage tree.
+     */
+    public static class MetastorageInnerIO extends BPlusInnerIO<MetastorageSearchRow> implements DataLinkIO {
+        /** */
+        public static final IOVersions<MetastorageInnerIO> VERSIONS = new IOVersions<>(
+            new MetastorageInnerIO(1)
+        );
+
+        /**
+         * @param ver Page format version.
+         */
+        MetastorageInnerIO(int ver) {
+            super(T_DATA_REF_METASTORAGE_INNER, ver, true, ITEM_SIZE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off,
+            MetastorageSearchRow row) throws IgniteCheckedException {
+            writeItem(pageAddr, off, row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<MetastorageSearchRow> srcIo,
+            long srcPageAddr, int srcIdx) throws IgniteCheckedException {
+            int srcOff = srcIo.offset(srcIdx);
+            int dstOff = offset(dstIdx);
+
+            copyItem((DataLinkIO)srcIo, srcPageAddr, srcIdx, srcOff, dstPageAddr, dstOff);
+        }
+
+        /** {@inheritDoc} */
+        @Override public MetastorageSearchRow getLookupRow(BPlusTree<MetastorageSearchRow, ?> tree, long pageAddr,
+            int idx) throws IgniteCheckedException {
+            long link = getLink(pageAddr, idx);
+            String key = getKey(pageAddr, idx);
+
+            return new MetsatorageSearchRowImpl(key, link);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLink(long pageAddr, int idx) {
+            assert idx < getCount(pageAddr) : idx;
+
+            return PageUtils.getLong(pageAddr, offset(idx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public short getKeySize(long pageAddr, int idx) {
+            return PageUtils.getShort(pageAddr, offset(idx) + KEY_LEN_OFF);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getKey(long pageAddr, int idx) {
+            return readKey(pageAddr, offset(idx));
+        }
+    }
+
+    /**
+     * Leaf page IO of the metastorage tree.
+     */
+    public static class MetastoreLeafIO extends BPlusLeafIO<MetastorageSearchRow> implements DataLinkIO {
+        /** */
+        public static final IOVersions<MetastoreLeafIO> VERSIONS = new IOVersions<>(
+            new MetastoreLeafIO(1)
+        );
+
+        /**
+         * @param ver Page format version.
+         */
+        MetastoreLeafIO(int ver) {
+            super(T_DATA_REF_METASTORAGE_LEAF, ver, ITEM_SIZE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void storeByOffset(long pageAddr, int off,
+            MetastorageSearchRow row) throws IgniteCheckedException {
+            writeItem(pageAddr, off, row);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<MetastorageSearchRow> srcIo,
+            long srcPageAddr, int srcIdx) throws IgniteCheckedException {
+            int srcOff = srcIo.offset(srcIdx);
+            int dstOff = offset(dstIdx);
+
+            copyItem((DataLinkIO)srcIo, srcPageAddr, srcIdx, srcOff, dstPageAddr, dstOff);
+        }
+
+        /** {@inheritDoc} */
+        @Override public MetastorageSearchRow getLookupRow(BPlusTree<MetastorageSearchRow, ?> tree, long pageAddr,
+            int idx) throws IgniteCheckedException {
+            long link = getLink(pageAddr, idx);
+            String key = getKey(pageAddr, idx);
+
+            return new MetsatorageSearchRowImpl(key, link);
+        }
+
+        /** {@inheritDoc} */
+        @Override public long getLink(long pageAddr, int idx) {
+            assert idx < getCount(pageAddr) : idx;
+
+            return PageUtils.getLong(pageAddr, offset(idx));
+        }
+
+        /** {@inheritDoc} */
+        @Override public short getKeySize(long pageAddr, int idx) {
+            return PageUtils.getShort(pageAddr, offset(idx) + KEY_LEN_OFF);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String getKey(long pageAddr, int idx) {
+            return readKey(pageAddr, offset(idx));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61a4de80/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetsatorageSearchRowImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetsatorageSearchRowImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetsatorageSearchRowImpl.java
new file mode 100644
index 0000000..363c14d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetsatorageSearchRowImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+
+/**
+ * Plain {@link MetastorageSearchRow} implementation that holds a key and a link
+ * supplied at construction time.
+ */
+public class MetsatorageSearchRowImpl implements MetastorageSearchRow {
+    /** Link of the row. */
+    private final long link;
+
+    /** Key of the row. */
+    private final String key;
+
+    /**
+     * @param key Key.
+     * @param link Link.
+     */
+    public MetsatorageSearchRowImpl(String key, long link) {
+        this.link = link;
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long link() {
+        return this.link;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String key() {
+        return this.key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hash() {
+        // Hash of the row is the hash code of its key.
+        return this.key.hashCode();
+    }
+}