You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/08/30 12:24:32 UTC
[26/45] ignite git commit: IGNITE-4191: MVCC and transactional SQL
support. Joint multi-man-years efforts of Semen Boikov, Igor Seliverstov,
Alexander Paschenko, Igor Sapego, Sergey Kalashnikov, Roman Kondakov,
Pavel Kuznetsov, Ivan Pavlukhin, Andrey Mas
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogInnerIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogInnerIO.java
new file mode 100644
index 0000000..95c10ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogInnerIO.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusInnerIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/** IO for inner pages of the transaction log B+ tree. Item layout: major (long at off), minor (long at off + 8), state (byte at off + 16). */
+public class TxLogInnerIO extends BPlusInnerIO<TxKey> implements TxLogIO {
+ /** IO versions; holds the single version 1 instance. */
+ public static final IOVersions<TxLogInnerIO> VERSIONS = new IOVersions<>(new TxLogInnerIO(1));
+
+ /**
+ * @param ver Page format version.
+ */
+ protected TxLogInnerIO(int ver) {
+ super(T_TX_LOG_INNER, ver, true, 8 + 8 + 1);
+ }
+
+ /** {@inheritDoc} The row is cast to {@link TxRow}; its major, minor and state are written at {@code off}. */
+ @Override public void storeByOffset(long pageAddr, int off, TxKey row) {
+ TxRow row0 = (TxRow)row;
+
+ setMajor(pageAddr, off, row0.major());
+ setMinor(pageAddr, off, row0.minor());
+ setState(pageAddr, off, row0.state());
+ }
+
+ /** {@inheritDoc} Copies major, minor and state field by field; {@code srcIo} is cast to {@link TxLogIO}. */
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<TxKey> srcIo, long srcPageAddr, int srcIdx) {
+ TxLogIO srcIo0 = (TxLogIO)srcIo;
+
+ int srcOff = srcIo.offset(srcIdx);
+ int dstOff = offset(dstIdx);
+
+ setMajor(dstPageAddr, dstOff, srcIo0.getMajor(srcPageAddr, srcOff));
+ setMinor(dstPageAddr, dstOff, srcIo0.getMinor(srcPageAddr, srcOff));
+ setState(dstPageAddr, dstOff, srcIo0.getState(srcPageAddr, srcOff));
+ }
+
+ /** {@inheritDoc} Returns a new {@link TxRow} read from the item at {@code idx}, state included. */
+ @Override public TxKey getLookupRow(BPlusTree<TxKey, ?> tree, long pageAddr, int idx) {
+ int off = offset(idx);
+
+ return new TxRow(
+ getMajor(pageAddr, off),
+ getMinor(pageAddr, off),
+ getState(pageAddr, off));
+ }
+
+ /** {@inheritDoc} Orders by major, then by minor; state is not compared. Major is read with PageUtils.getLong at {@code off}, the same read getMajor performs. */
+ @Override public int compare(long pageAddr, int off, TxKey row) {
+ int cmp = Long.compare(PageUtils.getLong(pageAddr, off), row.major());
+
+ return cmp != 0 ? cmp : Long.compare(getMinor(pageAddr, off), row.minor());
+ }
+
+ /** {@inheritDoc} Major is the long stored at {@code off}. */
+ @Override public long getMajor(long pageAddr, int off) {
+ return PageUtils.getLong(pageAddr, off);
+ }
+
+ /** {@inheritDoc} Major is the long stored at {@code off}. */
+ @Override public void setMajor(long pageAddr, int off, long major) {
+ PageUtils.putLong(pageAddr, off, major);
+ }
+
+ /** {@inheritDoc} Minor is the long stored at {@code off + 8}. */
+ @Override public long getMinor(long pageAddr, int off) {
+ return PageUtils.getLong(pageAddr, off + 8);
+ }
+
+ /** {@inheritDoc} Minor is the long stored at {@code off + 8}. */
+ @Override public void setMinor(long pageAddr, int off, long minor) {
+ PageUtils.putLong(pageAddr, off + 8, minor);
+ }
+
+ /** {@inheritDoc} State is the byte stored at {@code off + 16}. */
+ @Override public byte getState(long pageAddr, int off) {
+ return PageUtils.getByte(pageAddr, off + 16);
+ }
+
+ /** {@inheritDoc} State is the byte stored at {@code off + 16}. */
+ @Override public void setState(long pageAddr, int off, byte state) {
+ PageUtils.putByte(pageAddr, off + 16, state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogLeafIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogLeafIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogLeafIO.java
new file mode 100644
index 0000000..e037fbe
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogLeafIO.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusLeafIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
+
+/** IO for leaf pages of the transaction log B+ tree. Item layout (17 bytes): major (long at off), minor (long at off + 8), state (byte at off + 16). */
+public class TxLogLeafIO extends BPlusLeafIO<TxKey> implements TxLogIO {
+ /** IO versions; holds the single version 1 instance. */
+ public static final IOVersions<TxLogLeafIO> VERSIONS = new IOVersions<>(new TxLogLeafIO(1));
+
+ /**
+ * @param ver Page format version.
+ */
+ protected TxLogLeafIO(int ver) {
+ super(T_TX_LOG_LEAF, ver, 17);
+ }
+
+ /** {@inheritDoc} The row is cast to {@link TxRow}; its major, minor and state are written at {@code off}. */
+ @Override public void storeByOffset(long pageAddr, int off, TxKey row) {
+ TxRow row0 = (TxRow)row;
+
+ setMajor(pageAddr, off, row0.major());
+ setMinor(pageAddr, off, row0.minor());
+ setState(pageAddr, off, row0.state());
+ }
+
+ /** {@inheritDoc} Copies major, minor and state field by field; {@code srcIo} is cast to {@link TxLogIO}. */
+ @Override public void store(long dstPageAddr, int dstIdx, BPlusIO<TxKey> srcIo, long srcPageAddr, int srcIdx) {
+ TxLogIO srcIo0 = (TxLogIO)srcIo;
+
+ int srcOff = srcIo.offset(srcIdx);
+ int dstOff = offset(dstIdx);
+
+ setMajor(dstPageAddr, dstOff, srcIo0.getMajor(srcPageAddr, srcOff));
+ setMinor(dstPageAddr, dstOff, srcIo0.getMinor(srcPageAddr, srcOff));
+ setState(dstPageAddr, dstOff, srcIo0.getState(srcPageAddr, srcOff));
+ }
+
+ /** {@inheritDoc} Returns a new {@link TxRow} read from the item at {@code idx}, state included. */
+ @Override public TxKey getLookupRow(BPlusTree<TxKey, ?> tree, long pageAddr, int idx) {
+ int off = offset(idx);
+
+ return new TxRow(
+ getMajor(pageAddr, off),
+ getMinor(pageAddr, off),
+ getState(pageAddr, off));
+ }
+
+ /** {@inheritDoc} Orders by major, then by minor; state is not compared. */
+ @Override public int compare(long pageAddr, int off, TxKey row) {
+ int cmp = Long.compare(getMajor(pageAddr, off), row.major());
+
+ return cmp != 0 ? cmp : Long.compare(getMinor(pageAddr, off), row.minor());
+ }
+
+ /** {@inheritDoc} Major is the long stored at {@code off}. */
+ @Override public long getMajor(long pageAddr, int off) {
+ return PageUtils.getLong(pageAddr, off);
+ }
+
+ /** {@inheritDoc} Major is the long stored at {@code off}. */
+ @Override public void setMajor(long pageAddr, int off, long major) {
+ PageUtils.putLong(pageAddr, off, major);
+ }
+
+ /** {@inheritDoc} Minor is the long stored at {@code off + 8}. */
+ @Override public long getMinor(long pageAddr, int off) {
+ return PageUtils.getLong(pageAddr, off + 8);
+ }
+
+ /** {@inheritDoc} Minor is the long stored at {@code off + 8}. */
+ @Override public void setMinor(long pageAddr, int off, long minor) {
+ PageUtils.putLong(pageAddr, off + 8, minor);
+ }
+
+ /** {@inheritDoc} State is the byte stored at {@code off + 16}. */
+ @Override public byte getState(long pageAddr, int off) {
+ return PageUtils.getByte(pageAddr, off + 16);
+ }
+
+ /** {@inheritDoc} State is the byte stored at {@code off + 16}. */
+ @Override public void setState(long pageAddr, int off, byte state) {
+ PageUtils.putByte(pageAddr, off + 16, state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogTree.java
new file mode 100644
index 0000000..60fbc84
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxLogTree.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
+import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
+import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
+
+/**
+ * B+ tree of transaction log rows: looked up by {@link TxKey}, returning {@link TxRow}, stored via {@link TxLogInnerIO} and {@link TxLogLeafIO}.
+ */
+public class TxLogTree extends BPlusTree<TxKey, TxRow> {
+ /**
+ * @param pageMem Page memory.
+ * @param wal Write ahead log manager.
+ * @param metaPageId Tree metapage id.
+ * @param reuseList Reuse list.
+ * @param failureProcessor Failure processor.
+ * @param initNew {@code True} if new tree should be created.
+ * @throws IgniteCheckedException If fails.
+ */
+ public TxLogTree(PageMemory pageMem,
+ IgniteWriteAheadLogManager wal, long metaPageId,
+ ReuseList reuseList, FailureProcessor failureProcessor,
+ boolean initNew) throws IgniteCheckedException {
+ super(TxLog.TX_LOG_CACHE_NAME, TxLog.TX_LOG_CACHE_ID, pageMem, wal, new AtomicLong(), metaPageId,
+ reuseList, TxLogInnerIO.VERSIONS, TxLogLeafIO.VERSIONS, failureProcessor);
+
+ initTree(initNew);
+ }
+
+ /** {@inheritDoc} Delegates to {@link TxLogIO#compare} using the item offset computed for {@code idx}. */
+ @Override protected int compare(BPlusIO<TxKey> io, long pageAddr, int idx, TxKey row) {
+ return ((TxLogIO)io).compare(pageAddr, io.offset(idx), row);
+ }
+
+ /** {@inheritDoc} The last argument is unused; the result is the IO's lookup row cast to {@link TxRow}. */
+ @Override public TxRow getRow(BPlusIO<TxKey> io, long pageAddr,
+ int idx, Object ignored) throws IgniteCheckedException {
+ return (TxRow) io.getLookupRow(this, pageAddr, idx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxRow.java
new file mode 100644
index 0000000..0d161c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxRow.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+/**
+ * Transaction log row: a {@link TxKey} (major and minor version) extended with a transaction state byte.
+ */
+public class TxRow extends TxKey {
+ /** Transaction state. */
+ private byte state;
+
+ /**
+ * @param major Major version.
+ * @param minor Minor version.
+ * @param state Transaction state.
+ */
+ TxRow(long major, long minor, byte state) {
+ super(major, minor);
+
+ this.state = state;
+ }
+
+ /**
+ * @return Transaction state.
+ */
+ public byte state() {
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxState.java
new file mode 100644
index 0000000..65a1f25
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/txlog/TxState.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.mvcc.txlog;
+
+/**
+ * Byte constants for transaction states; this class is a non-instantiable constants holder.
+ */
+public final class TxState {
+ /** State value 0x0 (presumably "not available" - confirm). */
+ public static final byte NA = 0x0;
+ /** Prepared state, value 0x1. */
+ public static final byte PREPARED = 0x1;
+ /** Aborted state, value 0x2. */
+ public static final byte ABORTED = 0x2;
+ /** Committed state, value 0x3. */
+ public static final byte COMMITTED = 0x3;
+
+ /**
+ * Private constructor.
+ */
+ private TxState() {}
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
index 44f0a3f..64517ac 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRow.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.processors.cache.persistence;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
/**
* Cache data row.
*/
-public interface CacheDataRow extends CacheSearchRow, Storable {
+public interface CacheDataRow extends MvccUpdateVersionAware, CacheSearchRow, Storable {
/**
* @return Cache value.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
index 9f2e031..b8245df 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheDataRowAdapter.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.IncompleteObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
@@ -42,6 +43,10 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_CRD_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA;
+import static org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter.RowData.LINK_WITH_HEADER;
/**
* Cache data row adapter.
@@ -60,6 +65,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
protected CacheObject val;
/** */
+ @GridToStringInclude
protected long expireTime = -1;
/** */
@@ -153,6 +159,8 @@ public class CacheDataRowAdapter implements CacheDataRow {
nextLink = data.nextLink();
+ int hdrLen = 0;
+
if (first) {
if (nextLink == 0) {
// Fast path for a single page row.
@@ -162,12 +170,21 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
first = false;
+
+ // Assume that row header is always located entirely on the very first page.
+ hdrLen = readHeader(pageAddr, data.offset());
+
+ if (rowData == LINK_WITH_HEADER)
+ return;
}
ByteBuffer buf = pageMem.pageBuffer(pageAddr);
- buf.position(data.offset());
- buf.limit(data.offset() + data.payloadSize());
+ int off = data.offset() + hdrLen;
+ int payloadSize = data.payloadSize() - hdrLen;
+
+ buf.position(off);
+ buf.limit(off + payloadSize);
boolean keyOnly = rowData == RowData.KEY_ONLY;
@@ -190,6 +207,18 @@ public class CacheDataRowAdapter implements CacheDataRow {
}
/**
+ * Reads row header (i.e. MVCC info) which should be located on the very first page of data.
+ *
+ * @param addr Address.
+ * @param off Offset
+ * @return Number of bytes read.
+ */
+ protected int readHeader(long addr, int off) {
+ // No-op.
+ return 0;
+ }
+
+ /**
* @param sharedCtx Cache shared context.
* @param coctx Cache object context.
* @param buf Buffer.
@@ -199,7 +228,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @throws IgniteCheckedException If failed.
* @return Read object.
*/
- private IncompleteObject<?> readFragment(
+ protected IncompleteObject<?> readFragment(
GridCacheSharedContext<?, ?> sharedCtx,
CacheObjectContext coctx,
ByteBuffer buf,
@@ -268,7 +297,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param readCacheId {@code true} If need to read cache ID.
* @throws IgniteCheckedException If failed.
*/
- private void readFullRow(
+ protected void readFullRow(
GridCacheSharedContext<?, ?> sharedCtx,
CacheObjectContext coctx,
long addr,
@@ -277,6 +306,11 @@ public class CacheDataRowAdapter implements CacheDataRow {
throws IgniteCheckedException {
int off = 0;
+ off += readHeader(addr, off);
+
+ if (rowData == LINK_WITH_HEADER)
+ return;
+
if (readCacheId) {
cacheId = PageUtils.getInt(addr, off);
@@ -326,7 +360,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param buf Buffer.
* @param incomplete Incomplete.
*/
- private IncompleteObject<?> readIncompleteCacheId(
+ protected IncompleteObject<?> readIncompleteCacheId(
ByteBuffer buf,
IncompleteObject<?> incomplete
) {
@@ -371,7 +405,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @return Incomplete object.
* @throws IgniteCheckedException If failed.
*/
- private IncompleteCacheObject readIncompleteKey(
+ protected IncompleteCacheObject readIncompleteKey(
CacheObjectContext coctx,
ByteBuffer buf,
IncompleteCacheObject incomplete
@@ -396,7 +430,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @return Incomplete object.
* @throws IgniteCheckedException If failed.
*/
- private IncompleteCacheObject readIncompleteValue(
+ protected IncompleteCacheObject readIncompleteValue(
CacheObjectContext coctx,
ByteBuffer buf,
IncompleteCacheObject incomplete
@@ -419,7 +453,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @param incomplete Incomplete object.
* @return Incomplete object.
*/
- private IncompleteObject<?> readIncompleteExpireTime(
+ protected IncompleteObject<?> readIncompleteExpireTime(
ByteBuffer buf,
IncompleteObject<?> incomplete
) {
@@ -463,7 +497,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
* @return Incomplete object.
* @throws IgniteCheckedException If failed.
*/
- private IncompleteObject<?> readIncompleteVersion(
+ protected IncompleteObject<?> readIncompleteVersion(
ByteBuffer buf,
IncompleteObject<?> incomplete
) throws IgniteCheckedException {
@@ -573,6 +607,60 @@ public class CacheDataRowAdapter implements CacheDataRow {
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} */
+ @Override public int size() throws IgniteCheckedException {
+ int len = key().valueBytesLength(null);
+
+ len += value().valueBytesLength(null) + CacheVersionIO.size(version(), false) + 8;
+
+ return len + (cacheId() != 0 ? 4 : 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int headerSize() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return MVCC_CRD_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int mvccOperationCounter() {
+ return MVCC_OP_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte mvccTxState() {
+ return TxState.NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCoordinatorVersion() {
+ return MVCC_CRD_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCounter() {
+ return MVCC_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int newMvccOperationCounter() {
+ return MVCC_OP_COUNTER_NA;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte newMvccTxState() {
+ return TxState.NA;
+ }
+
/**
*
*/
@@ -584,7 +672,13 @@ public class CacheDataRowAdapter implements CacheDataRow {
KEY_ONLY,
/** */
- NO_KEY
+ NO_KEY,
+
+ /** */
+ LINK_ONLY,
+
+ /** */
+ LINK_WITH_HEADER
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
index 1637eb0..c3cfb83 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/CacheSearchRow.java
@@ -18,11 +18,12 @@
package org.apache.ignite.internal.processors.cache.persistence;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
/**
*
*/
-public interface CacheSearchRow {
+public interface CacheSearchRow extends MvccVersionAware {
/**
* @return Cache key.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
new file mode 100644
index 0000000..f96cdd9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DatabaseLifecycleListener.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import org.apache.ignite.IgniteCheckedException;
+
+/**
+ * Listener for database lifecycle events; every callback receives the database shared manager.
+ */
+public interface DatabaseLifecycleListener {
+
+ /**
+ * @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
+ */
+ void onInitDataRegions(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+
+ /**
+ * @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
+ */
+ void beforeMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+
+ /**
+ * @param mgr Database shared manager.
+ * @throws IgniteCheckedException If failed.
+ */
+ void afterMemoryRestore(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+
+ /**
+ * @param mgr Database shared manager.
+ */
+ void afterInitialise(IgniteCacheDatabaseSharedManager mgr) throws IgniteCheckedException;
+
+ /**
+ * @param mgr Database shared manager.
+ */
+ void beforeStop(IgniteCacheDatabaseSharedManager mgr);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index bd8a62f..7d91645 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -111,6 +111,7 @@ import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
@@ -409,8 +410,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override protected void initDataRegions(DataStorageConfiguration memCfg) throws IgniteCheckedException {
- super.initDataRegions(memCfg);
+ @Override protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
+ super.initDataRegions0(memCfg);
addDataRegion(
memCfg,
@@ -812,6 +813,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
checkpointReadLock();
try {
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) {
+ lsnr.beforeMemoryRestore(this);
+ }
+
if (!F.isEmpty(cachesToStart)) {
for (DynamicCacheDescriptor desc : cachesToStart) {
if (CU.affinityNode(cctx.localNode(), desc.cacheConfiguration().getNodeFilter()))
@@ -851,6 +856,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
metaStorage.init(this);
notifyMetastorageReadyForReadWrite();
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) {
+ lsnr.afterMemoryRestore(this);
+ }
+
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
@@ -1351,7 +1361,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
- .rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
+ .rebuildIndexesFromHash(Collections.singleton(cacheCtx.cacheId()));
assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();
@@ -2088,6 +2098,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
* @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
*/
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
+ // TODO IGNITE-7792 add generic mapping.
+ if (grpId == TxLog.TX_LOG_CACHE_ID)
+ return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
+
// TODO IGNITE-5075: cache descriptor can be removed.
GridCacheSharedContext sharedCtx = context();
@@ -2391,7 +2405,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
}
else
- updateState(part, (int)io.getPartitionState(pageAddr));
+ changed = updateState(part, (int)io.getPartitionState(pageAddr));
}
finally {
pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
@@ -3852,20 +3866,21 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
PageMemoryEx pageMem;
- if (grpId != MetaStorage.METASTORAGE_CACHE_ID) {
+ // TODO IGNITE-7792 add generic mapping.
+ if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
+ pageMem = (PageMemoryEx)metaStorage.pageMemory();
+ else if (grpId == TxLog.TX_LOG_CACHE_ID)
+ pageMem = (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
+ else {
CacheGroupContext grp = context().cache().cacheGroup(grpId);
- if (grp == null)
- continue;
+ DataRegion region = grp != null ?grp .dataRegion() : null;
- if (!grp.dataRegion().config().isPersistenceEnabled())
+ if (region == null || !region.config().isPersistenceEnabled())
continue;
- pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+ pageMem = (PageMemoryEx)region.pageMemory();
}
- else
- pageMem = (PageMemoryEx)metaStorage.pageMemory();
-
Integer tag = pageMem.getForCheckpoint(
fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 826f619..4c45352 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -57,6 +59,8 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalP
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
@@ -76,8 +80,11 @@ import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
+import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -735,11 +742,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
- @Override @Nullable protected WALHistoricalIterator historicalIterator(
+ @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException {
if (partCntrs == null || partCntrs.isEmpty())
return null;
+ if (grp.mvccEnabled()) // TODO IGNITE-7384
+ return super.historicalIterator(partCntrs, missing);
+
GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();
FileWALPointer minPtr = null;
@@ -1093,6 +1103,16 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public int size() throws IgniteCheckedException {
+ throw new UnsupportedOperationException(); // Row size is not provided by this row implementation.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int headerSize() {
+ throw new UnsupportedOperationException(); // Header size is not provided by this row implementation.
+ }
+
+ /** {@inheritDoc} */
@Override public long link() {
return 0;
}
@@ -1111,6 +1131,46 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
@Override public int cacheId() {
return entry.cacheId();
}
+
+ /** {@inheritDoc} */
+ @Override public long mvccCoordinatorVersion() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccCounter() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int mvccOperationCounter() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCoordinatorVersion() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public long newMvccCounter() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int newMvccOperationCounter() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte mvccTxState() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte newMvccTxState() {
+ return 0; // TODO IGNITE-7384: placeholder, a constant 0 is returned for now.
+ }
}
/**
@@ -1505,6 +1565,30 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public long nextMvccUpdateCounter() {
+     try {
+         CacheDataStore store = init0(true);
+
+         // init0(true) may yield no store; zero is reported in that case.
+         if (store == null)
+             return 0;
+
+         return store.nextMvccUpdateCounter();
+     }
+     catch (IgniteCheckedException ex) {
+         // Rethrow as unchecked, keeping the original failure as the cause.
+         throw new IgniteException(ex);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long mvccUpdateCounter() {
+     try {
+         CacheDataStore store = init0(true);
+
+         // init0(true) may yield no store; zero is reported in that case.
+         if (store == null)
+             return 0;
+
+         return store.mvccUpdateCounter();
+     }
+     catch (IgniteCheckedException ex) {
+         // Rethrow as unchecked, keeping the original failure as the cause.
+         throw new IgniteException(ex);
+     }
+ }
+
+ /** {@inheritDoc} */
@Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) {
throw new IllegalStateException("Should be never called.");
}
@@ -1589,6 +1673,126 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public boolean mvccInitialValue(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     @Nullable CacheObject val,
+     GridCacheVersion ver,
+     long expireTime,
+     MvccVersion mvccVer,
+     MvccVersion newMvccVer
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer, newMvccVer);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mvccInitialValueIfAbsent(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     @Nullable CacheObject val,
+     GridCacheVersion ver,
+     long expireTime,
+     MvccVersion mvccVer,
+     MvccVersion newMvccVer,
+     byte txState,
+     byte newTxState
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccInitialValueIfAbsent(
+         cctx,
+         key,
+         val,
+         ver,
+         expireTime,
+         mvccVer,
+         newMvccVer,
+         txState,
+         newTxState
+     );
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean mvccUpdateRowWithPreloadInfo(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     @Nullable CacheObject val,
+     GridCacheVersion ver,
+     long expireTime,
+     MvccVersion mvccVer,
+     MvccVersion newMvccVer,
+     byte mvccTxState,
+     byte newMvccTxState
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccUpdateRowWithPreloadInfo(
+         cctx, key, val, ver, expireTime, mvccVer, newMvccVer, mvccTxState, newMvccTxState);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccUpdateResult mvccUpdate(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     CacheObject val,
+     GridCacheVersion ver,
+     long expireTime,
+     MvccSnapshot mvccVer,
+     boolean primary,
+     boolean needHistory,
+     boolean noCreate
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccUpdate(
+         cctx,
+         key,
+         val,
+         ver,
+         expireTime,
+         mvccVer,
+         primary,
+         needHistory,
+         noCreate
+     );
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccUpdateResult mvccRemove(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     MvccSnapshot mvccVer,
+     boolean primary,
+     boolean needHistory
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccRemove(cctx, key, mvccVer, primary, needHistory);
+ }
+
+ /** {@inheritDoc} */
+ @Override public MvccUpdateResult mvccLock(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     MvccSnapshot mvccSnapshot
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).mvccLock(cctx, key, mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccUpdateNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccUpdateNative(cctx, primary, key, val, ver, expireTime, mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLongList mvccRemoveNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
+ CacheDataStore delegate = init0(false);
+
+ return delegate.mvccRemoveNative(cctx, primary, key, mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key)
+     throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     init0(false).mvccRemoveAll(cctx, key);
+ }
+
+ /** {@inheritDoc} */
@Override public CacheDataRow createRow(
GridCacheContext cctx,
KeyCacheObject key,
@@ -1604,6 +1808,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public int cleanup(
+     GridCacheContext cctx,
+     @Nullable List<MvccLinkAwareSearchRow> cleanupRows
+ ) throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     return init0(false).cleanup(cctx, cleanupRows);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void updateTxState(GridCacheContext cctx, CacheSearchRow row)
+     throws IgniteCheckedException {
+     // Forward the call unchanged to the store obtained via init0(false).
+     init0(false).updateTxState(cctx, row);
+ }
+
+ /** {@inheritDoc} */
@Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
@@ -1634,6 +1853,40 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot snapshot)
+ throws IgniteCheckedException {
+ CacheDataStore delegate = init0(true);
+
+ if (delegate != null)
+ return delegate.mvccFind(cctx, key, snapshot);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<IgniteBiTuple<Object, MvccVersion>> mvccFindAllVersions(GridCacheContext cctx, KeyCacheObject key)
+ throws IgniteCheckedException {
+ CacheDataStore delegate = init0(true);
+
+ if (delegate != null)
+ return delegate.mvccFindAllVersions(cctx, key);
+
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(
+     GridCacheContext cctx,
+     KeyCacheObject key,
+     Object x
+ ) throws IgniteCheckedException {
+     CacheDataStore store = init0(true);
+
+     // init0(true) may yield no store; the shared empty cursor is returned then.
+     if (store == null)
+         return EMPTY_CURSOR;
+
+     return store.mvccAllVersionsCursor(cctx, key, x);
+ }
+
+
+ /** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
@@ -1644,6 +1897,27 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException {
+     CacheDataStore store = init0(true);
+
+     // init0(true) may yield no store; the shared empty cursor is returned then.
+     if (store == null)
+         return EMPTY_CURSOR;
+
+     return store.cursor(x);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot)
+     throws IgniteCheckedException {
+     CacheDataStore store = init0(true);
+
+     // init0(true) may yield no store; the shared empty cursor is returned then.
+     if (store == null)
+         return EMPTY_CURSOR;
+
+     return store.cursor(mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(
int cacheId,
KeyCacheObject lower,
@@ -1671,6 +1945,21 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(
+     int cacheId,
+     KeyCacheObject lower,
+     KeyCacheObject upper,
+     Object x,
+     MvccSnapshot mvccSnapshot
+ ) throws IgniteCheckedException {
+     CacheDataStore store = init0(true);
+
+     // init0(true) may yield no store; the shared empty cursor is returned then.
+     if (store == null)
+         return EMPTY_CURSOR;
+
+     return store.cursor(cacheId, lower, upper, x, mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
@Override public void destroy() throws IgniteCheckedException {
// No need to destroy delegate.
}
@@ -1686,6 +1975,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
}
/** {@inheritDoc} */
+ @Override public GridCursor<? extends CacheDataRow> cursor(
+     int cacheId,
+     MvccSnapshot mvccSnapshot
+ ) throws IgniteCheckedException {
+     CacheDataStore store = init0(true);
+
+     // init0(true) may yield no store; the shared empty cursor is returned then.
+     if (store == null)
+         return EMPTY_CURSOR;
+
+     return store.cursor(cacheId, mvccSnapshot);
+ }
+
+ /** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
CacheDataStore delegate0 = init0(true);
@@ -1855,7 +2155,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
/**
*
*/
- private static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() {
+ public static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() {
/** {@inheritDoc} */
@Override public boolean next() {
return false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index f17e527..3ade265 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -223,6 +223,16 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
if (dataRegionsInitialized)
return;
+ initDataRegions0(memCfg);
+
+ dataRegionsInitialized = true;
+ }
+
+ /**
+ * @param memCfg Database config.
+ * @throws IgniteCheckedException If failed to initialize swap path.
+ */
+ protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
DataRegionConfiguration[] dataRegionCfgs = memCfg.getDataRegionConfigurations();
int dataRegions = dataRegionCfgs == null ? 0 : dataRegionCfgs.length;
@@ -251,8 +261,17 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
CU.isPersistenceEnabled(memCfg)
);
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) {
+ lsnr.onInitDataRegions(this);
+ }
+ }
- dataRegionsInitialized = true;
+ /**
+ * Returns the database lifecycle listeners held by the internal subscription processor of the given context.
+ *
+ * @param kctx Kernal context.
+ * @return Database lifecycle listeners.
+ */
+ protected List<DatabaseLifecycleListener> getDatabaseListeners(GridKernalContext kctx) {
+ return kctx.internalSubscriptionProcessor().getDatabaseListeners();
}
/**
@@ -260,7 +279,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
* @param dataRegionCfg Data region config.
* @throws IgniteCheckedException If failed to initialize swap path.
*/
- protected void addDataRegion(
+ public void addDataRegion(
DataStorageConfiguration dataStorageCfg,
DataRegionConfiguration dataRegionCfg,
boolean trackable
@@ -984,6 +1003,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
final DataRegionMetricsImpl memMetrics
) {
return new DirectMemoryProvider() {
+ /** */
private final DirectMemoryProvider memProvider = memoryProvider0;
@Override public void initialize(long[] chunkSizes) {
@@ -1039,10 +1059,18 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap
startMemoryPolicies();
initPageMemoryDataStructures(memCfg);
+
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(kctx)) {
+ lsnr.afterInitialise(this);
+ }
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
+ for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext())) {
+ lsnr.beforeStop(this);
+ }
+
if (dataRegionMap != null) {
for (DataRegion memPlc : dataRegionMap.values()) {
memPlc.pageMemory().stop();
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
index a4266fa..6248765 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IndexStorageImpl.java
@@ -220,7 +220,7 @@ public class IndexStorageImpl implements IndexStorage {
}
/** {@inheritDoc} */
- @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
+ @Override public IndexItem getRow(final BPlusIO<IndexItem> io, final long pageAddr,
final int idx, Object ignore) throws IgniteCheckedException {
return readRow(pageAddr, ((IndexIO)io).getOffset(pageAddr, idx));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
index ee7c255..6900b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
/**
@@ -100,6 +101,8 @@ public class RowStore {
try {
freeList.insertDataRow(row);
+
+ assert row.link() != 0L;
}
finally {
ctx.database().checkpointReadUnlock();
@@ -110,8 +113,8 @@ public class RowStore {
/**
* @param link Row link.
* @param row New row data.
- * @throws IgniteCheckedException If failed.
* @return {@code True} if was able to update row.
+ * @throws IgniteCheckedException If failed.
*/
public boolean updateRow(long link, CacheDataRow row) throws IgniteCheckedException {
assert !persistenceEnabled || ctx.database().checkpointLockIsHeldByThread();
@@ -123,6 +126,29 @@ public class RowStore {
}
/**
+ * Run page handler operation over the row.
+ *
+ * @param link Row link.
+ * @param pageHnd Page handler.
+ * @param arg Page handler argument.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <S, R> void updateDataRow(long link, PageHandler<S, R> pageHnd, S arg) throws IgniteCheckedException {
+ if (!persistenceEnabled)
+ freeList.updateDataRow(link, pageHnd, arg);
+ else {
+ ctx.database().checkpointReadLock();
+
+ try {
+ freeList.updateDataRow(link, pageHnd, arg);
+ }
+ finally {
+ ctx.database().checkpointReadUnlock();
+ }
+ }
+ }
+
+ /**
* @return Free list.
*/
public FreeList freeList() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
index ae200df..133f0a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/Storable.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.persistence;
+import org.apache.ignite.IgniteCheckedException;
+
/**
* Simple interface for data stored in some RowStore.
*/
@@ -35,4 +37,16 @@ public interface Storable {
* @return Partition.
*/
public int partition();
+
+ /**
+ * Gets the size of this row in a page.
+ *
+ * @return Row size in page.
+ * @throws IgniteCheckedException If failed.
+ */
+ public int size() throws IgniteCheckedException;
+
+ /**
+ * @return Row header size in page. The header is an indivisible part of the row
+ * which is entirely available on the very first page followed by the row link.
+ */
+ public int headerSize();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
index 933d28e..aa09241 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java
@@ -279,6 +279,23 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
}
/** {@inheritDoc} */
+ @Override public void initialize(int cacheId, int partitions, String workingDir, AllocatedPageTracker tracker)
+     throws IgniteCheckedException {
+     // A holder is already registered for this cache ID: nothing to do.
+     if (idxCacheStores.containsKey(cacheId))
+         return;
+
+     File cacheWorkDir = new File(storeWorkDir, workingDir);
+
+     CacheStoreHolder created = initDir(cacheWorkDir, cacheId, partitions, tracker);
+
+     CacheStoreHolder prev = idxCacheStores.put(cacheId, created);
+
+     assert prev == null : "Non-null old store holder for cacheId: " + cacheId;
+ }
+
+ /** {@inheritDoc} */
@Override public void initializeForCache(CacheGroupDescriptor grpDesc, StoredCacheData cacheData) throws IgniteCheckedException {
int grpId = grpDesc.groupId();
@@ -298,9 +315,9 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
if (!idxCacheStores.containsKey(grpId)) {
CacheStoreHolder holder = initDir(
new File(storeWorkDir, META_STORAGE_NAME),
- grpId,
- 1,
- AllocatedPageTracker.NO_OP);
+ grpId,
+ 1,
+ AllocatedPageTracker.NO_OP );
CacheStoreHolder old = idxCacheStores.put(grpId, holder);
@@ -502,7 +519,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
* @param cacheWorkDir Work directory.
* @param grpId Group ID.
* @param partitions Number of partitions.
- * @param allocatedTracker Metrics updater
+ * @param allocatedTracker Metrics updater.
* @return Cache store holder.
* @throws IgniteCheckedException If failed.
*/
@@ -1012,7 +1029,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen
/**
*
*/
- public CacheStoreHolder(FilePageStore idxStore, FilePageStore[] partStores) {
+ CacheStoreHolder(FilePageStore idxStore, FilePageStore[] partStores) {
this.idxStore = idxStore;
this.partStores = partStores;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
index bcedd8c..4e1f783 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/AbstractFreeList.java
@@ -100,7 +100,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
throws IgniteCheckedException {
AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
- int rowSize = io.getRowSize(row);
+ int rowSize = row.size();
boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);
@@ -146,7 +146,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
throws IgniteCheckedException {
AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;
- int rowSize = io.getRowSize(row);
+ int rowSize = row.size();
int oldFreeSpace = io.getFreeSpace(pageAddr);
assert oldFreeSpace > 0 : oldFreeSpace;
@@ -466,7 +466,7 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
/** {@inheritDoc} */
@Override public void insertDataRow(T row) throws IgniteCheckedException {
- int rowSize = ioVersions().latest().getRowSize(row);
+ int rowSize = row.size();
int written = 0;
@@ -546,6 +546,20 @@ public abstract class AbstractFreeList<T extends Storable> extends PagesList imp
}
/** {@inheritDoc} */
+ @Override public <S, R> R updateDataRow(long link, PageHandler<S, R> pageHnd, S arg) throws IgniteCheckedException {
+     assert link != 0;
+
+     // Page ID and item ID are both decoded from the link.
+     R res = write(PageIdUtils.pageId(link), pageHnd, arg, PageIdUtils.itemId(link), null);
+
+     assert res != null; // Can't fail here.
+
+     return res;
+ }
+
+ /** {@inheritDoc} */
@Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
assert link != 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
index bdca21c..e73124e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/freelist/FreeList.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.persistence.freelist;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.processors.cache.persistence.Storable;
+import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
/**
*/
@@ -40,6 +41,17 @@ public interface FreeList<T extends Storable> {
/**
+ * Runs the given page handler over the row referenced by the link.
+ *
* @param link Row link.
+ * @param pageHnd Page handler.
+ * @param arg Handler argument.
+ * @param <S> Argument type.
+ * @param <R> Result type.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public <S, R> R updateDataRow(long link, PageHandler<S, R> pageHnd, S arg) throws IgniteCheckedException;
+
+ /**
+ * @param link Row link.
* @throws IgniteCheckedException If failed.
*/
public void removeDataRowByLink(long link) throws IgniteCheckedException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
index 271efdf..95d8e81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageDataRow.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence.metastorage;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.persistence.Storable;
/**
@@ -65,6 +66,16 @@ public class MetastorageDataRow implements MetastorageSearchRow, Storable {
}
/** {@inheritDoc} */
+ @Override public int size() throws IgniteCheckedException {
+ return 4 + value().length; // NOTE(review): 4 is presumably an int length prefix written before the value bytes - confirm.
+ }
+
+ /** {@inheritDoc} */
+ @Override public int headerSize() {
+ return 0; // This row type reports a zero-length header.
+ }
+
+ /** {@inheritDoc} */
@Override
public void link(long link) {
this.link = link;
http://git-wip-us.apache.org/repos/asf/ignite/blob/00bb4d4d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
index 19a145f..00db5cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/MetastorageTree.java
@@ -79,7 +79,7 @@ public class MetastorageTree extends BPlusTree<MetastorageSearchRow, Metastorage
}
/** {@inheritDoc} */
- @Override protected MetastorageDataRow getRow(BPlusIO<MetastorageSearchRow> io, long pageAddr, int idx,
+ @Override public MetastorageDataRow getRow(BPlusIO<MetastorageSearchRow> io, long pageAddr, int idx,
Object x) throws IgniteCheckedException {
long link = ((DataLinkIO)io).getLink(pageAddr, idx);
String key = ((DataLinkIO)io).getKey(pageAddr, idx);