You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/09/29 09:14:42 UTC

svn commit: r1762750 [1/2] - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ test/java/org/apache/jackrabbit/oak/segment/ test/java/org/apache/jackrabbit/oak/segment/standby/

Author: frm
Date: Thu Sep 29 09:14:42 2016
New Revision: 1762750

URL: http://svn.apache.org/viewvc?rev=1762750&view=rev
Log:
OAK-4659 - Address records by logic identifiers instead of offsets

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbersTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbersTest.java   (with props)
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/IntSetTest.java
      - copied, changed from r1762744, jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ShortSetTest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbersTest.java   (with props)
Removed:
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ShortSetTest.java
Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BlockRecord.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CachingSegmentReader.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ListRecord.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MapRecord.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Record.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordId.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordIdSet.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentParser.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentPropertyState.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentStream.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Template.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CompactionAndCleanupIT.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/standby/MBeanTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BlockRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BlockRecord.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BlockRecord.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/BlockRecord.java Thu Sep 29 09:14:42 2016
@@ -54,7 +54,7 @@ class BlockRecord extends Record {
             length = size - position;
         }
         if (length > 0) {
-            getSegment().readBytes(getOffset(position), buffer, offset, length);
+            getSegment().readBytes(getRecordNumber(), position, buffer, offset, length);
         }
         return length;
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CachingSegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CachingSegmentReader.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CachingSegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CachingSegmentReader.java Thu Sep 29 09:14:42 2016
@@ -87,7 +87,7 @@ public class CachingSegmentReader implem
         final SegmentId segmentId = id.getSegmentId();
         long msb = segmentId.getMostSignificantBits();
         long lsb = segmentId.getLeastSignificantBits();
