You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/02/06 18:06:19 UTC

svn commit: r1443071 - in /jackrabbit/oak/trunk/oak-core/src: main/java/org/apache/jackrabbit/oak/plugins/segment/ test/java/org/apache/jackrabbit/oak/plugins/segment/

Author: jukka
Date: Wed Feb  6 17:06:18 2013
New Revision: 1443071

URL: http://svn.apache.org/viewvc?rev=1443071&view=rev
Log:
OAK-593: Segment-based MK

Implement basic record types and a simple in-memory segment store for testing

Added:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java
      - copied, changed from r1443031, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java   (with props)
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/
    jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java   (with props)
Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
+class BlockRecord extends Record {
+
+    private final int size;
+
+    BlockRecord(SegmentReader reader, RecordId id, int size) {
+        super(reader, id);
+        this.size = size;
+    }
+
+    /**
+     * Reads bytes from this block. Up to the given number of bytes are
+     * read starting from the given position within this block. The number
+     * of bytes read is returned.
+     *
+     * @param position position within this block
+     * @param buffer target buffer
+     * @param offset offset within the target buffer
+     * @param length maximum number of bytes to read
+     * @return number of bytes that could be read
+     */
+    public int read(int position, byte[] buffer, int offset, int length) {
+        checkElementIndex(position, size);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        if (position + length > size) {
+            length = size - position;
+        }
+        if (length > 0) {
+            getReader().readBytes(
+                    getRecordId(), position, buffer, offset, length);
+        }
+        return length;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/BlockRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/CacheStore.java Wed Feb  6 17:06:18 2013
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.jackrabbit.oak.plugins.segment;
 
 import java.util.UUID;
@@ -6,43 +22,33 @@ import java.util.concurrent.ExecutionExc
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
 
 public class CacheStore implements SegmentStore {
 
     private final SegmentStore store;
 
-    private final LoadingCache<UUID, byte[]> cache;
+    private final LoadingCache<UUID, Segment> cache;
 
     public CacheStore(final SegmentStore store, long cacheSize) {
         this.store = store;
         this.cache = CacheBuilder.newBuilder()
                 .maximumWeight(cacheSize)
-                .weigher(new Weigher<UUID, byte[]>() {
+                .weigher(Segment.weigher())
+                .build(new CacheLoader<UUID, Segment>() {
                     @Override
-                    public int weigh(UUID key, byte[] value) {
-                        return value.length;
-                    }
-                }).build(new CacheLoader<UUID, byte[]>() {
-                    @Override
-                    public byte[] load(UUID key) throws Exception {
+                    public Segment load(UUID key) throws Exception {
                         return store.readSegment(key);
                     }
                 });
     }
 
     @Override
-    public RecordId getHead(String journal) {
-        return store.getHead(journal);
-    }
-
-    @Override
-    public boolean updateHead(String journal, RecordId base, RecordId head) {
-        return store.updateHead(journal, base, head);
+    public int getMaxSegmentSize() {
+        return store.getMaxSegmentSize();
     }
 
     @Override
-    public byte[] readSegment(UUID segmentId) {
+    public Segment readSegment(UUID segmentId) {
         try {
             return cache.get(segmentId);
         } catch (ExecutionException e) {
@@ -52,6 +58,12 @@ public class CacheStore implements Segme
     }
 
     @Override
+    public void createSegment(Segment segment) {
+        store.createSegment(segment);
+        cache.put(segment.getSegmentId(), segment);
+    }
+
+    @Override
     public void createSegment(
             UUID segmentId, byte[] data, int offset, int length) {
         store.createSegment(segmentId, data, offset, length);
@@ -60,6 +72,6 @@ public class CacheStore implements Segme
     @Override
     public void deleteSegment(UUID segmentId) {
         store.deleteSegment(segmentId);
+        cache.invalidate(segmentId);
     }
-
 }

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+
+class ListRecord extends Record {
+
+    static final int LEVEL_SIZE = 1 << 8; // 256
+
+    private final int size;
+
+    private final int bucketSize;
+
+    ListRecord(SegmentReader reader, RecordId id, int size) {
+        super(reader, id);
+        checkArgument(size >= 0);
+        this.size = size;
+
+        int bs = 1;
+        while (bs * LEVEL_SIZE < size) {
+            bs *= LEVEL_SIZE;
+        }
+        this.bucketSize = bs;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public RecordId getEntry(int index) {
+        checkElementIndex(index, size);
+
+        if (size == 1) {
+            return getRecordId();
+        } else {
+            int bucketIndex = index / bucketSize;
+            int bucketOffset = index % bucketSize;
+            RecordId bucketId = readRecordId(bucketIndex * 4);
+            ListRecord bucket =
+                new ListRecord(getReader(), bucketId, bucketSize);
+            return bucket.getEntry(bucketOffset);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/ListRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+public class MemoryStore implements SegmentStore {
+
+    private static final int MAX_SEGMENT_SIZE = 1 << 20; // 1MB
+
+    private final Map<UUID, Segment> segments =
+        Collections.synchronizedMap(new HashMap<UUID, Segment>());
+
+    @Override
+    public int getMaxSegmentSize() {
+        return MAX_SEGMENT_SIZE;
+    }
+
+    @Override
+    public Segment readSegment(UUID segmentId) {
+        Segment segment = segments.get(segmentId);
+        if (segment != null) {
+            return segment;
+        } else {
+            throw new IllegalStateException("Segment not found: " + segmentId);
+        }
+    }
+
+    @Override
+    public void createSegment(Segment segment) {
+        if (segments.put(segment.getSegmentId(), segment) != null) {
+            throw new IllegalStateException(
+                    "Segment override: " + segment.getSegmentId());
+        }
+    }
+
+    @Override
+    public void createSegment(
+            UUID segmentId, byte[] data, int offset, int length) {
+        byte[] segment = new byte[length];
+        System.arraycopy(data, offset, segment, 0, length);
+        createSegment(new Segment(segmentId, segment, new UUID[0]));
+    }
+
+    @Override
+    public void deleteSegment(UUID segmentId) {
+        if (segments.remove(segmentId) == null) {
+            throw new IllegalStateException("Missing segment: " + segmentId);
+        }
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/MemoryStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java (from r1443031, jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java)
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java?p2=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java&p1=jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java&r1=1443031&r2=1443071&rev=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Record.java Wed Feb  6 17:06:18 2013
@@ -1,33 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.jackrabbit.oak.plugins.segment;
-
-import java.util.UUID;
-
-public interface SegmentStore {
-
-    RecordId getHead(String journal);
-
-    boolean updateHead(String journal, RecordId base, RecordId head);
-
-    byte[] readSegment(UUID segmentId);
-
-    void createSegment(UUID segmentId, byte[] data, int offset, int length);
-
-    void deleteSegment(UUID segmentId);
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+class Record {
+
+    private final SegmentReader reader;
+
+    private final RecordId id;
+
+    protected Record(SegmentReader reader, RecordId id) {
+        this.reader = reader;
+        this.id = id;
+    }
+
+    protected SegmentReader getReader() {
+        return reader;
+    }
+
+    protected RecordId readRecordId(int position) {
+        return reader.readRecordId(id, position);
+    }
+
+    public RecordId getRecordId() {
+        return id;
+    }
+
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/RecordId.java Wed Feb  6 17:06:18 2013
@@ -22,6 +22,8 @@ import com.google.common.base.Preconditi
 
 public final class RecordId {
 
+    public static RecordId[] EMPTY_ARRAY = new RecordId[0];
+
     private final UUID segmentId;
 
     private final int offset;

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.google.common.cache.Weigher;
+
+class Segment {
+
+    static final int SMALL_LIMIT = 1 << 7;
+
+    static final int MEDIUM_LIMIT = 1 << 14 + SMALL_LIMIT;
+
+    private final UUID uuid;
+
+    private final byte[] data;
+
+    private final UUID[] uuids;
+
+    Segment(UUID uuid, byte[] data, UUID[] uuids) {
+        this.uuid = uuid;
+        this.data = data;
+        this.uuids = uuids;
+    }
+
+    public UUID getSegmentId() {
+        return uuid;
+    }
+
+    public int size() {
+        return data.length;
+    }
+
+    /**
+     * Reads the given number of bytes starting from the given position
+     * in this segment.
+     *
+     * @param position position within segment
+     * @param buffer target buffer
+     * @param offset offset within target buffer
+     * @param length number of bytes to read
+     */
+    public void readBytes(int position, byte[] buffer, int offset, int length) {
+        checkPositionIndexes(position, position + length, data.length);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        System.arraycopy(data, position, buffer, offset, length);
+    }
+
+    RecordId readRecordId(int offset) {
+        return new RecordId(
+                uuids[data[offset] & 0xff],
+                (data[offset + 1] & 0xff) << 16
+                | (data[offset + 2] & 0xff) << 8
+                | (data[offset + 3] & 0xff));
+    }
+
+    public long readLength(int offset) {
+        checkPositionIndexes(offset, offset + 8, data.length);
+        return ByteBuffer.wrap(data).getLong(offset);
+    }
+
+    public static Weigher<UUID, Segment> weigher() {
+        return new Weigher<UUID, Segment>() {
+            @Override
+            public int weigh(UUID key, Segment value) {
+                return value.size();
+            }
+        };
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/Segment.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentReader.java Wed Feb  6 17:06:18 2013
@@ -16,14 +16,89 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
-import java.io.InputStream;
-
-public interface SegmentReader {
-
-    String readString(RecordId recordId);
-
-    InputStream readStream(RecordId recordId);
-
-    void readBytes(RecordId recordId, byte[] bytes, int offset, int length);
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
+
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+
+public class SegmentReader {
+
+    private final SegmentStore store;
+
+    public SegmentReader(SegmentStore store) {
+        this.store = store;
+    }
+
+    public String readString(RecordId recordId) {
+        SegmentStream stream = readStream(recordId);
+        try {
+            if (stream.getLength() > Integer.MAX_VALUE) {
+                throw new IllegalStateException(
+                        "Too long value: " + stream.getLength());
+            }
+            byte[] data = new byte[(int) stream.getLength()];
+            ByteStreams.readFully(stream, data);
+            return new String(data, Charsets.UTF_8);
+        } catch (IOException e) {
+            throw new IllegalStateException("Unexpected IOException", e);
+        } finally {
+            stream.close();
+        }
+    }
+
+    public SegmentStream readStream(RecordId recordId) {
+        Segment segment = store.readSegment(recordId.getSegmentId());
+        int offset = recordId.getOffset();
+        long length = segment.readLength(offset);
+        int size = (int) ((length + BLOCK_SIZE - 1) / BLOCK_SIZE);
+        ListRecord list = new ListRecord(
+                this, segment.readRecordId(offset + 8), size);
+        return new SegmentStream(this, list, length);
+    }
+
+    public void readBytes(
+            RecordId recordId, int position,
+            byte[] buffer, int offset, int length) {
+        checkNotNull(recordId);
+        checkArgument(position >= 0);
+        checkNotNull(buffer);
+        checkPositionIndexes(offset, offset + length, buffer.length);
+
+        Segment segment = store.readSegment(recordId.getSegmentId());
+        segment.readBytes(
+                recordId.getOffset() + position, buffer, offset, length);
+    }
+
+    public RecordId readRecordId(RecordId recordId, int position) {
+        checkNotNull(recordId);
+        checkArgument(position >= 0);
+
+        Segment segment = store.readSegment(recordId.getSegmentId());
+        return segment.readRecordId(recordId.getOffset() + position);
+    }
+
+    public ListRecord readList(RecordId recordId, int numberOfEntries) {
+        checkNotNull(recordId);
+        checkArgument(numberOfEntries >= 0);
+
+        if (numberOfEntries > 0) {
+            Segment segment = store.readSegment(recordId.getSegmentId());
+            RecordId id = segment.readRecordId(recordId.getOffset());
+            return new ListRecord(this, id, numberOfEntries);
+        } else {
+            return new ListRecord(this, recordId, numberOfEntries);
+        }
+    }
+
+    public BlockRecord readBlock(RecordId recordId, int size) {
+        checkNotNull(recordId);
+        checkArgument(size > 0);
+        return new BlockRecord(this, recordId, size);
+    }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java Wed Feb  6 17:06:18 2013
@@ -20,13 +20,13 @@ import java.util.UUID;
 
 public interface SegmentStore {
 
-    RecordId getHead(String journal);
+    int getMaxSegmentSize();
 
-    boolean updateHead(String journal, RecordId base, RecordId head);
+    Segment readSegment(UUID segmentId);
 
-    byte[] readSegment(UUID segmentId);
+    void createSegment(Segment segment);
 
-    void createSegment(UUID segmentId, byte[] data, int offset, int length);
+    void createSegment(UUID segmentId, byte[] bytes, int offset, int len);
 
     void deleteSegment(UUID segmentId);
 

Added: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+import static org.apache.jackrabbit.oak.plugins.segment.SegmentWriter.BLOCK_SIZE;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class SegmentStream extends InputStream {
+
+    private final SegmentReader reader;
+
+    private final ListRecord blocks;
+
+    private final long length;
+
+    private long position = 0;
+
+    private long mark = 0;
+
+    SegmentStream(SegmentReader reader, ListRecord blocks, long length) {
+        this.reader = reader;
+        this.blocks = blocks;
+        this.length = length;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        mark = position;
+    }
+
+    @Override
+    public synchronized void reset() {
+        position = mark;
+    }
+
+    @Override
+    public int read() throws IOException {
+        byte[] b = new byte[1];
+        if (read(b) != -1) {
+            return b[0] & 0xff;
+        } else {
+            return -1;
+        }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        checkNotNull(b);
+        checkPositionIndexes(off, off + len, b.length);
+        if (len == 0) {
+            return 0;
+        } else if (position == length) {
+            return -1;
+        } else {
+            int blockIndex = (int) (position / SegmentWriter.BLOCK_SIZE);
+            int blockOffset = (int) (position % SegmentWriter.BLOCK_SIZE);
+
+            if (blockOffset + len > SegmentWriter.BLOCK_SIZE) {
+                len = SegmentWriter.BLOCK_SIZE - blockOffset;
+            }
+            if (position + len > length) {
+                len = (int) (length - position);
+            }
+
+            BlockRecord block =
+                reader.readBlock(blocks.getEntry(blockIndex), BLOCK_SIZE);
+            len = block.read(blockOffset, b, off, len);
+            position += len;
+            return len;
+        }
+    }
+
+    @Override
+    public long skip(long n) {
+        if (position + n > length) {
+            n = length - position;
+        } else if (position + n < 0) {
+            n = -position;
+        }
+        position += n;
+        return n;
+    }
+
+    @Override
+    public void close() {
+        position = length;
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java?rev=1443071&r1=1443070&r2=1443071&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentWriter.java Wed Feb  6 17:06:18 2013
@@ -16,15 +16,203 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkPositionIndexes;
+
 import java.io.IOException;
 import java.io.InputStream;
-
-public interface SegmentWriter {
-
-    RecordId writeString(String string);
-
-    RecordId writeStream(InputStream stream) throws IOException;
-
-    RecordId writeBytes(byte[] bytes, int offset, int length);
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+public class SegmentWriter {
+
+    private static final int INITIAL_BUFFER_SIZE = 1 << 12; // 4kB
+
+    static final int BLOCK_SIZE = 1 << 12; // 4kB
+
+    static final int INLINE_LIMIT = 16 * BLOCK_SIZE; // 64kB
+
+    private final SegmentStore store;
+
+    private final int blocksPerSegment;
+
+    private final int blockSegmentSize;
+
+    private UUID uuid = UUID.randomUUID();
+
+    private List<UUID> uuids = new ArrayList<UUID>(255);
+
+    private ByteBuffer buffer = ByteBuffer.allocate(INITIAL_BUFFER_SIZE);
+
+    public SegmentWriter(SegmentStore store) {
+        this.store = store;
+        this.blocksPerSegment = store.getMaxSegmentSize() / BLOCK_SIZE;
+        this.blockSegmentSize = blocksPerSegment * BLOCK_SIZE;
+    }
+
+    public void flush() {
+        if (buffer.position() > 0) {
+            byte[] data = new byte[buffer.position()];
+            buffer.flip();
+            buffer.get(data);
+
+            store.createSegment(new Segment(
+                    uuid, data, uuids.toArray(new UUID[0])));
+
+            uuid = UUID.randomUUID();
+            uuids.clear();
+            buffer.clear();
+        }
+    }
+
+    private RecordId prepare(int size) {
+        return prepare(size, Collections.<RecordId>emptyList());
+    }
+
+    private synchronized RecordId prepare(int size, Collection<RecordId> ids) {
+        Set<UUID> segmentIds = new HashSet<UUID>();
+        for (RecordId id : ids) {
+            UUID segmentId = id.getSegmentId();
+            if (!uuid.equals(segmentId) && !uuids.contains(segmentId)) {
+                segmentIds.add(segmentId);
+            }
+        }
+
+        int fullSize = size + 4 * ids.size();
+        if (buffer.position() + fullSize > store.getMaxSegmentSize()) {
+            flush();
+        }
+        if (fullSize > buffer.remaining()) {
+            int n = Math.min(buffer.capacity() * 2, store.getMaxSegmentSize());
+            ByteBuffer newBuffer = ByteBuffer.allocate(n);
+            buffer.flip();
+            newBuffer.put(buffer);
+            buffer = newBuffer;
+        }
+        return new RecordId(uuid, buffer.position());
+    }
+
+    /**
+     * Writes a block record containing the given block of bytes.
+     *
+     * @param bytes source buffer
+     * @param offset offset within the source buffer
+     * @param length number of bytes to write
+     * @return block record identifier
+     */
+    public synchronized RecordId writeBlock(
+            byte[] bytes, int offset, int length) {
+        checkNotNull(bytes);
+        checkPositionIndexes(offset, offset + length, bytes.length);
+
+        RecordId blockId = prepare(length);
+        buffer.put(bytes, offset, length);
+        return blockId;
+    }
+
+    /**
+     * Writes a list record containing the given list of record identifiers.
+     *
+     * @param ids list of record identifiers
+     * @return list record identifier
+     */
+    public synchronized RecordId writeList(List<RecordId> ids) {
+        checkNotNull(ids);
+
+        int size = ids.size();
+        if (size == 0) {
+            return prepare(0);
+        } else if (size == 1) {
+            return ids.iterator().next();
+        } else {
+            List<RecordId> thisLevel = ids;
+            do {
+                List<RecordId> nextLevel = new ArrayList<RecordId>();
+                for (List<RecordId> bucket :
+                        Lists.partition(thisLevel, ListRecord.LEVEL_SIZE)) {
+                    RecordId bucketId = prepare(0, bucket);
+                    for (RecordId id : bucket) {
+                        writeRecordId(id);
+                    }
+                    nextLevel.add(bucketId);
+                }
+                thisLevel = nextLevel;
+            } while (thisLevel.size() > 1);
+            return thisLevel.iterator().next();
+        }
+    }
+
+    /**
+     * Writes a value record containing the given sequence of bytes.
+     *
+     * @param bytes source buffer
+     * @param offset offset within the source buffer
+     * @param length number of bytes in the value
+     * @return value record identifier
+     */
+    public synchronized RecordId writeValue(
+            byte[] bytes, int offset, int length) {
+        List<RecordId> blockIds = new ArrayList<RecordId>();
+        int remaining = length;
+
+        // First create block segments for a large bulk of the value
+        while (remaining > INLINE_LIMIT) {
+            int len = Math.min(remaining, blockSegmentSize);
+            UUID segmentId = UUID.randomUUID();
+            store.createSegment(segmentId, bytes, offset, len);
+            for (int position = 0; position < len; position += BLOCK_SIZE) {
+                blockIds.add(new RecordId(segmentId, position));
+            }
+            offset += len;
+            remaining -= len;
+        }
+
+        // Then inline any remaining full blocks
+        while (remaining > BLOCK_SIZE) {
+            blockIds.add(writeBlock(bytes, offset, BLOCK_SIZE));
+            offset += BLOCK_SIZE;
+            remaining -= BLOCK_SIZE;
+        }
+
+        // Finally add the last partial block (if any)
+        if (remaining > 0) {
+            blockIds.add(writeBlock(bytes, offset, remaining));
+        }
+
+        // Store the list of blocks along with the length of the value
+        RecordId listId = writeList(blockIds);
+        RecordId valueId = prepare(8, Collections.singleton(listId));
+        buffer.putLong(length);
+        writeRecordId(listId);
+        return valueId;
+    }
+
+    public RecordId writeString(String string) {
+        byte[] data = string.getBytes(Charsets.UTF_8);
+        return writeValue(data, 0, data.length);
+    }
+
+    public RecordId writeStream(InputStream stream) throws IOException {
+        throw new UnsupportedOperationException(); // TODO
+    }
+
+    private void writeRecordId(RecordId id) {
+        UUID segmentId = id.getSegmentId();
+        int index = uuids.indexOf(segmentId);
+        if (index == -1) {
+            index = uuids.size();
+            uuids.add(segmentId);
+        }
+        buffer.putInt(index << 24 | id.getOffset());
+    }
 
 }

Added: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java?rev=1443071&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java (added)
+++ jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java Wed Feb  6 17:06:18 2013
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.segment;
+
+import static org.apache.jackrabbit.oak.plugins.segment.ListRecord.LEVEL_SIZE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+
+public class RecordTest {
+
+    private String hello = "Hello, World!";
+
+    private byte[] bytes = hello.getBytes(Charsets.UTF_8);
+
+    private SegmentStore store = new MemoryStore();
+
+    private SegmentWriter writer = new SegmentWriter(store);
+
+    private SegmentReader reader = new SegmentReader(store);
+
+    private final Random random = new Random(0xcafefaceL);
+
+    @Test
+    public void testBlockRecord() {
+        RecordId blockId = writer.writeBlock(bytes, 0, bytes.length);
+        writer.flush();
+        BlockRecord block = new BlockRecord(reader, blockId, bytes.length);
+
+        // Check reading with all valid positions and lengths
+        for (int n = 1; n < bytes.length; n++) {
+            for (int i = 0; i + n <= bytes.length; i++) {
+                Arrays.fill(bytes, i, i + n, (byte) '.');
+                assertEquals(n, block.read(i, bytes, i, n));
+                assertEquals(hello, new String(bytes, Charsets.UTF_8));
+            }
+        }
+
+        // Check reading with a too long length
+        byte[] large = new byte[bytes.length * 2];
+        assertEquals(bytes.length, block.read(0, large, 0, large.length));
+        assertEquals(hello, new String(large, 0, bytes.length, Charsets.UTF_8));
+    }
+
+    @Test
+    public void testListRecord() {
+        RecordId blockId = writer.writeBlock(bytes, 0, bytes.length);
+
+        ListRecord zero = writeList(0, blockId);
+        ListRecord one = writeList(1, blockId);
+        ListRecord level1 = writeList(LEVEL_SIZE, blockId);
+        ListRecord level1p = writeList(LEVEL_SIZE + 1, blockId);
+        ListRecord level2 = writeList(LEVEL_SIZE * LEVEL_SIZE, blockId);
+        ListRecord level2p = writeList(LEVEL_SIZE * LEVEL_SIZE + 1, blockId);
+        writer.flush();
+
+        assertEquals(0, zero.size());
+        assertEquals(1, one.size());
+        assertEquals(blockId, one.getEntry(0));
+        assertEquals(LEVEL_SIZE, level1.size());
+        assertEquals(blockId, level1.getEntry(0));
+        assertEquals(blockId, level1.getEntry(LEVEL_SIZE - 1));
+        assertEquals(LEVEL_SIZE + 1, level1p.size());
+        assertEquals(blockId, level1p.getEntry(0));
+        assertEquals(blockId, level1p.getEntry(LEVEL_SIZE));
+        assertEquals(LEVEL_SIZE * LEVEL_SIZE, level2.size());
+        assertEquals(blockId, level2.getEntry(0));
+        assertEquals(blockId, level2.getEntry(LEVEL_SIZE * LEVEL_SIZE - 1));
+        assertEquals(LEVEL_SIZE * LEVEL_SIZE + 1, level2p.size());
+        assertEquals(blockId, level2p.getEntry(0));
+        assertEquals(blockId, level2p.getEntry(LEVEL_SIZE * LEVEL_SIZE));
+    }
+
+    private ListRecord writeList(int size, RecordId id) {
+        List<RecordId> list = Collections.nCopies(size, id);
+        return new ListRecord(reader, writer.writeList(list), size);
+    }
+
+    @Test
+    public void testStreamRecord() throws IOException {
+        checkRandomStreamRecord(0);
+        checkRandomStreamRecord(1);
+        checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE);
+        checkRandomStreamRecord(SegmentWriter.BLOCK_SIZE + 1);
+        checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT);
+        checkRandomStreamRecord(SegmentWriter.INLINE_LIMIT + 1);
+        checkRandomStreamRecord(store.getMaxSegmentSize());
+        checkRandomStreamRecord(store.getMaxSegmentSize() + 1);
+        checkRandomStreamRecord(store.getMaxSegmentSize() * 2);
+        checkRandomStreamRecord(store.getMaxSegmentSize() * 2 + 1);
+    }
+
+    private void checkRandomStreamRecord(int size) throws IOException {
+        byte[] source = new byte[size];
+        random.nextBytes(source);
+
+        RecordId valueId = writer.writeValue(source, 0, size);
+        writer.flush();
+
+        InputStream stream = reader.readStream(valueId);
+        try {
+            byte[] b = new byte[349]; // prime number
+            int offset = 0;
+            for (int n = stream.read(b); n != -1; n = stream.read(b)) {
+                for (int i = 0; i < n; i++) {
+                    assertEquals(source[offset + i], b[i]);
+                }
+                offset += n;
+            }
+            assertEquals(offset, size);
+            assertEquals(-1, stream.read());
+        } finally {
+            stream.close();
+        }
+    }
+
+    @Test
+    public void testStringRecord() {
+        RecordId empty = writer.writeString("");
+        RecordId space = writer.writeString(" ");
+        RecordId hello = writer.writeString("Hello, World!");
+
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < 1000; i++) {
+            builder.append((char) ('0' + i % 10));
+        }
+        RecordId large = writer.writeString(builder.toString());
+
+        writer.flush();
+
+        assertEquals("", reader.readString(empty));
+        assertEquals(" ", reader.readString(space));
+        assertEquals("Hello, World!", reader.readString(hello));
+        assertEquals(builder.toString(), reader.readString(large));
+    }
+
+}

Propchange: jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/segment/RecordTest.java
------------------------------------------------------------------------------
    svn:eol-style = native