You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/05/02 08:04:08 UTC

svn commit: r1478277 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment: SegmentNodeStoreService.java SegmentStore.java file/FileStore.java memory/MemoryStore.java mongo/MongoStore.java

Author: jukka
Date: Thu May  2 06:04:08 2013
New Revision: 1478277

URL: http://svn.apache.org/r1478277
Log:
OAK-788: File backend for the SegmentMK

Add support for more than one tar file.
Use proper tar formatting.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1478277&r1=1478276&r2=1478277&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Thu May  2 06:04:08 2013
@@ -16,8 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment;
 
-import java.io.File;
-import java.net.UnknownHostException;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Dictionary;
 import java.util.Map;
@@ -69,6 +68,10 @@ public class SegmentNodeStoreService ext
     public SegmentNodeStoreService(final SegmentStore[] store) {
         super(new SegmentStore() {
             @Override
+            public void close() {
+                store[0].close();
+            }
+            @Override
             public Journal getJournal(final String name) {
                 return new Journal() {
                     @Override
@@ -117,16 +120,15 @@ public class SegmentNodeStoreService ext
     }
 
     @Activate
-    public void activate(ComponentContext context) throws UnknownHostException {
+    public void activate(ComponentContext context) throws IOException {
         Dictionary<?, ?> properties = context.getProperties();
         name = "" + properties.get(NAME);
 
         if (properties.get(DIRECTORY) != null) {
-            File directory = new File(properties.get(DIRECTORY).toString());
-            directory.mkdirs();
+            String directory = properties.get(DIRECTORY).toString();
 
             mongo = null;
-            store[0] = new FileStore(new File(directory, "data.tar").getPath());
+            store[0] = new FileStore(directory);
         } else {
             String host = String.valueOf(properties.get(HOST));
             int port = Integer.parseInt(String.valueOf(properties.get(PORT)));
@@ -140,6 +142,7 @@ public class SegmentNodeStoreService ext
 
     @Deactivate
     public void deactivate() {
+        store[0].close();
         if (mongo != null) {
             mongo.close();
         }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java?rev=1478277&r1=1478276&r2=1478277&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentStore.java Thu May  2 06:04:08 2013
@@ -33,4 +33,6 @@ public interface SegmentStore {
 
     void deleteSegment(UUID segmentId);
 
+    void close();
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1478277&r1=1478276&r2=1478277&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Thu May  2 06:04:08 2013
@@ -17,13 +17,19 @@
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -40,38 +46,99 @@ import com.google.common.collect.Maps;
 
 public class FileStore implements SegmentStore {
 
+    private static final long MAGIC_BYTES = 0x4f616b0a527845ddL;
+
+    private static final long FILE_SIZE = 256 * 1024 * 1024;
+
+    private static final String FILE_NAME_FORMAT = "data%05d.tar";
+
     private final Map<String, Journal> journals = Maps.newHashMap();
 
-    private final ByteBuffer tar;
+    private final File directory;
+    private int index;
+
+    private MappedByteBuffer rw;
+    private ByteBuffer ro;
 
     private final ConcurrentMap<UUID, Segment> segments =
             Maps.newConcurrentMap();
 
-    public FileStore(String filename, NodeState root) {
-        try {
-            RandomAccessFile file = new RandomAccessFile(filename, "rw");
-            try {
-                tar = file.getChannel().map(READ_WRITE, 0, 1024 * 1024 * 1024);
-            } finally {
-                file.close();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+    public FileStore(File directory, NodeState root) throws IOException {
+        checkNotNull(directory).mkdirs();
+        this.directory = directory;
+        this.index = 0;
+        while (loadSegments()) {
+            this.index++;
         }
 
         journals.put("root", new MemoryJournal(this, root));
     }
 
-    public FileStore(NodeState root) {
-        this("data.tar", root);
+    public FileStore(File directory) throws IOException {
+        this(directory, EMPTY_NODE);
     }
 
-    public FileStore(String filename) {
-        this(filename, EMPTY_NODE);
+    public FileStore(String directory) throws IOException {
+        this(new File(directory));
     }
 
-    public FileStore() {
-        this(EMPTY_NODE);
+    public void close() {
+        rw.force();
+    }
+
+    private boolean loadSegments() throws IOException {
+        String name = String.format(FILE_NAME_FORMAT, index);
+        File file = new File(directory, name);
+        long size = FILE_SIZE;
+        if (file.isFile()) {
+            size = file.length();
+        }
+        RandomAccessFile f = new RandomAccessFile(file, "rw");
+        try {
+            rw = f.getChannel().map(READ_WRITE, 0, size);
+            ro = rw.asReadOnlyBuffer();
+
+            while (ro.remaining() >= 4 * 0x200) {
+                // skip tar header and get the magic bytes; TODO: verify?
+                long magic = ro.getLong(ro.position() + 0x200);
+                if (magic == MAGIC_BYTES) {
+                    ro.position(ro.position() + 0x200 + 8);
+
+                    int length = ro.getInt();
+                    int count = ro.getInt();
+
+                    UUID segmentId = new UUID(ro.getLong(), ro.getLong());
+                    Collection<UUID> referencedSegmentIds =
+                            newArrayListWithCapacity(count);
+                    for (int i = 0; i < count; i++) {
+                        referencedSegmentIds.add(
+                                new UUID(ro.getLong(), ro.getLong()));
+                    }
+
+                    ro.limit(ro.position() + length);
+                    ByteBuffer data = ro.slice();
+                    ro.limit(rw.limit());
+
+                    Segment segment = new Segment(
+                            this, segmentId, data, referencedSegmentIds,
+                            Collections.<String, RecordId>emptyMap(),
+                            Collections.<Template, RecordId>emptyMap());
+                    segments.put(segmentId, segment);
+
+                    // advance to next entry in the file
+                    ro.position((ro.position() + length + 0x1ff) & ~0x1ff);
+                } else {
+                    // still space for more segments: position the write
+                    // buffer at this point and return false to stop looking
+                    // for more entries
+                    rw.position(ro.position());
+                    return false;
+                }
+            }
+            return true;
+        } finally {
+            f.close();
+        }
     }
 
     @Override
@@ -95,32 +162,54 @@ public class FileStore implements Segmen
     }
 
     @Override
-    public void createSegment(
+    public synchronized void createSegment(
             UUID segmentId, byte[] data, int offset, int length,
             Collection<UUID> referencedSegmentIds,
             Map<String, RecordId> strings, Map<Template, RecordId> templates) {
-        tar.put(segmentId.toString().getBytes(UTF_8));
-        tar.putInt(referencedSegmentIds.size());
+        int size = 8 + 4 + 4 + 16 + 16 * referencedSegmentIds.size() + length;
+
+        if (0x200 + ((size + 0x1ff) & ~0x1ff) + 2 * 0x200 > rw.remaining()) {
+            rw.force();
+            String name = String.format(FILE_NAME_FORMAT, index);
+            File file = new File(directory, name);
+            try {
+                RandomAccessFile f = new RandomAccessFile(file, "rw");
+                try {
+                    rw = f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
+                    ro = rw.asReadOnlyBuffer();
+                } finally {
+                    f.close();
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Unable to create a new segment", e);
+            }
+        }
+
+        rw.put(createTarHeader(segmentId.toString(), size));
+
+        rw.putLong(MAGIC_BYTES);
+        rw.putInt(length);
+        rw.putInt(referencedSegmentIds.size());
+        rw.putLong(segmentId.getMostSignificantBits());
+        rw.putLong(segmentId.getLeastSignificantBits());
         for (UUID referencedSegmentId : referencedSegmentIds) {
-            tar.putLong(referencedSegmentId.getMostSignificantBits());
-            tar.putLong(referencedSegmentId.getLeastSignificantBits());
+            rw.putLong(referencedSegmentId.getMostSignificantBits());
+            rw.putLong(referencedSegmentId.getLeastSignificantBits());
         }
-        tar.putInt(length);
-        int position = tar.position();
-        tar.put(data, offset, length);
-
-        ByteBuffer buffer = tar.asReadOnlyBuffer();
-        buffer.position(position);
-        buffer.limit(position + length);
-        buffer = buffer.slice();
+
+        ro.position(rw.position());
+        rw.put(data, offset, length);
+        ro.limit(rw.position());
+        ByteBuffer buffer = ro.slice();
+        ro.limit(rw.limit());
+
+        rw.position((rw.position() + 0x1ff) & ~0x1ff);
+        ro.position(rw.position());
 
         Segment segment = new Segment(
                 this, segmentId, buffer,
                 referencedSegmentIds, strings, templates);
-        if (segments.putIfAbsent(segmentId, segment) != null) {
-            throw new IllegalStateException(
-                    "Segment override: " + segmentId);
-        }
+        checkState(segments.put(segmentId, segment) == null);
     }
 
     @Override
@@ -130,4 +219,76 @@ public class FileStore implements Segmen
         }
     }
 
+    private static byte[] createTarHeader(String name, int length) {
+        byte[] header = new byte[] {
+                // File name
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0,
+                // File mode
+                '0', '0', '0', '0', '4', '0', '0', 0,
+                // User's numeric user ID
+                '0', '0', '0', '0', '0', '0', '0', 0,
+                // Group's numeric user ID
+                '0', '0', '0', '0', '0', '0', '0', 0,
+                // File size in bytes (octal basis)
+                '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0',  0,
+                // Last modification time in numeric Unix time format (octal)
+                '0', '0', '0', '0', '0', '0', '0', '0', '0', '0', '0',  0,
+                // Checksum for header record
+                ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ',
+                // Type flag
+                '0',
+                // Name of linked file
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0,
+                // unused
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
+        };
+
+        byte[] n = name.getBytes(UTF_8);
+        System.arraycopy(n, 0, header, 0, n.length);
+
+        byte[] l = Integer.toOctalString(length).getBytes(UTF_8);
+        System.arraycopy(l, 0, header, 124 + 11 - l.length, l.length);
+
+        long time = System.currentTimeMillis() / 1000;
+        byte[] t = Long.toOctalString(time).getBytes(UTF_8);
+        System.arraycopy(t, 0, header, 136 + 11 - t.length, t.length);
+
+        int checksum = 0;
+        for (int i = 0; i < header.length; i++) {
+            checksum += header[i] & 0xff;
+        }
+        byte[] c = Integer.toOctalString(checksum).getBytes(UTF_8);
+        System.arraycopy(c, 0, header, 148 + 6 - c.length, c.length);
+        header[148 + 6] = 0;
+
+        return header;
+    }
+
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java?rev=1478277&r1=1478276&r2=1478277&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/memory/MemoryStore.java Thu May  2 06:04:08 2013
@@ -49,6 +49,10 @@ public class MemoryStore implements Segm
     }
 
     @Override
+    public void close() {
+    }
+
+    @Override
     public synchronized Journal getJournal(final String name) {
         Journal journal = journals.get(name);
         if (journal == null) {

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java?rev=1478277&r1=1478276&r2=1478277&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/mongo/MongoStore.java Thu May  2 06:04:08 2013
@@ -76,6 +76,10 @@ public class MongoStore implements Segme
     }
 
     @Override
+    public void close() {
+    }
+
+    @Override
     public synchronized Journal getJournal(String name) {
         Journal journal = journals.get(name);
         if (journal == null) {