You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/11/06 21:08:44 UTC

svn commit: r1539450 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: Segment.java SegmentStore.java SegmentWriter.java file/FileStore.java memory/MemoryStore.java mongo/MongoStore.java

Author: jukka
Date: Wed Nov  6 20:08:43 2013
New Revision: 1539450

URL: http://svn.apache.org/r1539450
Log:
OAK-1036: SegmentMK: Auto-flushing SegmentNodeBuilder

Inline the referenced segment ids into each new segment already in SegmentWriter, instead of in each SegmentStore implementation

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Nov  6 20:08:43 2013
@@ -24,7 +24,6 @@ import static org.apache.jackrabbit.oak.
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
@@ -103,25 +102,6 @@ public class Segment {
         this.data = checkNotNull(data);
     }
 
-    public Segment(
-            SegmentStore store, UUID uuid, Collection<UUID> refids,
-            byte[] buffer, int offset, int length) {
-        this.store = checkNotNull(store);
-        this.uuid = checkNotNull(uuid);
-
-        checkNotNull(refids);
-        checkNotNull(buffer);
-        checkPositionIndexes(offset, offset + length, buffer.length);
-
-        this.data = ByteBuffer.allocate(refids.size() * 16 + length);
-        for (UUID refid : refids) {
-            data.putLong(refid.getMostSignificantBits());
-            data.putLong(refid.getLeastSignificantBits());
-        }
-        data.put(buffer, offset, length);
-        data.rewind();
-    }
-
     /**
      * Maps the given record offset to the respective position within the
      * internal {@link #data} array. The validity of a record with the given

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java Wed Nov  6 20:08:43 2013
@@ -16,7 +16,6 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
-import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
@@ -32,14 +31,11 @@ public interface SegmentStore {
      * Writes the given segment to the segment store.
      *
      * @param segmentId segment identifier
-     * @param referencedSegmentIds identifiers of all the referenced segments
      * @param bytes byte buffer that contains the raw contents of the segment
      * @param offset start offset within the byte buffer
      * @param length length of the segment
      */
-    void writeSegment(
-            UUID segmentId, Collection<UUID> referencedSegmentIds,
-            byte[] bytes, int offset, int length);
+    void writeSegment(UUID segmentId, byte[] bytes, int offset, int length);
 
     void deleteSegment(UUID segmentId);
 

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Nov  6 20:08:43 2013
@@ -23,6 +23,7 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkPositionIndex;
 import static com.google.common.base.Preconditions.checkPositionIndexes;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayListWithExpectedSize;
 import static com.google.common.collect.Maps.newHashMap;
 import static java.util.Collections.emptyMap;
 import static org.apache.jackrabbit.oak.api.Type.NAME;
