You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ju...@apache.org on 2013/09/18 20:18:26 UTC

svn commit: r1524513 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file: FileJournal.java FileStore.java TarFile.java

Author: jukka
Date: Wed Sep 18 18:18:25 2013
New Revision: 1524513

URL: http://svn.apache.org/r1524513
Log:
OAK-1001: SegmentMK: 32bit support for the file backend

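Enabled the new 32bit code: FileStore now reads and writes segments and journals through TarFile entries instead of memory-mapping the tar files itself.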

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileJournal.java Wed Sep 18 18:18:25 2013
@@ -16,6 +16,8 @@
  */
 package org.apache.jackrabbit.oak.plugins.segment.file;
 
+import java.io.IOException;
+
 import org.apache.jackrabbit.oak.plugins.segment.RecordId;
 import org.apache.jackrabbit.oak.plugins.segment.memory.MemoryJournal;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
@@ -37,8 +39,12 @@ class FileJournal extends MemoryJournal 
     @Override
     public synchronized boolean setHead(RecordId base, RecordId head) {
         if (super.setHead(base, head)) {
-            store.writeJournals();
-            return true;
+            try {
+                store.writeJournals();
+                return true;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         } else {
             return false;
         }
@@ -47,6 +53,10 @@ class FileJournal extends MemoryJournal 
     @Override
     public synchronized void merge() {
         super.merge();
-        store.writeJournals();
+        try {
+            store.writeJournals();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
     }
 }
\ No newline at end of file

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/FileStore.java Wed Sep 18 18:18:25 2013
@@ -21,16 +21,12 @@ import static com.google.common.base.Pre
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.collect.Lists.newArrayListWithCapacity;
 import static com.google.common.collect.Lists.newLinkedList;
-import static com.google.common.collect.Maps.newConcurrentMap;
 import static com.google.common.collect.Maps.newHashMap;
-import static java.nio.channels.FileChannel.MapMode.READ_WRITE;
 import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.EMPTY_NODE;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -47,96 +43,32 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.segment.Template;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 
 public class FileStore implements SegmentStore {
 
-    /** Logger instance */
-    private static final Logger log = LoggerFactory.getLogger(FileStore.class);
-
     private static final long SEGMENT_MAGIC = 0x4f616b0a527845ddL;
 
     private static final long JOURNAL_MAGIC = 0xdf36544212c0cb24L;
 
-    private static final long PADDING_MAGIC = 0x786da7779516c12L;
-
-    private static final String JOURNALS_UUID = new UUID(0, 0).toString();
-
-    private static final String PADDING_UUID = new UUID(-1, -1).toString();
+    static final UUID JOURNALS_UUID = new UUID(0, 0);
 
-    private static final long FILE_SIZE = 256 * 1024 * 1024;
+    private static final int FILE_SIZE = 256 * 1024 * 1024;
 
     private static final String FILE_NAME_FORMAT = "data%05d.tar";
 
-    private static final int SEGMENT_SIZE = 0x200; // 512
-
-    private static final byte[] PADDING_BYTES = new byte[SEGMENT_SIZE];
-
-    private class SegmentReference {
-
-        private final UUID id;
-
-        private final ByteBuffer buffer;
-
-        private final int position;
-
-        SegmentReference(UUID id, ByteBuffer buffer, int position) {
-            this.id = id;
-            this.buffer = checkNotNull(buffer);
-            this.position = position;
-        }
-
-        UUID getSegmentId() {
-            return id;
-        }
-
-        Segment getSegment() {
-            ByteBuffer ro = buffer.asReadOnlyBuffer();
-            ro.position(position);
-
-            int length = ro.getInt();
-            int count = ro.getInt();
-
-            checkState(id.equals(new UUID(ro.getLong(), ro.getLong())));
-
-            Collection<UUID> referencedIds = newArrayListWithCapacity(count);
-            for (int i = 0; i < count; i++) {
-                SegmentReference reference =
-                        references.get(new UUID(ro.getLong(), ro.getLong()));
-                checkState(reference != null);
-                referencedIds.add(reference.getSegmentId());
-            }
-
-            ro.limit(ro.position() + length);
-            return new Segment(FileStore.this, id, ro.slice(), referencedIds,
-                    Collections.<String, RecordId>emptyMap(),
-                    Collections.<Template, RecordId>emptyMap());
-        }
-
-    }
-
     private final File directory;
 
-    private final LinkedList<MappedByteBuffer> files = newLinkedList();
+    private final LinkedList<TarFile> files = newLinkedList();
 
     private final Map<String, Journal> journals = newHashMap();
 
-    private final Map<UUID, SegmentReference> references = newConcurrentMap();
-
     private final Cache<UUID, Segment> segments =
             CacheBuilder.newBuilder().maximumSize(1000).build();
 
     public FileStore(File directory, NodeState root) throws IOException {
-        // http://www.oracle.com/technetwork/java/hotspotfaq-138619.html#64bit_detection
-        if ("32".equals(System.getProperty("sun.arch.data.model"))) {
-            log.warn("TarMK will only work with small repositories"
-                    + " in a 32 bit JVM. Consider switching to a 64 bit JVM.");
-        }
-
         checkNotNull(directory).mkdirs();
         this.directory = directory;
 
@@ -144,19 +76,28 @@ public class FileStore implements Segmen
             String name = String.format(FILE_NAME_FORMAT, i);
             File file = new File(directory, name);
             if (file.isFile()) {
-                RandomAccessFile f = new RandomAccessFile(file, "rw");
-                try {
-                    files.add(f.getChannel().map(READ_WRITE, 0, FILE_SIZE));
-                } finally {
-                    f.close();
-                }
+                files.add(new TarFile(file, FILE_SIZE));
             } else {
                 break;
             }
         }
 
-        for (MappedByteBuffer file : files) {
-            loadSegments(file);
+        for (TarFile tar : files) {
+            ByteBuffer buffer = tar.readEntry(JOURNALS_UUID);
+            if (buffer != null) {
+                checkState(JOURNAL_MAGIC == buffer.getLong());
+                int count = buffer.getInt();
+                for (int i = 0; i < count; i++) {
+                    byte[] b = new byte[buffer.getInt()];
+                    buffer.get(b);
+                    String name = new String(b, UTF_8);
+                    RecordId recordId = new RecordId(
+                            new UUID(buffer.getLong(), buffer.getLong()),
+                            buffer.getInt());
+                    journals.put(name, new FileJournal(
+                            this, new SegmentNodeState(this, recordId)));
+                }
+            }
         }
 
         if (!journals.containsKey("root")) {
@@ -175,56 +116,17 @@ public class FileStore implements Segmen
     }
 
     public synchronized void close() {
-        if (!files.isEmpty()) {
-            files.getLast().force();
+        for (TarFile file : files) {
+            try {
+                file.close();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
         }
         files.clear();
-        references.clear();
         segments.invalidateAll();
         segments.cleanUp();
-        System.gc();
-    }
-
-    private void loadSegments(MappedByteBuffer ro) throws IOException {
-        while (ro.remaining() >= 4 * SEGMENT_SIZE) {
-            // skip tar header and get the magic bytes; TODO: verify?
-            long magic = ro.getLong(ro.position() + SEGMENT_SIZE);
-            if (magic == SEGMENT_MAGIC) {
-                int position = ro.position() + SEGMENT_SIZE + 8;
-                int length = ro.getInt(position);
-                int count = ro.getInt(position + 4);
-                UUID id = new UUID(
-                        ro.getLong(position + 8),
-                        ro.getLong(position + 16));
-                references.put(id, new SegmentReference(id, ro, position));
-
-                // advance to next entry in the file
-                position += 24 + count * 16 + length;
-                ro.position((position + 0x1ff) & ~0x1ff);
-            } else if (magic == JOURNAL_MAGIC) {
-                ro.position(ro.position() + SEGMENT_SIZE + 8);
-
-                int count = ro.getInt();
-                for (int i = 0; i < count; i++) {
-                    byte[] n = new byte[ro.getInt()];
-                    ro.get(n);
-                    SegmentReference reference = references.get(
-                            new UUID(ro.getLong(), ro.getLong()));
-                    checkState(reference != null);
-                    SegmentNodeState h = new SegmentNodeState(this, new RecordId(
-                            reference.getSegmentId(), ro.getInt()));
-                    journals.put(
-                            new String(n, UTF_8),
-                            new FileJournal(this, h));
-                }
-
-                // advance to next entry in the file
-                ro.position((ro.position() + 0x1ff) & ~0x1ff);
-            } else {
-                // last entry encountered
-                return;
-            }
-        }
+        System.gc(); // for any memory-mappings that are no longer used
     }
 
     @Override
@@ -243,9 +145,33 @@ public class FileStore implements Segmen
             return segments.get(id, new Callable<Segment>() {
                 @Override
                 public Segment call() throws Exception {
-                    SegmentReference reference = references.get(id);
-                    checkState(reference != null);
-                    return reference.getSegment();
+                    for (TarFile file : files) {
+                        ByteBuffer buffer = file.readEntry(id);
+                        if (buffer != null) {
+                            checkState(SEGMENT_MAGIC == buffer.getLong());
+                            int length = buffer.getInt();
+                            int count = buffer.getInt();
+
+                            checkState(id.equals(new UUID(
+                                    buffer.getLong(), buffer.getLong())));
+
+                            Collection<UUID> referencedIds =
+                                    newArrayListWithCapacity(count);
+                            for (int i = 0; i < count; i++) {
+                                referencedIds.add(new UUID(
+                                        buffer.getLong(), buffer.getLong()));
+                            }
+
+                            buffer.limit(buffer.position() + length);
+                            return new Segment(
+                                    FileStore.this, id,
+                                    buffer.slice(), referencedIds,
+                                    Collections.<String, RecordId>emptyMap(),
+                                    Collections.<Template, RecordId>emptyMap());
+                        }
+                    }
+                    throw new IllegalStateException(
+                            "Segment " + id + " not found");
                 }
             });
         } catch (ExecutionException e) {
@@ -259,174 +185,69 @@ public class FileStore implements Segmen
             Collection<UUID> referencedSegmentIds,
             Map<String, RecordId> strings, Map<Template, RecordId> templates) {
         int size = 8 + 4 + 4 + 16 + 16 * referencedSegmentIds.size() + length;
+        ByteBuffer buffer = ByteBuffer.allocate(size);
 
-        MappedByteBuffer rw = prepare(size);
-
-        rw.put(createTarHeader(segmentId.toString(), size));
-        rw.putLong(SEGMENT_MAGIC);
-        int position = rw.position();
-
-        rw.putInt(length);
-        rw.putInt(referencedSegmentIds.size());
-        rw.putLong(segmentId.getMostSignificantBits());
-        rw.putLong(segmentId.getLeastSignificantBits());
+        buffer.putLong(SEGMENT_MAGIC);
+        buffer.putInt(length);
+        buffer.putInt(referencedSegmentIds.size());
+        buffer.putLong(segmentId.getMostSignificantBits());
+        buffer.putLong(segmentId.getLeastSignificantBits());
         for (UUID referencedSegmentId : referencedSegmentIds) {
-            rw.putLong(referencedSegmentId.getMostSignificantBits());
-            rw.putLong(referencedSegmentId.getLeastSignificantBits());
+            buffer.putLong(referencedSegmentId.getMostSignificantBits());
+            buffer.putLong(referencedSegmentId.getLeastSignificantBits());
         }
 
-        ByteBuffer ro = rw.asReadOnlyBuffer();
-        ro.limit(ro.position() + length);
-        ro = ro.slice();
-
-        rw.put(data, offset, length);
+        int pos = buffer.position();
+        buffer.put(data, offset, length);
 
-        int n = rw.position() % SEGMENT_SIZE;
-        if (n > 0) {
-            rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
+        try {
+            writeEntry(segmentId, buffer.array());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
 
-        SegmentReference previous = references.put(
-                segmentId, new SegmentReference(segmentId, rw, position));
-        checkState(previous == null);
-
+        buffer.position(pos);
         segments.put(segmentId, new Segment(
-                this, segmentId, ro, referencedSegmentIds, strings, templates));
+                this, segmentId, buffer.slice(),
+                referencedSegmentIds, strings, templates));
+    }
+
+    private void writeEntry(UUID segmentId, byte[] buffer)
+            throws IOException {
+        if (files.isEmpty() || !files.getLast().writeEntry(
+                segmentId, buffer, 0, buffer.length)) {
+            String name = String.format(FILE_NAME_FORMAT, files.size());
+            TarFile last = new TarFile(new File(directory, name), FILE_SIZE);
+            checkState(last.writeEntry(segmentId, buffer, 0, buffer.length));
+            files.add(last);
+        }
     }
 
     @Override
     public void deleteSegment(UUID segmentId) {
-        if (references.remove(segmentId) == null) {
-            throw new IllegalStateException("Missing segment: " + segmentId);
-        }
-        segments.invalidate(segmentId);
+        // TODO: implement
     }
 
-    synchronized void writeJournals() {
+    synchronized void writeJournals() throws IOException {
         int size = 8 + 4;
         for (String name : journals.keySet()) {
             size += 4 + name.getBytes(UTF_8).length + 16 + 4;
         }
 
-        MappedByteBuffer rw = prepare(size);
-
-        rw.put(createTarHeader(JOURNALS_UUID, size));
-
-        rw.putLong(JOURNAL_MAGIC);
-        rw.putInt(journals.size());
+        ByteBuffer buffer = ByteBuffer.allocate(size);
+        buffer.putLong(JOURNAL_MAGIC);
+        buffer.putInt(journals.size());
         for (Map.Entry<String, Journal> entry : journals.entrySet()) {
             byte[] name = entry.getKey().getBytes(UTF_8);
-            rw.putInt(name.length);
-            rw.put(name);
+            buffer.putInt(name.length);
+            buffer.put(name);
             RecordId head = entry.getValue().getHead();
-            rw.putLong(head.getSegmentId().getMostSignificantBits());
-            rw.putLong(head.getSegmentId().getLeastSignificantBits());
-            rw.putInt(head.getOffset());
-        }
-
-        int n = rw.position() % SEGMENT_SIZE;
-        if (n > 0) {
-            rw.put(PADDING_BYTES, 0, SEGMENT_SIZE - n);
-        }
-    }
-
-    private synchronized MappedByteBuffer prepare(int size) {
-        if (!files.isEmpty()) {
-            MappedByteBuffer last = files.getLast();
-
-            // Check if there's still enough room in the last open file
-            int segments = (size + SEGMENT_SIZE - 1) / SEGMENT_SIZE;
-            if ((1 + segments + 2) * SEGMENT_SIZE <= last.remaining()) {
-                return last;
-            }
-
-            // No more room, so finish the last file with padding as needed
-            if (last.remaining() >= 3 * SEGMENT_SIZE) {
-                // Add a padding entry to avoid problems during reopening
-                last.put(createTarHeader(
-                        PADDING_UUID,
-                        last.remaining() - 3 * SEGMENT_SIZE));
-                if (last.remaining() > 2 * SEGMENT_SIZE) {
-                    last.putLong(PADDING_MAGIC);
-                    last.put(PADDING_BYTES, 0, SEGMENT_SIZE - 8);
-                }
-            }
-            while (last.remaining() > 0) {
-                last.put(PADDING_BYTES);
-            }
-            last.force();
-        }
-
-        // Need to start a new file
-        try {
-            String name = String.format(FILE_NAME_FORMAT, files.size());
-            File file = new File(directory, name);
-            RandomAccessFile f = new RandomAccessFile(file, "rw");
-            try {
-                MappedByteBuffer buffer =
-                        f.getChannel().map(READ_WRITE, 0, FILE_SIZE);
-                files.add(buffer);
-                return buffer;
-            } finally {
-                f.close();
-            }
-        } catch (IOException e) {
-            throw new RuntimeException("Unable to create a new segment", e);
+            buffer.putLong(head.getSegmentId().getMostSignificantBits());
+            buffer.putLong(head.getSegmentId().getLeastSignificantBits());
+            buffer.putInt(head.getOffset());
         }
-    }
-
-    private static byte[] createTarHeader(String name, int length) {
-        byte[] header = new byte[SEGMENT_SIZE];
-
-        // File name
-        byte[] n = name.getBytes(UTF_8);
-        System.arraycopy(n, 0, header, 0, n.length);
-
-        // File mode
-        System.arraycopy(
-                String.format("%07o", 0400).getBytes(UTF_8), 0,
-                header, 100, 7);
-
-        // User's numeric user ID
-        System.arraycopy(
-                String.format("%07o", 0).getBytes(UTF_8), 0,
-                header, 108, 7);
-
-        // Group's numeric user ID
-        System.arraycopy(
-                String.format("%07o", 0).getBytes(UTF_8), 0,
-                header, 116, 7);
-
-        // File size in bytes (octal basis)
-        System.arraycopy(
-                String.format("%011o", length).getBytes(UTF_8), 0,
-                header, 124, 11);
-
-        // Last modification time in numeric Unix time format (octal)
-        long time = System.currentTimeMillis() / 1000;
-        System.arraycopy(
-                String.format("%011o", time).getBytes(UTF_8), 0,
-                header, 136, 11);
-
-        // Checksum for header record
-        System.arraycopy(
-                new byte[] { ' ', ' ', ' ', ' ', ' ', ' ', ' ', ' ' }, 0,
-                header, 148, 8);
-
-        // Type flag
-        header[156] = '0';
-
-        // Compute checksum
-        int checksum = 0;
-        for (int i = 0; i < header.length; i++) {
-            checksum += header[i] & 0xff;
-        }
-        System.arraycopy(
-                String.format("%06o", checksum).getBytes(UTF_8), 0,
-                header, 148, 6);
-        header[154] = 0;
 
-        return header;
+        writeEntry(JOURNALS_UUID, buffer.array());
     }
 
 }

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java?rev=1524513&r1=1524512&r2=1524513&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/segment/file/TarFile.java Wed Sep 18 18:18:25 2013
@@ -18,6 +18,9 @@ package org.apache.jackrabbit.oak.plugin
 
 import static com.google.common.base.Charsets.UTF_8;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Predicates.equalTo;
+import static com.google.common.base.Predicates.not;
+import static com.google.common.collect.Maps.filterKeys;
 
 import java.io.File;
 import java.io.IOException;
@@ -73,6 +76,7 @@ class TarFile {
 
         ImmutableMap.Builder<UUID, Location> builder = ImmutableMap.builder();
 
+        Location journals = null;
         this.position = 0;
         while (position + BLOCK_SIZE <= len) {
             // read the tar header block
@@ -87,14 +91,22 @@ class TarFile {
             }
 
             try {
+                Location location = new Location(position + BLOCK_SIZE, size);
                 UUID id = UUID.fromString(name);
-                builder.put(id, new Location(position + BLOCK_SIZE, size));
+                if (FileStore.JOURNALS_UUID.equals(id)) {
+                    journals = location;
+                } else {
+                    builder.put(id, location);
+                }
             } catch (IllegalArgumentException e) {
                 throw new IOException("Unexpected tar entry: " + name);
             }
 
             position += (1 + (size + BLOCK_SIZE - 1) / BLOCK_SIZE) * BLOCK_SIZE;
         }
+        if (journals != null) {
+            builder.put(FileStore.JOURNALS_UUID, journals);
+        }
 
         this.entries = builder.build();
     }
@@ -169,7 +181,7 @@ class TarFile {
 
         file.write(position, b, offset, size);
         entries = ImmutableMap.<UUID, Location>builder()
-                .putAll(entries)
+                .putAll(filterKeys(entries, not(equalTo(id))))
                 .put(id, new Location(position, size))
                 .build();
         position += size;