You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by th...@apache.org on 2012/01/31 09:41:54 UTC

svn commit: r1238259 - in /jackrabbit/sandbox/microkernel/src: main/java/org/apache/jackrabbit/mk/blobs/ test/java/org/apache/jackrabbit/mk/blobs/

Author: thomasm
Date: Tue Jan 31 08:41:53 2012
New Revision: 1238259

URL: http://svn.apache.org/viewvc?rev=1238259&view=rev
Log:
Support efficient adding of temp files, plus slightly improved performance for random access.

Modified:
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/BlobStore.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java
    jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java
    jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/blobs/DbBlobStoreTest.java

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/AbstractBlobStore.java Tue Jan 31 08:41:53 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.mk.blobs;
 
+import org.apache.jackrabbit.mk.fs.FilePath;
 import org.apache.jackrabbit.mk.util.ExceptionFactory;
 import org.apache.jackrabbit.mk.util.IOUtils;
 import org.apache.jackrabbit.mk.util.Cache;
@@ -27,10 +28,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.security.DigestOutputStream;
 import java.security.MessageDigest;
+import java.util.Arrays;
 
 /**
- * An abstract data store that splits the binaries in relatively small blocks.
- * Each data store id a list of zero or more entries. Each entry is either
+ * An abstract data store that splits the binaries into relatively small blocks,
+ * so that each block fits in memory.
+ * <p>
+ * Each data store id is a list of zero or more entries. Each entry is either
  * <ul>
  * <li>data (a number of bytes), or</li>
  * <li>the hash code of the content of a number of bytes, or</li>
@@ -40,27 +44,27 @@ import java.security.MessageDigest;
  * caching is simpler, and so that the storage backend doesn't need to support
  * arbitrary size blobs (some storage backends buffer blobs in memory) and fast
  * seeks (some storage backends re-read the whole blob when seeking).
- * <p/>
+ * <p>
  * The the format of a 'data' entry is: type (one byte; 0 for data), length
  * (variable size int), data (bytes).
- * <p/>
+ * <p>
  * The format of a 'hash of content' entry is: type (one byte; 1 for hash),
  * level (variable size int, 0 meaning not nested), size (variable size long),
  * hash code length (variable size int), hash code.
- * <p/>
+ * <p>
  * The format of a 'hash of data store id' entry is: type (one byte; 1 for
  * hash), level (variable size int, nesting level), total size (variable size
  * long), size of data store id (variable size long), hash code length (variable
  * size int), hash code.
  */