@@ -33,7 +34,7 @@ import static org.apache.jackrabbit.oak.
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -115,17 +116,21 @@ public class SegmentWriter {
 
     public SegmentWriter(SegmentStore store) {
         this.store = store;
-        this.dummySegment = new Segment(
-                store, UUID.randomUUID(), Collections.<UUID>emptyList(),
-                new byte[0], 0, 0);
+        this.dummySegment =
+                new Segment(store, UUID.randomUUID(), ByteBuffer.allocate(0));
     }
 
     public synchronized Segment getCurrentSegment(UUID id) {
         if (equal(id, uuid)) {
             if (currentSegment == null) {
-                currentSegment = new Segment(
-                        store, uuid, uuids.keySet(),
-                        buffer, buffer.length - length, length);
+                ByteBuffer b = ByteBuffer.allocate(16 * uuids.size() + length);
+                for (UUID refid : uuids.keySet()) {
+                    b.putLong(refid.getMostSignificantBits());
+                    b.putLong(refid.getLeastSignificantBits());
+                }
+                b.put(buffer, buffer.length - length, length);
+                b.rewind();
+                currentSegment = new Segment(store, uuid, b);
             }
             return currentSegment;
         } else {
@@ -139,9 +144,16 @@ public class SegmentWriter {
 
     public synchronized void flush() {
         if (length > 0) {
-            store.writeSegment(
-                    uuid, uuids.keySet(),
-                    buffer, buffer.length - length, length);
+            length += 16 * uuids.size();
+
+            ByteBuffer b = ByteBuffer.wrap(
+                    buffer, buffer.length - length, 16 * uuids.size());
+            for (UUID refid : uuids.keySet()) {
+                b.putLong(refid.getMostSignificantBits());
+                b.putLong(refid.getLeastSignificantBits());
+            }
+
+            store.writeSegment(uuid, buffer, buffer.length - length, length);
 
             uuid = UUID.randomUUID();
             uuids.clear();
@@ -167,13 +179,11 @@ public class SegmentWriter {
         }
 
         int fullSize = size + ids.size() * Segment.RECORD_ID_BYTES;
-        checkArgument(fullSize > 0);
-
         int alignment = Segment.RECORD_ALIGN_BYTES - 1;
         int alignedSize = (fullSize + alignment) & ~alignment;
-        int segmentReferenceCount = uuids.size() + segmentIds.size();
-        if (length + alignedSize > buffer.length
-                || segmentReferenceCount > Segment.SEGMENT_REFERENCE_LIMIT) {
+        int refs = uuids.size() + segmentIds.size();
+        if (refs * 16 + length + alignedSize > buffer.length - 1
+                || refs > Segment.SEGMENT_REFERENCE_LIMIT) {
             flush();
         }
 
@@ -516,60 +526,56 @@ public class SegmentWriter {
 
     private RecordId internalWriteStream(InputStream stream)
             throws IOException {
-        // First read the head of the stream. This covers most small
-        // values and the frequently accessed head of larger ones.
-        // The head gets inlined in the current segment.
-        byte[] head = new byte[Segment.MEDIUM_LIMIT];
-        int headLength = ByteStreams.read(stream, head, 0, head.length);
+        byte[] data = new byte[Segment.MAX_SEGMENT_SIZE];
+        int n = ByteStreams.read(stream, data, 0, data.length);
 
-        if (headLength < Segment.SMALL_LIMIT) {
+        // Special case for short binaries (up to about 16kB):
+        // store them directly as small- or medium-sized value records
+        if (n < Segment.MEDIUM_LIMIT) {
             synchronized (this) {
-                RecordId id = prepare(1 + headLength);
-                buffer[position++] = (byte) headLength;
-                System.arraycopy(head, 0, buffer, position, headLength);
-                position += headLength;
-                return id;
-            }
-        } else if (headLength < Segment.MEDIUM_LIMIT) {
-            synchronized (this) {
-                RecordId id = prepare(2 + headLength);
-                int len = (headLength - Segment.SMALL_LIMIT) | 0x8000;
-                buffer[position++] = (byte) (len >> 8);
-                buffer[position++] = (byte) len;
-                System.arraycopy(head, 0, buffer, position, headLength);
-                position += headLength;
+                RecordId id;
+                if (n < Segment.SMALL_LIMIT) {
+                    id = prepare(1 + n);
+                    buffer[position++] = (byte) n;
+                } else {
+                    id = prepare(2 + n);
+                    int len = (n - Segment.SMALL_LIMIT) | 0x8000;
+                    buffer[position++] = (byte) (len >> 8);
+                    buffer[position++] = (byte) len;
+                }
+                System.arraycopy(data, 0, buffer, position, n);
+                position += n;
                 return id;
             }
-        } else {
-            // If the stream filled the full head buffer, it's likely
-            // that the bulk of the data is still to come. Read it
-            // in larger chunks and save in separate segments.
-
-            long length = 0;
-            List<RecordId> blockIds = new ArrayList<RecordId>();
-
-            byte[] bulk = new byte[Segment.MAX_SEGMENT_SIZE];
-            System.arraycopy(head, 0, bulk, 0, headLength);
-            int bulkLength = headLength + ByteStreams.read(
-                    stream, bulk, headLength, bulk.length - headLength);
-            while (bulkLength > 0) {
-                UUID segmentId = UUID.randomUUID();
-                int align = Segment.RECORD_ALIGN_BYTES - 1;
-                int bulkAlignLength = (bulkLength + align) & ~align;
-                store.writeSegment(
-                        segmentId, Collections.<UUID>emptyList(),
-                        bulk, 0, bulkAlignLength);
-                for (int pos = Segment.MAX_SEGMENT_SIZE - bulkAlignLength;
-                        pos < Segment.MAX_SEGMENT_SIZE;
-                        pos += BLOCK_SIZE) {
-                    blockIds.add(new RecordId(segmentId, pos));
-                }
-                length += bulkLength;
-                bulkLength = ByteStreams.read(stream, bulk, 0, bulk.length);
+        }
+
+        long length = n;
+        List<RecordId> blockIds = newArrayListWithExpectedSize(n / 4096);
+
+        // Write full bulk segments
+        while (n == buffer.length) {
+            UUID id = UUID.randomUUID();
+            store.writeSegment(id, data, 0, data.length);
+
+            for (int i = 0; i < data.length; i += BLOCK_SIZE) {
+                blockIds.add(new RecordId(id, i));
             }
 
-            return writeValueRecord(length, writeList(blockIds));
+            n = ByteStreams.read(stream, data, 0, data.length);
+            length += n;
+        }
+
+
+        // Inline the remaining blocks in the current segments
+        for (int p = 0; p < n; p += BLOCK_SIZE) {
+            int size = Math.min(n - p, BLOCK_SIZE);
+            synchronized (this) {
+                blockIds.add(prepare(size));
+                System.arraycopy(data, p, buffer, position, size);
+            }
         }
+
+        return writeValueRecord(length, writeList(blockIds));
     }
 
     private RecordId writeProperty(

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Nov  6 20:08:43 2013
@@ -26,7 +26,6 @@ import static org.apache.jackrabbit.oak.
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collection;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.UUID;
@@ -139,24 +138,7 @@ public class FileStore extends AbstractS
 
     @Override
     public synchronized void writeSegment(
-            UUID segmentId, Collection<UUID> referencedSegmentIds,
-            byte[] data, int offset, int length) {
-        if (!referencedSegmentIds.isEmpty()) {
-            int size = 16 * referencedSegmentIds.size() + length;
-            ByteBuffer buffer = ByteBuffer.allocate(size);
-
-            for (UUID referencedSegmentId : referencedSegmentIds) {
-                buffer.putLong(referencedSegmentId.getMostSignificantBits());
-                buffer.putLong(referencedSegmentId.getLeastSignificantBits());
-            }
-
-            buffer.put(data, offset, length);
-
-            data = buffer.array();
-            offset = 0;
-            length = size;
-        }
-
+            UUID segmentId, byte[] data, int offset, int length) {
         try {
             writeEntry(segmentId, data, offset, length);
         } catch (IOException e) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java Wed Nov  6 20:08:43 2013
@@ -18,7 +18,7 @@ package org.apache.jackrabbit.oak.plugin
 
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
-import java.util.Collection;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -75,10 +75,11 @@ public class MemoryStore extends Abstrac
 
     @Override
     public void writeSegment(
-            UUID segmentId, Collection<UUID> referencedSegmentIds,
-            byte[] data, int offset, int length) {
-        Segment segment = new Segment(
-                this, segmentId, referencedSegmentIds, data, offset, length);
+            UUID segmentId, byte[] data, int offset, int length) {
+        ByteBuffer buffer = ByteBuffer.allocate(length);
+        buffer.put(data, offset, length);
+        buffer.rewind();
+        Segment segment = new Segment(this, segmentId, buffer);
         if (segments.putIfAbsent(segment.getSegmentId(), segment) != null) {
             throw new IllegalStateException(
                     "Segment override: " + segment.getSegmentId());

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java?rev=1539450&r1=1539449&r2=1539450&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java Wed Nov  6 20:08:43 2013
@@ -22,8 +22,7 @@ import static com.mongodb.ReadPreference
 import static com.mongodb.ReadPreference.primary;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
-import java.util.Collection;
-import java.util.List;
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.UUID;
 
@@ -32,7 +31,6 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.Segment;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.mongodb.BasicDBObject;
 import com.mongodb.DB;
@@ -83,17 +81,20 @@ public class MongoStore extends Abstract
 
     @Override
     public void writeSegment(
-            UUID segmentId, Collection<UUID> referencedSegmentIds,
-            byte[] data, int offset, int length) {
+            UUID segmentId, byte[] data, int offset, int length) {
         byte[] d = new byte[length];
         System.arraycopy(data, offset, d, 0, length);
-        insertSegment(segmentId, d, referencedSegmentIds);
+
+        BasicDBObject segment = new BasicDBObject();
+        segment.put("_id", segmentId.toString());
+        segment.put("data", data);
+        segments.insert(segment, concern);
     }
 
     @Override
     protected Segment loadSegment(UUID segmentId) {
         DBObject id = new BasicDBObject("_id", segmentId.toString());
-        DBObject fields = new BasicDBObject(of("data", 1, "uuids", 1));
+        DBObject fields = new BasicDBObject(of("data", 1));
 
         DBObject segment = segments.findOne(id, fields, nearest());
         if (segment == null) {
@@ -105,26 +106,7 @@ public class MongoStore extends Abstract
         }
 
         byte[] data = (byte[]) segment.get("data");
-        List<?> list = (List<?>) segment.get("uuids");
-        List<UUID> uuids = Lists.newArrayListWithCapacity(list.size());
-        for (Object object : list) {
-            uuids.add(UUID.fromString(object.toString()));
-        }
-        return new Segment(this, segmentId, uuids, data, 0, data.length);
-    }
-
-    private void insertSegment(
-            UUID segmentId, byte[] data, Collection<UUID> uuids) {
-        List<String> list = Lists.newArrayListWithCapacity(uuids.size());
-        for (UUID uuid : uuids) {
-            list.add(uuid.toString());
-        }
-
-        BasicDBObject segment = new BasicDBObject();
-        segment.put("_id", segmentId.toString());
-        segment.put("data", data);
-        segment.put("uuids", list);
-        segments.insert(segment, concern);
+        return new Segment(this, segmentId, ByteBuffer.wrap(data));
     }
 
     @Override