You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/24 12:19:22 UTC
ignite git commit: ignite-3477 wal record.
Repository: ignite
Updated Branches:
refs/heads/ignite-3477-1 fb48c50ff -> 5401b0fd6
ignite-3477 wal record.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5401b0fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5401b0fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5401b0fd
Branch: refs/heads/ignite-3477-1
Commit: 5401b0fd672bbb04071a497d09388db162eba2ce
Parents: fb48c50
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jan 24 15:19:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jan 24 15:19:13 2017 +0300
----------------------------------------------------------------------
.../internal/pagemem/wal/record/WALRecord.java | 5 +-
.../wal/record/delta/DataPageUpdateRecord.java | 79 ++++++++++++++++++++
.../cache/database/freelist/FreeListImpl.java | 44 ++++++++---
.../cache/database/tree/io/DataPageIO.java | 11 ++-
4 files changed, 125 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 959ad7d..f761f68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -154,7 +154,10 @@ public abstract class WALRecord {
PAGE_LIST_META_RESET_COUNT_RECORD,
/** Switch segment record. */
- SWITCH_SEGMENT_RECORD
+ SWITCH_SEGMENT_RECORD,
+
+ /** Data page update record. */
+ DATA_PAGE_UPDATE_RECORD
;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
new file mode 100644
index 0000000..65b7172
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
+
+/**
+ * Page delta record that rewrites the payload of an existing row in a data page.
+ */
+public class DataPageUpdateRecord extends PageDeltaRecord {
+ /** Item ID of the row to update; passed to {@code DataPageIO.updateRow} when the delta is applied. */
+ private int itemId;
+
+ /** Row data bytes written into the page when the delta is applied; must not be {@code null}. */
+ private byte[] payload;
+
+ /**
+ * @param cacheId Cache ID.
+ * @param pageId Page ID.
+ * @param itemId Item ID.
+ * @param payload Record data.
+ */
+ public DataPageUpdateRecord(
+ int cacheId,
+ long pageId,
+ int itemId,
+ byte[] payload
+ ) {
+ super(cacheId, pageId);
+
+ this.payload = payload;
+ this.itemId = itemId;
+ }
+
+ /**
+ * @return Item ID.
+ */
+ public int itemId() {
+ return itemId;
+ }
+
+ /**
+ * @return Update record payload (row data bytes).
+ */
+ public byte[] payload() {
+ return payload;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+ assert payload != null;
+
+ DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+ // No CacheDataRow is available here, so the raw payload is passed with a null row; row size is only used with a row, hence 0.
+ io.updateRow(pageAddr, itemId, pageMem.pageSize(), payload, null, 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.DATA_PAGE_UPDATE_RECORD;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
index 833df6e..87d5e4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
@@ -77,7 +78,28 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
throws IgniteCheckedException {
DataPageIO io = (DataPageIO)iox;
- return io.updateRow(pageAddr, itemId, pageSize(), row, getRowSize(row));
+ int rowSize = getRowSize(row);
+
+ boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, getRowSize(row));
+
+ if (updated && isWalDeltaRecordNeeded(wal, page)) {
+ // TODO This record must contain only a reference to a logical WAL record with the actual data.
+ byte[] payload = new byte[rowSize];
+
+ DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
+
+ assert data.payloadSize() == rowSize;
+
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+ wal.log(new DataPageUpdateRecord(
+ cacheId,
+ page.id(),
+ itemId,
+ payload));
+ }
+
+ return updated;
}
};
@@ -112,7 +134,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
/**
* @param page Page.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param io IO.
* @param row Row.
* @param rowSize Row size.
@@ -121,22 +143,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
private int addRow(
Page page,
- long buf,
+ long pageAddr,
DataPageIO io,
CacheDataRow row,
int rowSize
) throws IgniteCheckedException {
- io.addRow(buf, row, rowSize, pageSize());
+ io.addRow(pageAddr, row, rowSize, pageSize());
if (isWalDeltaRecordNeeded(wal, page)) {
// TODO This record must contain only a reference to a logical WAL record with the actual data.
byte[] payload = new byte[rowSize];
- DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
+ DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
assert data.payloadSize() == rowSize;
- PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize);
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
wal.log(new DataPageInsertRecord(
cacheId,
@@ -149,7 +171,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
/**
* @param page Page.
- * @param buf Buffer.
+ * @param pageAddr Page address.
* @param io IO.
* @param row Row.
* @param written Written size.
@@ -159,7 +181,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
*/
private int addRowFragment(
Page page,
- long buf,
+ long pageAddr,
DataPageIO io,
CacheDataRow row,
int written,
@@ -168,7 +190,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
// Read last link before the fragment write, because it will be updated there.
long lastLink = row.link();
- int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize());
+ int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
assert payloadSize > 0 : payloadSize;
@@ -176,9 +198,9 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
// TODO This record must contain only a reference to a logical WAL record with the actual data.
byte[] payload = new byte[payloadSize];
- DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
+ DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
- PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize);
+ PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index 08fa30e..fdb812f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.jetbrains.annotations.Nullable;
/**
* Data pages IO.
@@ -601,6 +602,7 @@ public class DataPageIO extends PageIO {
* @param pageAddr Page address.
* @param itemId Item ID.
* @param pageSize Page size.
+ * @param payload Row data.
* @param row Row.
* @param rowSize Row size.
* @return {@code True} if entry is not fragmented.
@@ -610,16 +612,21 @@ public class DataPageIO extends PageIO {
final long pageAddr,
int itemId,
int pageSize,
- CacheDataRow row,
+ @Nullable byte[] payload,
+ @Nullable CacheDataRow row,
final int rowSize) throws IgniteCheckedException {
assert checkIndex(itemId) : itemId;
+ assert row != null ^ payload != null;
final int dataOff = getDataOffset(pageAddr, itemId, pageSize);
if (isFragmented(pageAddr, dataOff))
return false;
- writeRowData(pageAddr, dataOff, rowSize, row, false);
+ if (row != null)
+ writeRowData(pageAddr, dataOff, rowSize, row, false);
+ else
+ writeRowData(pageAddr, dataOff, payload);
return true;
}