-public abstract class AbstractBlobStore implements BlobStore, Cache.Backend<String, AbstractBlobStore.Data> {
+public abstract class AbstractBlobStore implements BlobStore, Cache.Backend<AbstractBlobStore.BlockId, AbstractBlobStore.Data> {
+
+    protected static final String HASH_ALGORITHM = "SHA-1";
 
     protected static final int TYPE_DATA = 0;
     protected static final int TYPE_HASH = 1;
     protected static final int TYPE_HASH_COMPRESSED = 2;
 
-    private static final String HASH_ALGORITHM = "SHA-1";
-
     /**
      * The minimum size of a block. Smaller blocks are stored (the data store id
      * is the data itself).
@@ -73,7 +77,7 @@ public abstract class AbstractBlobStore 
      */
     private int blockSize = 2 * 1024 * 1024;
 
-    private Cache<String, Data> cache = Cache.newInstance(this, 8 * 1024 * 1024);
+    private Cache<AbstractBlobStore.BlockId, Data> cache = Cache.newInstance(this, 8 * 1024 * 1024);
 
     public void setBlockSizeMin(int x) {
         this.blockSizeMin = x;
@@ -83,6 +87,24 @@ public abstract class AbstractBlobStore 
         this.blockSize = x;
     }
 
+    public int getBlockSize() {
+        return blockSize;
+    }
+
+    public String addBlob(String tempFilePath) {
+        try {
+            FilePath file = FilePath.get(tempFilePath);
+            try {
+                InputStream in = file.newInputStream();
+                return writeBlob(in);
+            } finally {
+                file.delete();
+            }
+        } catch (Exception e) {
+            throw ExceptionFactory.convert(e);
+        }
+    }
+
     public String writeBlob(InputStream in) {
         try {
             ByteArrayOutputStream idStream = new ByteArrayOutputStream();
@@ -99,9 +121,9 @@ public abstract class AbstractBlobStore 
         byte[] block = new byte[blockSize];
         int count = 0;
         while (true) {
-            MessageDigest digest = MessageDigest.getInstance(HASH_ALGORITHM);
+            MessageDigest messageDigest = MessageDigest.getInstance(HASH_ALGORITHM);
             ByteArrayOutputStream buff = new ByteArrayOutputStream();
-            DigestOutputStream dout = new DigestOutputStream(buff, digest);
+            DigestOutputStream dout = new DigestOutputStream(buff, messageDigest);
             int blockLen = IOUtils.readFully(in, block, 0, block.length);
             count++;
             if (blockLen == 0) {
@@ -113,7 +135,7 @@ public abstract class AbstractBlobStore 
                 totalLength += blockLen;
             } else {
                 dout.write(block, 0, blockLen);
-                byte[] blockId = digest.digest();
+                byte[] digest = messageDigest.digest();
                 idStream.write(TYPE_HASH);
                 IOUtils.writeVarInt(idStream, level);
                 if (level > 0) {
@@ -121,10 +143,10 @@ public abstract class AbstractBlobStore 
                 }
                 IOUtils.writeVarLong(idStream, blockLen);
                 totalLength += blockLen;
-                IOUtils.writeVarInt(idStream, blockId.length);
-                idStream.write(blockId);
+                IOUtils.writeVarInt(idStream, digest.length);
+                idStream.write(digest);
                 byte[] data = buff.toByteArray();
-                storeBlock(blockId, level, data);
+                storeBlock(digest, level, data);
             }
             if (idStream.size() > blockSize / 2) {
                 // convert large ids to a block, but ensure it can be stored as
@@ -181,12 +203,14 @@ public abstract class AbstractBlobStore 
                     if (pos >= totalLength) {
                         pos -= totalLength;
                     } else {
-                        byte[] block = readBlock(digest);
                         if (level > 0) {
+                            byte[] block = readBlock(digest, 0);
                             idStream = new ByteArrayInputStream(block);
                         } else {
+                            long readPos = pos - pos % blockSize;
+                            byte[] block = readBlock(digest, readPos);
                             ByteArrayInputStream in = new ByteArrayInputStream(block);
-                            IOUtils.skipFully(in, (int) pos);
+                            IOUtils.skipFully(in, pos - readPos);
                             return IOUtils.readFully(in, buff, off, length);
                         }
                     }
@@ -199,20 +223,20 @@ public abstract class AbstractBlobStore 
         }
     }
 
-    private byte[] readBlock(byte[] digest) throws Exception {
-        String id = StringUtils.convertBytesToHex(digest);
+    private byte[] readBlock(byte[] digest, long pos) throws Exception {
+        BlockId id = new BlockId(digest, pos);
         return cache.get(id).data;
     }
 
-    public Data load(String id) {
+    public Data load(BlockId id) {
         try {
-            return new Data(readBlockFromBackend(StringUtils.convertHexToBytes(id)));
+            return new Data(readBlockFromBackend(id));
         } catch (Exception e) {
             throw ExceptionFactory.convert(e);
         }
     }
 
-    protected abstract byte[] readBlockFromBackend(byte[] blockId) throws Exception;
+    protected abstract byte[] readBlockFromBackend(BlockId id) throws Exception;
 
     public long getBlobLength(String blobId) {
         try {
@@ -247,6 +271,36 @@ public abstract class AbstractBlobStore 
         }
     }
 
+    public static class BlockId {
+
+        final byte[] digest;
+        final long pos;
+
+        BlockId(byte[] digest, long pos) {
+            this.digest = digest;
+            this.pos = pos;
+        }
+
+        public boolean equals(Object other) {
+            if (this == other) {
+                return true;
+            }
+            BlockId o = (BlockId) other;
+            return Arrays.equals(digest, o.digest) &&
+                    pos == o.pos;
+        }
+
+        public int hashCode() {
+            return Arrays.hashCode(digest) ^
+                    (int) (pos >> 32) ^ (int) pos;
+        }
+
+        public String toString() {
+            return StringUtils.convertBytesToHex(digest) + "@" + pos;
+        }
+
+    }
+
     public static class Data implements Cache.Value {
 
         final byte[] data;
@@ -255,6 +309,11 @@ public abstract class AbstractBlobStore 
             this.data = data;
         }
 
+        public String toString() {
+            String s = StringUtils.convertBytesToHex(data);
+            return s.length() > 100 ? s.substring(0, 100) + ".. (len=" + data.length + ")" : s;
+        }
+
         public int getMemory() {
             return data.length;
         }

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/BlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/BlobStore.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/BlobStore.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/BlobStore.java Tue Jan 31 08:41:53 2012
@@ -23,6 +23,23 @@ import java.io.InputStream;
  */
 public interface BlobStore {
 
+    /**
+     * Write a blob from a temporary file. The temporary file is removed
+     * afterwards. A file-based blob store might simply rename the file, so
+     * that no additional writes are necessary.
+     *
+     * @param tempFilePath the temporary file
+     * @return the blob id
+     */
+    String addBlob(String tempFilePath) throws Exception;
+
+    /**
+     * Write a blob from an input stream.
+     * This method closes the input stream.
+     *
+     * @param in the input stream
+     * @return the blob id
+     */
     String writeBlob(InputStream in) throws Exception;
 
     int readBlob(String blobId, long pos, byte[] buff, int off, int length) throws Exception;

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/DbBlobStore.java Tue Jan 31 08:41:53 2012
@@ -45,10 +45,10 @@ public class DbBlobStore extends Abstrac
     }
 
     @Override
-    protected synchronized void storeBlock(byte[] blockId, int level, byte[] data) throws SQLException {
+    protected synchronized void storeBlock(byte[] digest, int level, byte[] data) throws SQLException {
         Connection conn = cp.getConnection();
         try {
-            String id = StringUtils.convertBytesToHex(blockId);
+            String id = StringUtils.convertBytesToHex(digest);
             long now = System.currentTimeMillis();
             PreparedStatement prep = conn.prepareStatement(
                     "update datastore_meta set lastMod = ? where id = ?");
@@ -95,13 +95,13 @@ public class DbBlobStore extends Abstrac
     }
 
     @Override
-    protected byte[] readBlockFromBackend(byte[] blockId) throws Exception {
+    protected byte[] readBlockFromBackend(BlockId blockId) throws Exception {
         Connection conn = cp.getConnection();
         try {
             PreparedStatement prep = conn.prepareStatement(
                     "select data from datastore_data where id = ?");
             try {
-                String id = StringUtils.convertBytesToHex(blockId);
+                String id = StringUtils.convertBytesToHex(blockId.digest);
                 prep.setString(1, id);
                 ResultSet rs = prep.executeQuery();
                 if (!rs.next()) {
@@ -109,7 +109,16 @@ public class DbBlobStore extends Abstrac
                 }
                 byte[] data = rs.getBytes(1);
                 // System.out.println("    read block " + id + " blockLen: " + data.length + " [0]: " + data[0]);
-                return data;
+                if (blockId.pos == 0) {
+                    return data;
+                }
+                int len = (int) (data.length - blockId.pos);
+                if (len < 0) {
+                    return new byte[0];
+                }
+                byte[] d2 = new byte[len];
+                System.arraycopy(data, (int) blockId.pos, d2, 0, len);
+                return d2;
             } finally {
                 prep.close();
             }

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/FileBlobStore.java Tue Jan 31 08:41:53 2012
@@ -18,12 +18,16 @@ package org.apache.jackrabbit.mk.blobs;
 
 import org.apache.jackrabbit.mk.fs.FilePath;
 import org.apache.jackrabbit.mk.fs.FileUtils;
+import org.apache.jackrabbit.mk.util.ExceptionFactory;
 import org.apache.jackrabbit.mk.util.IOUtils;
 import org.apache.jackrabbit.mk.util.StringUtils;
 
+import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.IOException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
 
 /**
  * A file blob store.
@@ -31,6 +35,7 @@ import java.io.IOException;
 public class FileBlobStore extends AbstractBlobStore {
 
     private final FilePath baseDir;
+    private final byte[] buffer = new byte[16 * 1024];
 
     public FileBlobStore(String dir) throws IOException {
         baseDir = FilePath.get(dir);
@@ -38,8 +43,50 @@ public class FileBlobStore extends Abstr
     }
 
     @Override
-    protected synchronized void storeBlock(byte[] blockId, int level, byte[] data) throws IOException {
-        FilePath f = getFile(blockId);
+    public String addBlob(String tempFilePath) {
+        try {
+            FilePath file = FilePath.get(tempFilePath);
+            InputStream in = file.newInputStream();
+            MessageDigest digest = MessageDigest.getInstance(HASH_ALGORITHM);
+            DigestInputStream din = new DigestInputStream(in, digest);
+            long length = file.size();
+            try {
+                while (true) {
+                    int len = din.read(buffer, 0, buffer.length);
+                    if (len < 0) {
+                        break;
+                    }
+                }
+            } finally {
+                din.close();
+            }
+            ByteArrayOutputStream idStream = new ByteArrayOutputStream();
+            idStream.write(TYPE_HASH);
+            IOUtils.writeVarInt(idStream, 0);
+            IOUtils.writeVarLong(idStream, length);
+            byte[] blockId = digest.digest();
+            FilePath f = getFile(blockId);
+            if (f.exists()) {
+                file.delete();
+            } else {
+                FilePath parent = f.getParent();
+                if (!parent.exists()) {
+                    FileUtils.createDirectories(parent.toString());
+                }
+                file.moveTo(f);
+            }
+            IOUtils.writeVarInt(idStream, blockId.length);
+            idStream.write(blockId);
+            byte[] id = idStream.toByteArray();
+            return StringUtils.convertBytesToHex(id);
+        } catch (Exception e) {
+            throw ExceptionFactory.convert(e);
+        }
+    }
+
+    @Override
+    protected synchronized void storeBlock(byte[] digest, int level, byte[] data) throws IOException {
+        FilePath f = getFile(digest);
         if (f.exists()) {
             return;
         }
@@ -61,12 +108,13 @@ public class FileBlobStore extends Abstr
     }
 
     @Override
-    protected byte[] readBlockFromBackend(byte[] blockId) throws IOException {
-        FilePath f = getFile(blockId);
-        int length = (int) f.size();
+    protected byte[] readBlockFromBackend(BlockId id) throws IOException {
+        FilePath f = getFile(id.digest);
+        int length = (int) Math.min(f.size(), getBlockSize());
         byte[] data = new byte[length];
         InputStream in = f.newInputStream();
         try {
+            IOUtils.skipFully(in, id.pos);
             IOUtils.readFully(in, data, 0, length);
         } finally {
             in.close();

Modified: jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java (original)
+++ jackrabbit/sandbox/microkernel/src/main/java/org/apache/jackrabbit/mk/blobs/MemoryBlobStore.java Tue Jan 31 08:41:53 2012
@@ -16,8 +16,6 @@
  */
 package org.apache.jackrabbit.mk.blobs;
 
-import org.apache.jackrabbit.mk.util.StringUtils;
-
 import java.util.HashMap;
 
 /**
@@ -25,20 +23,16 @@ import java.util.HashMap;
  */
 public class MemoryBlobStore extends AbstractBlobStore {
 
-    private HashMap<String, byte[]> map = new HashMap<String, byte[]>();
+    private HashMap<BlockId, byte[]> map = new HashMap<BlockId, byte[]>();
 
     @Override
-    protected synchronized byte[] readBlockFromBackend(byte[] blockId) {
-        return map.get(getId(blockId));
-    }
-
-    private String getId(byte[] blockId) {
-        return StringUtils.convertBytesToHex(blockId);
+    protected byte[] readBlockFromBackend(BlockId id) {
+        return map.get(id);
     }
 
     @Override
-    protected synchronized void storeBlock(byte[] blockId, int level, byte[] data) {
-        map.put(getId(blockId), data);
+    protected synchronized void storeBlock(byte[] digest, int level, byte[] data) {
+        map.put(new BlockId(digest, 0), data);
     }
 
 }

Modified: jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/blobs/DbBlobStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/blobs/DbBlobStoreTest.java?rev=1238259&r1=1238258&r2=1238259&view=diff
==============================================================================
--- jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/blobs/DbBlobStoreTest.java (original)
+++ jackrabbit/sandbox/microkernel/src/test/java/org/apache/jackrabbit/mk/blobs/DbBlobStoreTest.java Tue Jan 31 08:41:53 2012
@@ -18,6 +18,7 @@ package org.apache.jackrabbit.mk.blobs;
 
 import junit.framework.TestCase;
 import org.apache.jackrabbit.mk.api.MicroKernelException;
+import org.apache.jackrabbit.mk.fs.FileUtils;
 import org.apache.jackrabbit.mk.json.JsopBuilder;
 import org.apache.jackrabbit.mk.json.JsopTokenizer;
 import org.apache.jackrabbit.mk.util.IOUtilsTest;
@@ -29,6 +30,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.Random;
@@ -59,6 +61,24 @@ public class DbBlobStoreTest extends Tes
         }
     }
 
+    public void testAddFile() throws Exception {
+        store.setBlockSize(1024 * 1024);
+        byte[] data = new byte[4 * 1024 * 1024];
+        Random r = new Random(0);
+        r.nextBytes(data);
+        String tempFileName = "target/temp/test";
+        OutputStream out = FileUtils.newOutputStream(tempFileName, false);
+        out.write(data);
+        out.close();
+        String s = store.addBlob(tempFileName);
+        assertEquals(data.length, store.getBlobLength(s));
+        byte[] buff = new byte[1];
+        for (int i = 0; i < data.length; i += 1024) {
+            store.readBlob(s, i, buff, 0, 1);
+            assertEquals(data[i], buff[0]);
+        }
+    }
+
     public void testCombinedIdentifier() throws Exception {
         String id = store.writeBlob(new ByteArrayInputStream(new byte[2]));
         assertEquals(2, store.getBlobLength(id));
@@ -104,7 +124,7 @@ public class DbBlobStoreTest extends Tes
     }
 
     public void testSmall() throws Exception {
-        doTest(10, 1000);
+        doTest(10, 300);
     }
 
     public void testMedium() throws Exception {
@@ -163,7 +183,7 @@ public class DbBlobStoreTest extends Tes
 //        DbBlobStore store = new DbBlobStore();
 //        store.setConnectionPool(JdbcConnectionPool.create("jdbc:h2:target/test;log=0;undo_log=0", "", ""));
 
-        String id = addFiles(store, "/Users/thomasm/Desktop/cq54/crx-quickstart");
+        String id = addFiles(store, "~/temp/ds");
         extractFiles(store, id, "target/test");
     }