You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/01/24 12:19:22 UTC

ignite git commit: ignite-3477 wal record.

Repository: ignite
Updated Branches:
  refs/heads/ignite-3477-1 fb48c50ff -> 5401b0fd6


ignite-3477 wal record.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5401b0fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5401b0fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5401b0fd

Branch: refs/heads/ignite-3477-1
Commit: 5401b0fd672bbb04071a497d09388db162eba2ce
Parents: fb48c50
Author: sboikov <sb...@gridgain.com>
Authored: Tue Jan 24 15:19:13 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Jan 24 15:19:13 2017 +0300

----------------------------------------------------------------------
 .../internal/pagemem/wal/record/WALRecord.java  |  5 +-
 .../wal/record/delta/DataPageUpdateRecord.java  | 79 ++++++++++++++++++++
 .../cache/database/freelist/FreeListImpl.java   | 44 ++++++++---
 .../cache/database/tree/io/DataPageIO.java      | 11 ++-
 4 files changed, 125 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 959ad7d..f761f68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -154,7 +154,10 @@ public abstract class WALRecord {
         PAGE_LIST_META_RESET_COUNT_RECORD,
 
         /** Switch segment record. */
-        SWITCH_SEGMENT_RECORD
+        SWITCH_SEGMENT_RECORD,
+
+        /** */
+        DATA_PAGE_UPDATE_RECORD
         ;
 
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
new file mode 100644
index 0000000..65b7172
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/DataPageUpdateRecord.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record.delta;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
+
+/**
+ * Update existing record in data page.
+ */
+public class DataPageUpdateRecord extends PageDeltaRecord {
+    /** */
+    private int itemId;
+
+    /** */
+    private byte[] payload;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param pageId Page ID.
+     * @param itemId Item ID.
+     * @param payload Record data.
+     */
+    public DataPageUpdateRecord(
+        int cacheId,
+        long pageId,
+        int itemId,
+        byte[] payload
+    ) {
+        super(cacheId, pageId);
+
+        this.payload = payload;
+        this.itemId = itemId;
+    }
+
+    /**
+     * @return Item ID.
+     */
+    public int itemId() {
+        return itemId;
+    }
+
+    /**
+     * @return Insert record payload.
+     */
+    public byte[] payload() {
+        return payload;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException {
+        assert payload != null;
+
+        DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
+
+        io.updateRow(pageAddr, itemId, pageMem.pageSize(), payload, null, 0);
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.DATA_PAGE_UPDATE_RECORD;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
index 833df6e..87d5e4d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/freelist/FreeListImpl.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
+import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
@@ -77,7 +78,28 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
                 throws IgniteCheckedException {
                 DataPageIO io = (DataPageIO)iox;
 
-                return io.updateRow(pageAddr, itemId, pageSize(), row, getRowSize(row));
+                int rowSize = getRowSize(row);
+
+                boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, getRowSize(row));
+
+                if (updated && isWalDeltaRecordNeeded(wal, page)) {
+                    // TODO This record must contain only a reference to a logical WAL record with the actual data.
+                    byte[] payload = new byte[rowSize];
+
+                    DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());
+
+                    assert data.payloadSize() == rowSize;
+
+                    PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
+
+                    wal.log(new DataPageUpdateRecord(
+                        cacheId,
+                        page.id(),
+                        itemId,
+                        payload));
+                }
+
+                return updated;
             }
         };
 
@@ -112,7 +134,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
 
             /**
              * @param page Page.
-             * @param buf Buffer.
+             * @param pageAddr Page address.
              * @param io IO.
              * @param row Row.
              * @param rowSize Row size.
@@ -121,22 +143,22 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
              */
             private int addRow(
                 Page page,
-                long buf,
+                long pageAddr,
                 DataPageIO io,
                 CacheDataRow row,
                 int rowSize
             ) throws IgniteCheckedException {
-                io.addRow(buf, row, rowSize, pageSize());
+                io.addRow(pageAddr, row, rowSize, pageSize());
 
                 if (isWalDeltaRecordNeeded(wal, page)) {
                     // TODO This record must contain only a reference to a logical WAL record with the actual data.
                     byte[] payload = new byte[rowSize];
 
-                    DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
+                    DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
 
                     assert data.payloadSize() == rowSize;
 
-                    PageUtils.getBytes(buf, data.offset(), payload, 0, rowSize);
+                    PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);
 
                     wal.log(new DataPageInsertRecord(
                         cacheId,
@@ -149,7 +171,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
 
             /**
              * @param page Page.
-             * @param buf Buffer.
+             * @param pageAddr Page address.
              * @param io IO.
              * @param row Row.
              * @param written Written size.
@@ -159,7 +181,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
              */
             private int addRowFragment(
                 Page page,
-                long buf,
+                long pageAddr,
                 DataPageIO io,
                 CacheDataRow row,
                 int written,
@@ -168,7 +190,7 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
                 // Read last link before the fragment write, because it will be updated there.
                 long lastLink = row.link();
 
-                int payloadSize = io.addRowFragment(pageMem, buf, row, written, rowSize, pageSize());
+                int payloadSize = io.addRowFragment(pageMem, pageAddr, row, written, rowSize, pageSize());
 
                 assert payloadSize > 0 : payloadSize;
 
@@ -176,9 +198,9 @@ public class FreeListImpl extends PagesList implements FreeList, ReuseList {
                     // TODO This record must contain only a reference to a logical WAL record with the actual data.
                     byte[] payload = new byte[payloadSize];
 
-                    DataPagePayload data = io.readPayload(buf, PageIdUtils.itemId(row.link()), pageSize());
+                    DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());
 
-                    PageUtils.getBytes(buf, data.offset(), payload, 0, payloadSize);
+                    PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);
 
                     wal.log(new DataPageInsertFragmentRecord(cacheId, page.id(), payload, lastLink));
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5401b0fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index 08fa30e..fdb812f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Data pages IO.
@@ -601,6 +602,7 @@ public class DataPageIO extends PageIO {
      * @param pageAddr Page address.
      * @param itemId Item ID.
      * @param pageSize Page size.
+     * @param payload Row data.
      * @param row Row.
      * @param rowSize Row size.
      * @return {@code True} if entry is not fragmented.
@@ -610,16 +612,21 @@ public class DataPageIO extends PageIO {
         final long pageAddr,
         int itemId,
         int pageSize,
-        CacheDataRow row,
+        @Nullable byte[] payload,
+        @Nullable CacheDataRow row,
         final int rowSize) throws IgniteCheckedException {
         assert checkIndex(itemId) : itemId;
+        assert row != null ^ payload != null;
 
         final int dataOff = getDataOffset(pageAddr, itemId, pageSize);
 
         if (isFragmented(pageAddr, dataOff))
             return false;
 
-        writeRowData(pageAddr, dataOff, rowSize, row, false);
+        if (row != null)
+            writeRowData(pageAddr, dataOff, rowSize, row, false);
+        else
+            writeRowData(pageAddr, dataOff, payload);
 
         return true;
     }