-        return stringCache.get(msb, lsb, id.getOffset(), new Function<Integer, String>() {
+        return stringCache.get(msb, lsb, id.getRecordNumber(), new Function<Integer, String>() {
             @Nonnull
             @Override
             public String apply(Integer offset) {
@@ -111,7 +111,7 @@ public class CachingSegmentReader implem
         final SegmentId segmentId = id.getSegmentId();
         long msb = segmentId.getMostSignificantBits();
         long lsb = segmentId.getLeastSignificantBits();
-        return templateCache.get(msb, lsb, id.getOffset(), new Function<Integer, Template>() {
+        return templateCache.get(msb, lsb, id.getRecordNumber(), new Function<Integer, Template>() {
             @Nonnull
             @Override
             public Template apply(Integer offset) {

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java?rev=1762750&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java Thu Sep 29 09:14:42 2016
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import java.util.Iterator;
+
+/**
+ * An implementation of a record number to offset table that assumes that a
+ * record number is also a valid offset in the segment. This implementation is
+ * useful when an instance of a table has still to be provided, but record
+ * numbers have no logical semantics (e.g. for bulk segments).
+ * <p>
+ * This implementation is trivially thread-safe.
+ */
+class IdentityRecordNumbers implements RecordNumbers {
+    // Identity mapping: the record number itself is returned as the offset.
+    @Override
+    public int getOffset(int recordNumber) {
+        return recordNumber;
+    }
+
+    // Iteration is not supported: every call throws UnsupportedOperationException.
+    @Override
+    public Iterator<Entry> iterator() {
+        throw new UnsupportedOperationException("invalid usage of the record-number-to-offset table");
+    }
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/IdentityRecordNumbers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java?rev=1762750&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java Thu Sep 29 09:14:42 2016
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A record number to offset table whose content is fixed at construction
+ * time: the constructor copies the supplied mappings and nothing in this
+ * class modifies the copy afterwards.
+ * <p>
+ * This implementation is trivially thread-safe.
+ */
+class ImmutableRecordNumbers implements RecordNumbers {
+
+    private final Map<Integer, Integer> recordNumbers;
+
+    /**
+     * Create a new immutable record number to offset table.
+     *
+     * @param recordNumbers a map of record numbers to offsets. It can't be
+     *                      {@code null}.
+     */
+    ImmutableRecordNumbers(Map<Integer, Integer> recordNumbers) {
+        this.recordNumbers = Maps.newHashMap(recordNumbers);
+    }
+
+    /**
+     * @return the offset mapped to {@code recordNumber}, or {@code -1} if
+     * this table has no mapping for it.
+     */
+    @Override
+    public int getOffset(int recordNumber) {
+        Integer offset = recordNumbers.get(recordNumber);
+        return offset == null ? -1 : offset;
+    }
+
+    @Override
+    public Iterator<Entry> iterator() {
+        return new RecordNumbersIterator(recordNumbers.entrySet().iterator());
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ImmutableRecordNumbers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ListRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ListRecord.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ListRecord.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/ListRecord.java Thu Sep 29 09:14:42 2016
@@ -62,7 +62,7 @@ class ListRecord extends Record {
             int bucketIndex = index / bucketSize;
             int bucketOffset = index % bucketSize;
             Segment segment = getSegment();
-            RecordId id = segment.readRecordId(getOffset(0, bucketIndex));
+            RecordId id = segment.readRecordId(getRecordNumber(), 0, bucketIndex);
             ListRecord bucket = new ListRecord(
                     id, Math.min(bucketSize, size - bucketIndex * bucketSize));
             return bucket.getEntry(bucketOffset);
@@ -95,13 +95,13 @@ class ListRecord extends Record {
             ids.add(getRecordId());
         } else if (bucketSize == 1) {
             for (int i = 0; i < count; i++) {
-                ids.add(segment.readRecordId(getOffset(0, index + i)));
+                ids.add(segment.readRecordId(getRecordNumber(), 0, index + i));
             }
         } else {
             while (count > 0) {
                 int bucketIndex = index / bucketSize;
                 int bucketOffset = index % bucketSize;
-                RecordId id = segment.readRecordId(getOffset(0, bucketIndex));
+                RecordId id = segment.readRecordId(getRecordNumber(), 0, bucketIndex);
                 ListRecord bucket = new ListRecord(
                         id, Math.min(bucketSize, size - bucketIndex * bucketSize));
                 int n = Math.min(bucket.size() - bucketOffset, count);

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MapRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MapRecord.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MapRecord.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MapRecord.java Thu Sep 29 09:14:42 2016
@@ -104,26 +104,26 @@ public class MapRecord extends Record {
 
     boolean isLeaf() {
         Segment segment = getSegment();
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).isLeaf();
         }
         return !isBranch(head);
     }
 
     public boolean isDiff() {
-        return isDiff(getSegment().readInt(getOffset(0)));
+        return isDiff(getSegment().readInt(getRecordNumber()));
     }
 
     MapRecord[] getBuckets() {
         Segment segment = getSegment();
         MapRecord[] buckets = new MapRecord[BUCKETS_PER_LEVEL];
-        int bitmap = segment.readInt(getOffset(4));
+        int bitmap = segment.readInt(getRecordNumber(), 4);
         int ids = 0;
         for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
             if ((bitmap & (1 << i)) != 0) {
-                buckets[i] = reader.readMap(segment.readRecordId(getOffset(8, ids++)));
+                buckets[i] = reader.readMap(segment.readRecordId(getRecordNumber(), 8, ids++));
             } else {
                 buckets[i] = null;
             }
@@ -133,11 +133,11 @@ public class MapRecord extends Record {
 
     private List<MapRecord> getBucketList(Segment segment) {
         List<MapRecord> buckets = newArrayListWithCapacity(BUCKETS_PER_LEVEL);
-        int bitmap = segment.readInt(getOffset(4));
+        int bitmap = segment.readInt(getRecordNumber(), 4);
         int ids = 0;
         for (int i = 0; i < BUCKETS_PER_LEVEL; i++) {
             if ((bitmap & (1 << i)) != 0) {
-                RecordId id = segment.readRecordId(getOffset(8, ids++));
+                RecordId id = segment.readRecordId(getRecordNumber(), 8, ids++);
                 buckets.add(reader.readMap(id));
             }
         }
@@ -146,9 +146,9 @@ public class MapRecord extends Record {
 
     int size() {
         Segment segment = getSegment();
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).size();
         }
         return getSize(head);
@@ -159,16 +159,16 @@ public class MapRecord extends Record {
         int hash = getHash(name);
         Segment segment = getSegment();
 
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            if (hash == segment.readInt(getOffset(4))) {
-                RecordId key = segment.readRecordId(getOffset(8));
+            if (hash == segment.readInt(getRecordNumber(), 4)) {
+                RecordId key = segment.readRecordId(getRecordNumber(), 8);
                 if (name.equals(reader.readString(key))) {
-                    RecordId value = segment.readRecordId(getOffset(8, 1));
+                    RecordId value = segment.readRecordId(getRecordNumber(), 8, 1);
                     return new MapEntry(reader, name, key, value);
                 }
             }
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).getEntry(name);
         }
 
@@ -181,14 +181,14 @@ public class MapRecord extends Record {
         if (isBranch(size, level)) {
             // this is an intermediate branch record
             // check if a matching bucket exists, and recurse 
-            int bitmap = segment.readInt(getOffset(4));
+            int bitmap = segment.readInt(getRecordNumber(), 4);
             int mask = (1 << BITS_PER_LEVEL) - 1;
             int shift = 32 - (level + 1) * BITS_PER_LEVEL;
             int index = (hash >> shift) & mask;
             int bit = 1 << index;
             if ((bitmap & bit) != 0) {
                 int ids = bitCount(bitmap & (bit - 1));
-                RecordId id = segment.readRecordId(getOffset(8, ids));
+                RecordId id = segment.readRecordId(getRecordNumber(), 8, ids);
                 return reader.readMap(id).getEntry(name);
             } else {
                 return null;
@@ -211,13 +211,11 @@ public class MapRecord extends Record {
             int i = p + (int) ((q - p) * (h - pH) / (qH - pH));
             assert p <= i && i <= q;
 
-            long iH = segment.readInt(getOffset(4 + i * 4)) & HASH_MASK;
+            long iH = segment.readInt(getRecordNumber(), 4 + i * 4) & HASH_MASK;
             int diff = Long.valueOf(iH).compareTo(Long.valueOf(h));
             if (diff == 0) {
-                RecordId keyId = segment.readRecordId(
-                        getOffset(4 + size * 4, i * 2));
-                RecordId valueId = segment.readRecordId(
-                        getOffset(4 + size * 4, i * 2 + 1));
+                RecordId keyId = segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2);
+                RecordId valueId = segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2 + 1);
                 diff = reader.readString(keyId).compareTo(name);
                 if (diff == 0) {
                     return new MapEntry(reader, name, keyId, valueId);
@@ -239,13 +237,13 @@ public class MapRecord extends Record {
         checkNotNull(key);
         Segment segment = getSegment();
 
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            if (hash == segment.readInt(getOffset(4))
-                    && key.equals(segment.readRecordId(getOffset(8)))) {
-                return segment.readRecordId(getOffset(8, 1));
+            if (hash == segment.readInt(getRecordNumber(), 4)
+                    && key.equals(segment.readRecordId(getRecordNumber(), 8))) {
+                return segment.readRecordId(getRecordNumber(), 8, 1);
             }
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).getValue(hash, key);
         }
 
@@ -258,14 +256,14 @@ public class MapRecord extends Record {
         if (isBranch(size, level)) {
             // this is an intermediate branch record
             // check if a matching bucket exists, and recurse
-            int bitmap = segment.readInt(getOffset(4));
+            int bitmap = segment.readInt(getRecordNumber(), 4);
             int mask = (1 << BITS_PER_LEVEL) - 1;
             int shift = 32 - (level + 1) * BITS_PER_LEVEL;
             int index = (hash >> shift) & mask;
             int bit = 1 << index;
             if ((bitmap & bit) != 0) {
                 int ids = bitCount(bitmap & (bit - 1));
-                RecordId id = segment.readRecordId(getOffset(8, ids));
+                RecordId id = segment.readRecordId(getRecordNumber(), 8, ids);
                 return reader.readMap(id).getValue(hash, key);
             } else {
                 return null;
@@ -275,15 +273,12 @@ public class MapRecord extends Record {
         // this is a leaf record; scan the list to find a matching entry
         Long h = hash & HASH_MASK;
         for (int i = 0; i < size; i++) {
-            int hashOffset = getOffset(4 + i * 4);
-            int diff = h.compareTo(segment.readInt(hashOffset) & HASH_MASK);
+            int diff = h.compareTo(segment.readInt(getRecordNumber(), 4 + i * 4) & HASH_MASK);
             if (diff > 0) {
                 return null;
             } else if (diff == 0) {
-                int keyOffset = getOffset(4 + size * 4, i * 2);
-                if (key.equals(segment.readRecordId(keyOffset))) {
-                    int valueOffset = getOffset(4 + size * 4, i * 2 + 1);
-                    return segment.readRecordId(valueOffset);
+                if (key.equals(segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2))) {
+                    return segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2 + 1);
                 }
             }
         }
@@ -293,9 +288,9 @@ public class MapRecord extends Record {
     Iterable<String> getKeys() {
         Segment segment = getSegment();
 
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).getKeys();
         }
 
@@ -317,7 +312,7 @@ public class MapRecord extends Record {
 
         RecordId[] ids = new RecordId[size];
         for (int i = 0; i < size; i++) {
-            ids[i] = segment.readRecordId(getOffset(4 + size * 4, i * 2));
+            ids[i] = segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2);
         }
 
         String[] keys = new String[size];
@@ -335,11 +330,11 @@ public class MapRecord extends Record {
             final RecordId diffKey, final RecordId diffValue) {
         Segment segment = getSegment();
 
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            RecordId key = segment.readRecordId(getOffset(8));
-            RecordId value = segment.readRecordId(getOffset(8, 1));
-            RecordId base = segment.readRecordId(getOffset(8, 2));
+            RecordId key = segment.readRecordId(getRecordNumber(), 8);
+            RecordId value = segment.readRecordId(getRecordNumber(), 8, 1);
+            RecordId base = segment.readRecordId(getRecordNumber(), 8, 2);
             return reader.readMap(base).getEntries(key, value);
         }
 
@@ -366,12 +361,12 @@ public class MapRecord extends Record {
 
         MapEntry[] entries = new MapEntry[size];
         for (int i = 0; i < size; i++) {
-            RecordId key = segment.readRecordId(getOffset(4 + size * 4, i * 2));
+            RecordId key = segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2);
             RecordId value;
             if (key.equals(diffKey)) {
                 value = diffValue;
             } else {
-                value = segment.readRecordId(getOffset(4 + size * 4, i * 2 + 1));
+                value = segment.readRecordId(getRecordNumber(), 4 + size * 4, i * 2 + 1);
             }
             String name = reader.readString(key);
             entries[i] = new MapEntry(reader, name, key, value);
@@ -385,13 +380,13 @@ public class MapRecord extends Record {
         }
 
         Segment segment = getSegment();
-        int head = segment.readInt(getOffset(0));
+        int head = segment.readInt(getRecordNumber());
         if (isDiff(head)) {
-            int hash = segment.readInt(getOffset(4));
-            RecordId keyId = segment.readRecordId(getOffset(8));
+            int hash = segment.readInt(getRecordNumber(), 4);
+            RecordId keyId = segment.readRecordId(getRecordNumber(), 8);
             final String key = reader.readString(keyId);
-            final RecordId value = segment.readRecordId(getOffset(8, 1));
-            MapRecord base = reader.readMap(segment.readRecordId(getOffset(8, 2)));
+            final RecordId value = segment.readRecordId(getRecordNumber(), 8, 1);
+            MapRecord base = reader.readMap(segment.readRecordId(getRecordNumber(), 8, 2));
 
             boolean rv = base.compare(before, new DefaultNodeStateDiff() {
                 @Override
@@ -427,13 +422,13 @@ public class MapRecord extends Record {
         }
 
         Segment beforeSegment = before.getSegment();
-        int beforeHead = beforeSegment.readInt(before.getOffset(0));
+        int beforeHead = beforeSegment.readInt(before.getRecordNumber());
         if (isDiff(beforeHead)) {
-            int hash = beforeSegment.readInt(before.getOffset(4));
-            RecordId keyId = beforeSegment.readRecordId(before.getOffset(8));
+            int hash = beforeSegment.readInt(before.getRecordNumber(), 4);
+            RecordId keyId = beforeSegment.readRecordId(before.getRecordNumber(), 8);
             final String key = reader.readString(keyId);
-            final RecordId value = beforeSegment.readRecordId(before.getOffset(8, 1));
-            MapRecord base = reader.readMap(beforeSegment.readRecordId(before.getOffset(8, 2)));
+            final RecordId value = beforeSegment.readRecordId(before.getRecordNumber(), 8, 1);
+            MapRecord base = reader.readMap(beforeSegment.readRecordId(before.getRecordNumber(), 8, 2));
 
             boolean rv = this.compare(base, new DefaultNodeStateDiff() {
                 @Override

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java?rev=1762750&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java Thu Sep 29 09:14:42 2016
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A thread-safe, mutable record number to offset table. Every access to the
+ * backing map, lookups included, is guarded by the same lock, since a plain
+ * {@code HashMap} does not support a lookup concurrent with an insertion.
+ */
+class MutableRecordNumbers implements RecordNumbers {
+
+    private final Object lock = new Object();
+
+    private final Map<Integer, Integer> recordNumbers = Maps.newHashMap();
+
+    /**
+     * Look up the offset registered for a record number.
+     *
+     * @param recordNumber the record number to look up.
+     * @return the offset associated to the record number, or {@code -1} if
+     * the record number is not in this table.
+     */
+    @Override
+    public int getOffset(int recordNumber) {
+        synchronized (lock) {
+            Integer offset = recordNumbers.get(recordNumber);
+            return offset == null ? -1 : offset;
+        }
+    }
+
+    /**
+     * Iterate over a snapshot of this table taken at the time of the call.
+     *
+     * @return an iterator over the entries of the snapshot.
+     */
+    @Override
+    public Iterator<Entry> iterator() {
+        Map<Integer, Integer> snapshot;
+
+        synchronized (lock) {
+            snapshot = Maps.newHashMap(recordNumbers);
+        }
+
+        return new RecordNumbersIterator(snapshot.entrySet().iterator());
+    }
+
+    /**
+     * Return the size of this table.
+     *
+     * @return the size of this table.
+     */
+    public int size() {
+        synchronized (lock) {
+            return recordNumbers.size();
+        }
+    }
+
+    /**
+     * Add a new offset to this table and generate a record number for it.
+     *
+     * @param offset an offset to be added to this table.
+     * @return the record number associated to the offset.
+     */
+    int addOffset(int offset) {
+        synchronized (lock) {
+            int recordNumber = recordNumbers.size();
+            recordNumbers.put(recordNumber, offset);
+            return recordNumber;
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/MutableRecordNumbers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Record.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Record.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Record.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Record.java Thu Sep 29 09:14:42 2016
@@ -34,7 +34,7 @@ class Record {
     }
 
     private static boolean fastEquals(@Nonnull Record a, @Nonnull Record b) {
-        return a == b || (a.offset == b.offset && a.segmentId.equals(b.segmentId));
+        return a == b || (a.recordNumber == b.recordNumber && a.segmentId.equals(b.segmentId));
     }
 
     /**
@@ -43,9 +43,9 @@ class Record {
     private final SegmentId segmentId;
 
     /**
-     * Segment offset of this record.
+     * Record number of this record within its segment.
      */
-    private final int offset;
+    private final int recordNumber;
 
     /**
      * Creates a new object for the identified record.
@@ -53,12 +53,12 @@ class Record {
      * @param id record identified
      */
     protected Record(@Nonnull RecordId id) {
-        this(id.getSegmentId(), id.getOffset());
+        this(id.getSegmentId(), id.getRecordNumber());
     }
 
-    protected Record(@Nonnull SegmentId segmentId, int offset) {
+    protected Record(@Nonnull SegmentId segmentId, int recordNumber) {
         this.segmentId = segmentId;
-        this.offset = offset;
+        this.recordNumber = recordNumber;
     }
 
     /**
@@ -70,45 +70,17 @@ class Record {
         return segmentId.getSegment();
     }
 
+    protected int getRecordNumber() {
+        return recordNumber;
+    }
+
     /**
      * Returns the identifier of this record.
      *
      * @return record identifier
      */
     public RecordId getRecordId() {
-        return new RecordId(segmentId, offset);
-    }
-
-    /**
-     * Returns the segment offset of this record.
-     *
-     * @return segment offset of this record
-     */
-    protected final int getOffset() {
-        return offset;
-    }
-
-    /**
-     * Returns the segment offset of the given byte position in this record.
-     *
-     * @param position byte position within this record
-     * @return segment offset of the given byte position
-     */
-    protected final int getOffset(int position) {
-        return getOffset() + position;
-    }
-
-    /**
-     * Returns the segment offset of a byte position in this record.
-     * The position is calculated from the given number of raw bytes and
-     * record identifiers.
-     *
-     * @param bytes number of raw bytes before the position
-     * @param ids number of record identifiers before the position
-     * @return segment offset of the specified byte position
-     */
-    protected final int getOffset(int bytes, int ids) {
-        return getOffset(bytes + ids * Segment.RECORD_ID_BYTES);
+        return new RecordId(segmentId, recordNumber);
     }
 
     //------------------------------------------------------------< Object >--
@@ -120,7 +92,7 @@ class Record {
 
     @Override
     public int hashCode() {
-        return segmentId.hashCode() ^ offset;
+        return segmentId.hashCode() ^ recordNumber;
     }
 
     @Override

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordId.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordId.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordId.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordId.java Thu Sep 29 09:14:42 2016
@@ -22,7 +22,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.Integer.parseInt;
 import static org.apache.jackrabbit.oak.segment.Segment.RECORD_ALIGN_BITS;
-import static org.apache.jackrabbit.oak.segment.Segment.pack;
+import static org.apache.jackrabbit.oak.segment.Segment.RECORD_ID_BYTES;
 
 import java.util.UUID;
 import java.util.regex.Matcher;
@@ -38,7 +38,9 @@ public final class RecordId implements C
 
     private static final Pattern PATTERN = Pattern.compile(
             "([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})"
-            + "(:(0|[1-9][0-9]*)|\\.([0-9a-f]{4}))");
+            + "(:(0|[1-9][0-9]*)|\\.([0-9a-f]{8}))");
+
+    static final int SERIALIZED_RECORD_ID_BYTES = 20;
 
     public static RecordId[] EMPTY_ARRAY = new RecordId[0];
 
@@ -54,7 +56,7 @@ public final class RecordId implements C
             if (matcher.group(3) != null) {
                 offset = parseInt(matcher.group(3));
             } else {
-                offset = parseInt(matcher.group(4), 16) << RECORD_ALIGN_BITS;
+                offset = parseInt(matcher.group(4), 16);
             }
 
             return new RecordId(segmentId, offset);
@@ -68,8 +70,6 @@ public final class RecordId implements C
     private final int offset;
 
     public RecordId(SegmentId segmentId, int offset) {
-        checkArgument(offset < Segment.MAX_SEGMENT_SIZE);
-        checkArgument((offset % (1 << RECORD_ALIGN_BITS)) == 0);
         this.segmentId = checkNotNull(segmentId);
         this.offset = offset;
     }
@@ -78,7 +78,7 @@ public final class RecordId implements C
         return segmentId;
     }
 
-    public int getOffset() {
+    public int getRecordNumber() {
         return offset;
     }
 
@@ -94,27 +94,16 @@ public final class RecordId implements C
         return segmentId.getSegment();
     }
 
-    private static void writeLong(byte[] buffer, int pos, long value) {
-        for (int k = 0; k < 8; k++) {
-            buffer[pos + k] = (byte) (value >> (56 - (k << 3)));
-        }
-    }
-
-    private static void writeShort(byte[] buffer, int pos, short value) {
-        buffer[pos] = (byte) (value >> 8);
-        buffer[pos + 1] = (byte) value;
-    }
-
     /**
      * Serialise this record id into an array of bytes: {@code (msb, lsb, offset >> 2)}
      * @return  this record id as byte array
      */
     @Nonnull
     byte[] getBytes() {
-        byte[] buffer = new byte[18];
-        writeLong(buffer, 0, segmentId.getMostSignificantBits());
-        writeLong(buffer, 8, segmentId.getLeastSignificantBits());
-        writeShort(buffer, 16, pack(offset));
+        byte[] buffer = new byte[SERIALIZED_RECORD_ID_BYTES];
+        BinaryUtils.writeLong(buffer, 0, segmentId.getMostSignificantBits());
+        BinaryUtils.writeLong(buffer, 8, segmentId.getLeastSignificantBits());
+        BinaryUtils.writeInt(buffer, 16, offset);
         return buffer;
     }
 
@@ -134,7 +123,7 @@ public final class RecordId implements C
 
     @Override
     public String toString() {
-        return String.format("%s.%04x", segmentId, offset >> RECORD_ALIGN_BITS);
+        return String.format("%s.%08x", segmentId, offset);
     }
 
     /**

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordIdSet.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordIdSet.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordIdSet.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordIdSet.java Thu Sep 29 09:14:42 2016
@@ -22,7 +22,6 @@ package org.apache.jackrabbit.oak.segmen
 import static com.google.common.collect.Maps.newHashMap;
 import static java.lang.System.arraycopy;
 import static java.util.Arrays.binarySearch;
-import static org.apache.jackrabbit.oak.segment.Segment.pack;
 
 import java.util.Map;
 
@@ -33,7 +32,7 @@ import java.util.Map;
  * it contains.
  */
 public class RecordIdSet {
-    private final Map<String, ShortSet> seenIds = newHashMap();
+    private final Map<String, IntSet> seenIds = newHashMap();
 
     /**
      * Add {@code id} to this set if not already present
@@ -42,12 +41,12 @@ public class RecordIdSet {
      */
     public boolean addIfNotPresent(RecordId id) {
         String segmentId = id.getSegmentId().toString();
-        ShortSet offsets = seenIds.get(segmentId);
+        IntSet offsets = seenIds.get(segmentId);
         if (offsets == null) {
-            offsets = new ShortSet();
+            offsets = new IntSet();
             seenIds.put(segmentId, offsets);
         }
-        return offsets.add(pack(id.getOffset()));
+        return offsets.add(id.getRecordNumber());
     }
 
     /**
@@ -57,23 +56,23 @@ public class RecordIdSet {
      */
     public boolean contains(RecordId id) {
         String segmentId = id.getSegmentId().toString();
-        ShortSet offsets = seenIds.get(segmentId);
-        return offsets != null && offsets.contains(pack(id.getOffset()));
+        IntSet offsets = seenIds.get(segmentId);
+        return offsets != null && offsets.contains(id.getRecordNumber());
     }
 
-    static class ShortSet {
-        short[] elements;
+    static class IntSet {
+        int[] elements;
 
-        boolean add(short n) {
+        boolean add(int n) {
             if (elements == null) {
-                elements = new short[1];
+                elements = new int[1];
                 elements[0] = n;
                 return true;
             } else {
                 int k = binarySearch(elements, n);
                 if (k < 0) {
                     int l = -k - 1;
-                    short[] e = new short[elements.length + 1];
+                    int[] e = new int[elements.length + 1];
                     arraycopy(elements, 0, e, 0, l);
                     e[l] = n;
                     int c = elements.length - l;
@@ -88,7 +87,7 @@ public class RecordIdSet {
             }
         }
 
-        boolean contains(short n) {
+        boolean contains(int n) {
             return elements != null && binarySearch(elements, n) >= 0;
         }
     }

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java?rev=1762750&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java Thu Sep 29 09:14:42 2016
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
+
+/**
+ * A table to translate record numbers to offsets.
+ */
+interface RecordNumbers extends Iterable<Entry> {
+
+    /**
+     * Translate a record number to an offset.
+     *
+     * @param recordNumber A record number.
+     * @return the offset corresponding to the record number, or {@code -1} if
+     * no offset is associated to the record number.
+     */
+    int getOffset(int recordNumber);
+
+    /**
+     * Represents a pair of a record number and its corresponding offset.
+     */
+    interface Entry {
+
+        /**
+         * The record number part of this pair.
+         *
+         * @return a record number.
+         */
+        int getRecordNumber();
+
+        /**
+         * The offset part of this pair.
+         *
+         * @return an offset.
+         */
+        int getOffset();
+
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbers.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java?rev=1762750&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java Thu Sep 29 09:14:42 2016
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
+
+/**
+ * Utility class implementing an iterator over record numbers to offset pairs.
+ * It wraps an underlying iterator looping over map entries, where each entry is
+ * a tuple of integers.
+ */
+class RecordNumbersIterator implements Iterator<Entry> {
+
+    private static class Entry implements RecordNumbers.Entry {
+
+        private final Map.Entry<Integer, Integer> entry;
+
+        public Entry(Map.Entry<Integer, Integer> entry) {
+            this.entry = entry;
+        }
+
+        @Override
+        public int getRecordNumber() {
+            return entry.getKey();
+        }
+
+        @Override
+        public int getOffset() {
+            return entry.getValue();
+        }
+
+    }
+
+    private final Iterator<Map.Entry<Integer, Integer>> iterator;
+
+    RecordNumbersIterator(Iterator<Map.Entry<Integer, Integer>> iterator) {
+        this.iterator = iterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public RecordNumbers.Entry next() {
+        return new Entry(iterator.next());
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/RecordNumbersIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/Segment.java Thu Sep 29 09:14:42 2016
@@ -23,6 +23,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkPositionIndexes;
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Maps.newHashMap;
+import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
 import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
 import static org.apache.jackrabbit.oak.segment.SegmentVersion.LATEST_VERSION;
@@ -49,6 +50,7 @@ import org.apache.commons.io.output.Byte
 import org.apache.jackrabbit.oak.api.PropertyState;
 import org.apache.jackrabbit.oak.api.Type;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
+import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
 
 /**
  * A list of records.
@@ -60,14 +62,14 @@ import org.apache.jackrabbit.oak.plugins
  */
 public class Segment {
 
-    static final int HEADER_SIZE = 18;
+    static final int HEADER_SIZE = 22;
 
     /**
      * Number of bytes used for storing a record identifier. One byte
      * is used for identifying the segment and two for the record offset
      * within that segment.
      */
-    static final int RECORD_ID_BYTES = 4 + 2;
+    static final int RECORD_ID_BYTES = 4 + 4;
 
     /**
      * The limit on segment references within one segment. Since record
@@ -122,6 +124,8 @@ public class Segment {
 
     public static final int REFERENCED_SEGMENT_ID_COUNT_OFFSET = 14;
 
+    public static final int RECORD_NUMBER_COUNT_OFFSET = 18;
+
     @Nonnull
     private final SegmentStore store;
 
@@ -143,22 +147,9 @@ public class Segment {
     private final Map<Integer, SegmentId> segmentIdCache = newHashMap();
 
     /**
-     * Unpacks a 4 byte aligned segment offset.
-     * @param offset  4 byte aligned segment offset
-     * @return unpacked segment offset
-     */
-    public static int unpack(short offset) {
-        return (offset & 0xffff) << RECORD_ALIGN_BITS;
-    }
-
-    /**
-     * Packs a segment offset into a 4 byte aligned address packed into a {@code short}.
-     * @param offset  segment offset
-     * @return  encoded segment offset packed into a {@code short}
+     * The table translating record numbers to offsets.
      */
-    public static short pack(int offset) {
-        return (short) (offset >> RECORD_ALIGN_BITS);
-    }
+    private final RecordNumbers recordNumbers;
 
     /**
      * Align an {@code address} on the given {@code boundary}
@@ -194,8 +185,10 @@ public class Segment {
                     }
             });
             this.version = SegmentVersion.fromByte(segmentVersion);
+            this.recordNumbers = readRecordNumberOffsets();
         } else {
             this.version = LATEST_VERSION;
+            this.recordNumbers = new IdentityRecordNumbers();
         }
     }
 
@@ -211,9 +204,34 @@ public class Segment {
         }
     }
 
+    /**
+     * Read the serialized table mapping record numbers to offsets.
+     *
+     * @return An instance of {@link RecordNumbers}, never {@code null}.
+     */
+    private RecordNumbers readRecordNumberOffsets() {
+        Map<Integer, Integer> recordNumberOffsets = newHashMapWithExpectedSize(getRecordNumberCount());
+
+        int position = data.position();
+
+        position += HEADER_SIZE;
+        position += getReferencedSegmentIdCount() * 16;
+
+        for (int i = 0; i < getRecordNumberCount(); i++) {
+            int recordNumber = data.getInt(position);
+            position += 4;
+            int offset = data.getInt(position);
+            position += 4;
+            recordNumberOffsets.put(recordNumber, offset);
+        }
+
+        return new ImmutableRecordNumbers(recordNumberOffsets);
+    }
+
     Segment(@Nonnull SegmentStore store,
             @Nonnull SegmentReader reader,
             @Nonnull byte[] buffer,
+            @Nonnull RecordNumbers recordNumbers,
             @Nonnull String info) {
         this.store = checkNotNull(store);
         this.reader = checkNotNull(reader);
@@ -221,6 +239,7 @@ public class Segment {
         this.info = checkNotNull(info);
         this.data = ByteBuffer.wrap(checkNotNull(buffer));
         this.version = SegmentVersion.fromByte(buffer[3]);
+        this.recordNumbers = recordNumbers;
         id.loaded(this);
     }
 
@@ -228,18 +247,36 @@ public class Segment {
         return version;
     }
 
+    private int pos(int recordNumber, int length) {
+        return pos(recordNumber, 0, 0, length);
+    }
+
+    private int pos(int recordNumber, int rawOffset, int length) {
+        return pos(recordNumber, rawOffset, 0, length);
+    }
+
     /**
-     * Maps the given record offset to the respective position within the
+     * Maps the given record number to the respective position within the
      * internal {@link #data} array. The validity of a record with the given
-     * length at the given offset is also verified.
+     * length at the given record number is also verified.
      *
-     * @param offset record offset
-     * @param length record length
+     * @param recordNumber   record number
+     * @param rawOffset      offset to add to the base position of the record
+     * @param recordIdOffset number of record IDs to skip past the base position
+     *                       of the record; multiplied by the length of a record ID
+     * @param length         record length
      * @return position within the data array
      */
-    private int pos(int offset, int length) {
-        checkPositionIndexes(offset, offset + length, MAX_SEGMENT_SIZE);
-        int pos = data.limit() - MAX_SEGMENT_SIZE + offset;
+    private int pos(int recordNumber, int rawOffset, int recordIdOffset, int length) {
+        int offset = recordNumbers.getOffset(recordNumber);
+
+        if (offset == -1) {
+            throw new IllegalStateException("invalid record number");
+        }
+
+        int base = offset + rawOffset + recordIdOffset * RECORD_ID_BYTES;
+        checkPositionIndexes(base, base + length, MAX_SEGMENT_SIZE);
+        int pos = data.limit() - MAX_SEGMENT_SIZE + base;
         checkState(pos >= data.position());
         return pos;
     }
@@ -256,6 +293,10 @@ public class Segment {
         return data.getInt(REFERENCED_SEGMENT_ID_COUNT_OFFSET);
     }
 
+    public int getRecordNumberCount() {
+        return data.getInt(RECORD_NUMBER_COUNT_OFFSET);
+    }
+
     public UUID getReferencedSegmentId(int index) {
         checkArgument(index < getReferencedSegmentIdCount());
 
@@ -300,7 +341,8 @@ public class Segment {
 
         position += HEADER_SIZE;
         position += getReferencedSegmentIdCount() * 16;
-        position += index * 3;
+        position += getRecordNumberCount() * 8;
+        position += index * 5;
 
         return RecordType.values()[data.get(position) & 0xff];
     }
@@ -312,10 +354,11 @@ public class Segment {
 
         position += HEADER_SIZE;
         position += getReferencedSegmentIdCount() * 16;
-        position += index * 3;
+        position += getRecordNumberCount() * 8;
+        position += index * 5;
         position += 1;
 
-        return (data.getShort(position) & 0xffff) << RECORD_ALIGN_BITS;
+        return data.getInt(position);
     }
 
     private volatile String info;
@@ -347,48 +390,66 @@ public class Segment {
         return data.remaining();
     }
 
-    byte readByte(int offset) {
-        return data.get(pos(offset, 1));
+    byte readByte(int recordNumber) {
+        return readByte(recordNumber, 0);
+    }
+
+    byte readByte(int recordNumber, int offset) {
+        return data.get(pos(recordNumber, offset, 1));
+    }
+
+    short readShort(int recordNumber) {
+        return data.getShort(pos(recordNumber, 2));
     }
 
-    short readShort(int offset) {
-        return data.getShort(pos(offset, 2));
+    int readInt(int recordNumber) {
+        return data.getInt(pos(recordNumber, 4));
     }
 
-    int readInt(int offset) {
-        return data.getInt(pos(offset, 4));
+    int readInt(int recordNumber, int offset) {
+        return data.getInt(pos(recordNumber, offset, 4));
     }
 
-    long readLong(int offset) {
-        return data.getLong(pos(offset, 8));
+    long readLong(int recordNumber) {
+        return data.getLong(pos(recordNumber, 8));
     }
 
     /**
      * Reads the given number of bytes starting from the given position
      * in this segment.
      *
-     * @param position position within segment
+     * @param recordNumber number of the record to read from
      * @param buffer target buffer
      * @param offset offset within target buffer
      * @param length number of bytes to read
      */
-    void readBytes(int position, byte[] buffer, int offset, int length) {
+    void readBytes(int recordNumber, byte[] buffer, int offset, int length) {
+        readBytes(recordNumber, 0, buffer, offset, length);
+    }
+
+    void readBytes(int recordNumber, int position, byte[] buffer, int offset, int length) {
         checkNotNull(buffer);
         checkPositionIndexes(offset, offset + length, buffer.length);
         ByteBuffer d = data.duplicate();
-        d.position(pos(position, length));
+        d.position(pos(recordNumber, position, length));
         d.get(buffer, offset, length);
     }
 
-    RecordId readRecordId(int offset) {
-        int pos = pos(offset, RECORD_ID_BYTES);
-        return internalReadRecordId(pos);
+    RecordId readRecordId(int recordNumber, int rawOffset, int recordIdOffset) {
+        return internalReadRecordId(pos(recordNumber, rawOffset, recordIdOffset, RECORD_ID_BYTES));
+    }
+
+    RecordId readRecordId(int recordNumber, int rawOffset) {
+        return readRecordId(recordNumber, rawOffset, 0);
+    }
+
+    RecordId readRecordId(int recordNumber) {
+        return readRecordId(recordNumber, 0, 0);
     }
 
     private RecordId internalReadRecordId(int pos) {
         SegmentId segmentId = dereferenceSegmentId(data.getInt(pos));
-        int offset = (data.getShort(pos + 4) & 0xffff) << RECORD_ALIGN_BITS;
-        return new RecordId(segmentId, offset);
+        return new RecordId(segmentId, data.getInt(pos + 4));
     }
 
     private SegmentId dereferenceSegmentId(int reference) {
@@ -450,19 +511,20 @@ public class Segment {
     }
 
     @Nonnull
-    Template readTemplate(int offset) {
-        int head = readInt(offset);
+    Template readTemplate(int recordNumber) {
+        int head = readInt(recordNumber);
         boolean hasPrimaryType = (head & (1 << 31)) != 0;
         boolean hasMixinTypes = (head & (1 << 30)) != 0;
         boolean zeroChildNodes = (head & (1 << 29)) != 0;
         boolean manyChildNodes = (head & (1 << 28)) != 0;
         int mixinCount = (head >> 18) & ((1 << 10) - 1);
         int propertyCount = head & ((1 << 18) - 1);
-        offset += 4;
+
+        int offset = 4;
 
         PropertyState primaryType = null;
         if (hasPrimaryType) {
-            RecordId primaryId = readRecordId(offset);
+            RecordId primaryId = readRecordId(recordNumber, offset);
             primaryType = PropertyStates.createProperty(
                     "jcr:primaryType", reader.readString(primaryId), Type.NAME);
             offset += RECORD_ID_BYTES;
@@ -472,7 +534,7 @@ public class Segment {
         if (hasMixinTypes) {
             String[] mixins = new String[mixinCount];
             for (int i = 0; i < mixins.length; i++) {
-                RecordId mixinId = readRecordId(offset);
+                RecordId mixinId = readRecordId(recordNumber, offset);
                 mixins[i] =  reader.readString(mixinId);
                 offset += RECORD_ID_BYTES;
             }
@@ -484,24 +546,24 @@ public class Segment {
         if (manyChildNodes) {
             childName = Template.MANY_CHILD_NODES;
         } else if (!zeroChildNodes) {
-            RecordId childNameId = readRecordId(offset);
+            RecordId childNameId = readRecordId(recordNumber, offset);
             childName = reader.readString(childNameId);
             offset += RECORD_ID_BYTES;
         }
 
         PropertyTemplate[] properties;
-        properties = readProps(propertyCount, offset);
+        properties = readProps(propertyCount, recordNumber, offset);
         return new Template(reader, primaryType, mixinTypes, properties, childName);
     }
 
-    private PropertyTemplate[] readProps(int propertyCount, int offset) {
+    private PropertyTemplate[] readProps(int propertyCount, int recordNumber, int offset) {
         PropertyTemplate[] properties = new PropertyTemplate[propertyCount];
         if (propertyCount > 0) {
-            RecordId id = readRecordId(offset);
+            RecordId id = readRecordId(recordNumber, offset);
             ListRecord propertyNames = new ListRecord(id, properties.length);
             offset += RECORD_ID_BYTES;
             for (int i = 0; i < propertyCount; i++) {
-                byte type = readByte(offset++);
+                byte type = readByte(recordNumber, offset++);
                 properties[i] = new PropertyTemplate(i,
                         reader.readString(propertyNames.getEntry(i)), Type.fromTag(
                                 Math.abs(type), type < 0));
@@ -511,11 +573,11 @@ public class Segment {
     }
 
     long readLength(RecordId id) {
-        return id.getSegment().readLength(id.getOffset());
+        return id.getSegment().readLength(id.getRecordNumber());
     }
 
-    long readLength(int offset) {
-        return internalReadLength(pos(offset, 1));
+    long readLength(int recordNumber) {
+        return internalReadLength(pos(recordNumber, 1));
     }
 
     private long internalReadLength(int pos) {
@@ -559,6 +621,10 @@ public class Segment {
                     writer.format("reference %02x: %s%n", i, getReferencedSegmentId(i));
                 }
 
+                for (Entry entry : recordNumbers) {
+                    writer.format("record number %08x: %08x%n", entry.getRecordNumber(), entry.getOffset());
+                }
+
                 for (int i = 0; i < getRootCount(); i++) {
                     writer.format("root %d: %s at %04x%n", i, getRootType(i), getRootOffset(i));
                 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBlob.java Thu Sep 29 09:14:42 2016
@@ -59,38 +59,35 @@ public class SegmentBlob extends Record
         this.blobStore = blobStore;
     }
 
-    private InputStream getInlineStream(
-            Segment segment, int offset, int length) {
+    private InputStream getInlineStream(Segment segment, int offset, int length) {
         byte[] inline = new byte[length];
-        segment.readBytes(offset, inline, 0, length);
+        segment.readBytes(getRecordNumber(), offset, inline, 0, length);
         return new SegmentStream(getRecordId(), inline);
     }
 
     @Override @Nonnull
     public InputStream getNewStream() {
         Segment segment = getSegment();
-        int offset = getOffset();
-        byte head = segment.readByte(offset);
+        byte head = segment.readByte(getRecordNumber());
         if ((head & 0x80) == 0x00) {
             // 0xxx xxxx: small value
-            return getInlineStream(segment, offset + 1, head);
+            return getInlineStream(segment, 1, head);
         } else if ((head & 0xc0) == 0x80) {
             // 10xx xxxx: medium value
-            int length = (segment.readShort(offset) & 0x3fff) + SMALL_LIMIT;
-            return getInlineStream(segment, offset + 2, length);
+            int length = (segment.readShort(getRecordNumber()) & 0x3fff) + SMALL_LIMIT;
+            return getInlineStream(segment, 2, length);
         } else if ((head & 0xe0) == 0xc0) {
             // 110x xxxx: long value
-            long length = (segment.readLong(offset) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
+            long length = (segment.readLong(getRecordNumber()) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
             int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
-            ListRecord list = new ListRecord(
-                    segment.readRecordId(offset + 8), listSize);
+            ListRecord list = new ListRecord(segment.readRecordId(getRecordNumber(), 8), listSize);
             return new SegmentStream(getRecordId(), list, length);
         } else if ((head & 0xf0) == 0xe0) {
             // 1110 xxxx: external value, short blob ID
-            return getNewStream(readShortBlobId(segment, offset, head));
+            return getNewStream(readShortBlobId(segment, getRecordNumber(), head));
         } else if ((head & 0xf8) == 0xf0) {
             // 1111 0xxx: external value, long blob ID
-            return getNewStream(readLongBlobId(segment, offset));
+            return getNewStream(readLongBlobId(segment, getRecordNumber()));
         } else {
             throw new IllegalStateException(String.format(
                     "Unexpected value record type: %02x", head & 0xff));
@@ -100,23 +97,22 @@ public class SegmentBlob extends Record
     @Override
     public long length() {
         Segment segment = getSegment();
-        int offset = getOffset();
-        byte head = segment.readByte(offset);
+        byte head = segment.readByte(getRecordNumber());
         if ((head & 0x80) == 0x00) {
             // 0xxx xxxx: small value
             return head;
         } else if ((head & 0xc0) == 0x80) {
             // 10xx xxxx: medium value
-            return (segment.readShort(offset) & 0x3fff) + SMALL_LIMIT;
+            return (segment.readShort(getRecordNumber()) & 0x3fff) + SMALL_LIMIT;
         } else if ((head & 0xe0) == 0xc0) {
             // 110x xxxx: long value
-            return (segment.readLong(offset) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
+            return (segment.readLong(getRecordNumber()) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
         } else if ((head & 0xf0) == 0xe0) {
             // 1110 xxxx: external value, short blob ID
-            return getLength(readShortBlobId(segment, offset, head));
+            return getLength(readShortBlobId(segment, getRecordNumber(), head));
         } else if ((head & 0xf8) == 0xf0) {
             // 1111 0xxx: external value, long blob ID
-            return getLength(readLongBlobId(segment, offset));
+            return getLength(readLongBlobId(segment, getRecordNumber()));
         } else {
             throw new IllegalStateException(String.format(
                     "Unexpected value record type: %02x", head & 0xff));
@@ -150,26 +146,25 @@ public class SegmentBlob extends Record
 
     public boolean isExternal() {
         Segment segment = getSegment();
-        int offset = getOffset();
-        byte head = segment.readByte(offset);
+        byte head = segment.readByte(getRecordNumber());
         // 1110 xxxx or 1111 0xxx: external value
         return (head & 0xf0) == 0xe0 || (head & 0xf8) == 0xf0;
     }
 
     @CheckForNull
     public String getBlobId() {
-        return readBlobId(getSegment(), getOffset());
+        return readBlobId(getSegment(), getRecordNumber());
     }
 
     @CheckForNull
-    static String readBlobId(@Nonnull Segment segment, int offset) {
-        byte head = segment.readByte(offset);
+    static String readBlobId(@Nonnull Segment segment, int recordNumber) {
+        byte head = segment.readByte(recordNumber);
         if ((head & 0xf0) == 0xe0) {
             // 1110 xxxx: external value, small blob ID
-            return readShortBlobId(segment, offset, head);
+            return readShortBlobId(segment, recordNumber, head);
         } else if ((head & 0xf8) == 0xf0) {
             // 1111 0xxx: external value, long blob ID
-            return readLongBlobId(segment, offset);
+            return readLongBlobId(segment, recordNumber);
         } else {
             return null;
         }
@@ -205,28 +200,27 @@ public class SegmentBlob extends Record
 
     //-----------------------------------------------------------< private >--
 
-    private static String readShortBlobId(Segment segment, int offset, byte head) {
-        int length = (head & 0x0f) << 8 | (segment.readByte(offset + 1) & 0xff);
+    private static String readShortBlobId(Segment segment, int recordNumber, byte head) {
+        int length = (head & 0x0f) << 8 | (segment.readByte(recordNumber, 1) & 0xff);
         byte[] bytes = new byte[length];
-        segment.readBytes(offset + 2, bytes, 0, length);
+        segment.readBytes(recordNumber, 2, bytes, 0, length);
         return new String(bytes, UTF_8);
     }
 
-    private static String readLongBlobId(Segment segment, int offset) {
-        RecordId blobId = segment.readRecordId(offset + 1);
-        return blobId.getSegment().readString(blobId.getOffset());
+    private static String readLongBlobId(Segment segment, int recordNumber) {
+        RecordId blobId = segment.readRecordId(recordNumber, 1);
+        return blobId.getSegment().readString(blobId.getRecordNumber());
     }
 
     private List<RecordId> getBulkRecordIds() {
         Segment segment = getSegment();
-        int offset = getOffset();
-        byte head = segment.readByte(offset);
+        byte head = segment.readByte(getRecordNumber());
         if ((head & 0xe0) == 0xc0) {
             // 110x xxxx: long value
-            long length = (segment.readLong(offset) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
+            long length = (segment.readLong(getRecordNumber()) & 0x1fffffffffffffffL) + MEDIUM_LIMIT;
             int listSize = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
             ListRecord list = new ListRecord(
-                segment.readRecordId(offset + 8), listSize);
+                    segment.readRecordId(getRecordNumber(), 8), listSize);
             return list.getEntries();
         } else {
             return null;

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriter.java Thu Sep 29 09:14:42 2016
@@ -31,7 +31,6 @@ import static java.lang.System.currentTi
 import static java.lang.System.identityHashCode;
 import static org.apache.jackrabbit.oak.segment.Segment.GC_GENERATION_OFFSET;
 import static org.apache.jackrabbit.oak.segment.Segment.HEADER_SIZE;
-import static org.apache.jackrabbit.oak.segment.Segment.MAX_SEGMENT_SIZE;
 import static org.apache.jackrabbit.oak.segment.Segment.RECORD_ID_BYTES;
 import static org.apache.jackrabbit.oak.segment.Segment.align;
 import static org.apache.jackrabbit.oak.segment.SegmentId.isDataSegmentId;
@@ -45,6 +44,7 @@ import java.util.Set;
 import javax.annotation.CheckForNull;
 import javax.annotation.Nonnull;
 
+import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +73,7 @@ public class SegmentBufferWriter impleme
     /**
      * Enable an extra check logging warnings should this writer create segments
      * referencing segments from an older generation.
+     *
      * @see #checkGCGeneration(SegmentId)
      */
     private static final boolean ENABLE_GENERATION_CHECK = Boolean.getBoolean("enable-generation-check");
@@ -115,6 +116,8 @@ public class SegmentBufferWriter impleme
 
     private final Map<SegmentId, Integer> referencedSegmentIds = newHashMap();
 
+    private MutableRecordNumbers recordNumbers = new MutableRecordNumbers();
+
     @Nonnull
     private final SegmentStore store;
 
@@ -215,13 +218,14 @@ public class SegmentBufferWriter impleme
         position = buffer.length;
         roots.clear();
         referencedSegmentIds.clear();
+        recordNumbers = new MutableRecordNumbers();
 
         String metaInfo =
             "{\"wid\":\"" + wid + '"' +
             ",\"sno\":" + tracker.getSegmentCount() +
             ",\"t\":" + currentTimeMillis() + "}";
         try {
-            segment = new Segment(store, reader, buffer, metaInfo);
+            segment = new Segment(store, reader, buffer, recordNumbers, metaInfo);
 
             statistics = new Statistics();
             statistics.id = segment.getSegmentId();
@@ -281,17 +285,10 @@ public class SegmentBufferWriter impleme
             roots.remove(recordId);
         }
 
-        int offset = recordId.getOffset();
-
-        checkState(0 <= offset && offset < MAX_SEGMENT_SIZE);
-        checkState(offset == align(offset, 1 << Segment.RECORD_ALIGN_BITS));
-
-        SegmentId segmentId = recordId.getSegmentId();
+        checkGCGeneration(recordId.getSegmentId());
 
-        checkGCGeneration(segmentId);
-
-        writeInt(writeSegmentIdReference(segmentId));
-        writeShort((short) ((offset >> Segment.RECORD_ALIGN_BITS) & 0xffff));
+        writeInt(writeSegmentIdReference(recordId.getSegmentId()));
+        writeInt(recordId.getRecordNumber());
 
         statistics.recordIdCount++;
 
@@ -370,7 +367,10 @@ public class SegmentBufferWriter impleme
             int referencedSegmentIdCount = referencedSegmentIds.size();
             statistics.segmentIdCount = referencedSegmentIdCount;
 
-            int totalLength = align(HEADER_SIZE + referencedSegmentIdCount * 16 + rootcount * 3 + length, 16);
+            int recordNumberCount = recordNumbers.size();
+            BinaryUtils.writeInt(buffer, Segment.RECORD_NUMBER_COUNT_OFFSET, recordNumberCount);
+
+            int totalLength = align(HEADER_SIZE + referencedSegmentIdCount * 16 + rootcount * 5 + recordNumberCount * 8 + length, 16);
 
             if (totalLength > buffer.length) {
                 throw new IllegalStateException("too much data for a segment");
@@ -394,9 +394,14 @@ public class SegmentBufferWriter impleme
                 length = buffer.length;
             }
 
+            for (Entry entry : recordNumbers) {
+                pos = BinaryUtils.writeInt(buffer, pos, entry.getRecordNumber());
+                pos = BinaryUtils.writeInt(buffer, pos, entry.getOffset());
+            }
+
             for (Map.Entry<RecordId, RecordType> entry : roots.entrySet()) {
                 pos = BinaryUtils.writeByte(buffer, pos, (byte) entry.getValue().ordinal());
-                pos = BinaryUtils.writeShort(buffer, pos, (short) (entry.getKey().getOffset() >> Segment.RECORD_ALIGN_BITS));
+                pos = BinaryUtils.writeInt(buffer, pos, entry.getKey().getRecordNumber());
             }
 
             SegmentId segmentId = segment.getSegmentId();
@@ -435,8 +440,9 @@ public class SegmentBufferWriter impleme
         // unreferenced segments.
 
         int rootCount = roots.size() + 1;
+        int recordNumbersCount = recordNumbers.size() + 1;
         int referencedIdCount = referencedSegmentIds.size() + ids.size();
-        int headerSize = HEADER_SIZE + rootCount * 3 + referencedIdCount * 16;
+        int headerSize = HEADER_SIZE + rootCount * 5 + referencedIdCount * 16 + recordNumbersCount * 8;
         int segmentSize = align(headerSize + recordSize + length, 16);
 
         // If the size estimate looks too big, recompute it with a more
@@ -472,7 +478,7 @@ public class SegmentBufferWriter impleme
                 }
             }
 
-            headerSize = HEADER_SIZE + rootCount * 3 + referencedIdCount * 16;
+            headerSize = HEADER_SIZE + rootCount * 5 + referencedIdCount * 16 + recordNumbersCount * 8;
             segmentSize = align(headerSize + recordSize + length, 16);
         }
 
@@ -486,7 +492,8 @@ public class SegmentBufferWriter impleme
         position = buffer.length - length;
         checkState(position >= 0);
 
-        RecordId id = new RecordId(segment.getSegmentId(), position);
+        int recordNumber = recordNumbers.addOffset(position);
+        RecordId id = new RecordId(segment.getSegmentId(), recordNumber);
         roots.put(id, type);
         return id;
     }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java?rev=1762750&r1=1762749&r2=1762750&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeState.java Thu Sep 29 09:14:42 2016
@@ -34,7 +34,7 @@ import static org.apache.jackrabbit.oak.
 import static org.apache.jackrabbit.oak.api.Type.STRINGS;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
-import static org.apache.jackrabbit.oak.segment.Segment.unpack;
+import static org.apache.jackrabbit.oak.segment.Segment.RECORD_ID_BYTES;
 import static org.apache.jackrabbit.oak.spi.state.AbstractNodeState.checkValidName;
 
 import java.nio.ByteBuffer;
@@ -91,7 +91,7 @@ public class SegmentNodeState extends Re
         if (templateId == null) {
             // no problem if updated concurrently,
             // as each concurrent thread will just get the same value
-            templateId = getSegment().readRecordId(getOffset(0, 1));
+            templateId = getSegment().readRecordId(getRecordNumber(), 0, 1);
         }
         return templateId;
     }
@@ -107,7 +107,7 @@ public class SegmentNodeState extends Re
 
     MapRecord getChildNodeMap() {
         Segment segment = getSegment();
-        return reader.readMap(segment.readRecordId(getOffset(0, 2)));
+        return reader.readMap(segment.readRecordId(getRecordNumber(), 0, 2));
     }
 
     /**
@@ -122,7 +122,7 @@ public class SegmentNodeState extends Re
         ByteBuffer buffer = ByteBuffer.wrap(getStableIdBytes());
         long msb = buffer.getLong();
         long lsb = buffer.getLong();
-        int offset = unpack(buffer.getShort());
+        int offset = buffer.getInt();
         return new UUID(msb, lsb) + ":" + offset;
     }
 
@@ -136,7 +136,7 @@ public class SegmentNodeState extends Re
      */
     byte[] getStableIdBytes() {
         // The first record id of this node points to the stable id.
-        RecordId id = getSegment().readRecordId(getOffset());
+        RecordId id = getSegment().readRecordId(getRecordNumber());
 
         if (id.equals(getRecordId())) {
             // If that id is equal to the record id of this node then the stable
@@ -146,8 +146,8 @@ public class SegmentNodeState extends Re
         } else {
             // Otherwise that id points to the serialised (msb, lsb, offset)
             // stable id.
-            byte[] buffer = new byte[18];
-            id.getSegment().readBytes(id.getOffset(), buffer, 0, buffer.length);
+            byte[] buffer = new byte[RecordId.SERIALIZED_RECORD_ID_BYTES];
+            id.getSegment().readBytes(id.getRecordNumber(), buffer, 0, buffer.length);
             return buffer;
         }
     }
@@ -214,9 +214,8 @@ public class SegmentNodeState extends Re
         if (template.getChildName() != Template.ZERO_CHILD_NODES) {
             ids++;
         }
-        RecordId rid = segment.readRecordId(getOffset(0, ids));
-        ListRecord pIds = new ListRecord(rid,
-                template.getPropertyTemplates().length);
+        RecordId rid = segment.readRecordId(getRecordNumber(), 0, ids);
+        ListRecord pIds = new ListRecord(rid, template.getPropertyTemplates().length);
         return pIds.getEntry(propertyTemplate.getIndex());
     }
 
@@ -244,9 +243,7 @@ public class SegmentNodeState extends Re
         }
 
         if (propertyTemplates.length > 0) {
-            ListRecord pIds = new ListRecord(
-                    segment.readRecordId(getOffset(0, ids)),
-                    propertyTemplates.length);
+            ListRecord pIds = new ListRecord(segment.readRecordId(getRecordNumber(), 0, ids), propertyTemplates.length);
             for (int i = 0; i < propertyTemplates.length; i++) {
                 RecordId propertyId = pIds.getEntry(i);
                 list.add(reader.readProperty(propertyId, propertyTemplates[i]));
@@ -367,12 +364,12 @@ public class SegmentNodeState extends Re
         Segment segment = getSegment();
         RecordId id = getRecordId(segment, template, propertyTemplate);
         segment = id.getSegment();
-        int size = segment.readInt(id.getOffset());
+        int size = segment.readInt(id.getRecordNumber());
         if (size == 0) {
             return emptyList();
         }
 
-        id = segment.readRecordId(id.getOffset() + 4);
+        id = segment.readRecordId(id.getRecordNumber(), 4);
         if (size == 1) {
             return singletonList(reader.readString(id));
         }
@@ -419,8 +416,7 @@ public class SegmentNodeState extends Re
             }
         } else if (childName != Template.ZERO_CHILD_NODES
                 && childName.equals(name)) {
-            Segment segment = getSegment();
-            RecordId childNodeId = segment.readRecordId(getOffset(0, 2));
+            RecordId childNodeId = getSegment().readRecordId(getRecordNumber(), 0, 2);
             return reader.readNode(childNodeId);
         }
         checkValidName(name);
@@ -447,8 +443,7 @@ public class SegmentNodeState extends Re
         } else if (childName == Template.MANY_CHILD_NODES) {
             return getChildNodeMap().getEntries();
         } else {
-            Segment segment = getSegment();
-            RecordId childNodeId = segment.readRecordId(getOffset(0, 2));
+            RecordId childNodeId = getSegment().readRecordId(getRecordNumber(), 0, 2);
             return Collections.singletonList(new MemoryChildNodeEntry(
                     childName, reader.readNode(childNodeId)));
         }