You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by st...@apache.org on 2012/03/14 17:38:46 UTC

svn commit: r1300627 - in /jackrabbit/sandbox/jackrabbit-mongo-persistence/src: main/java/org/apache/jackrabbit/core/persistence/mongo/ test/ test/java/ test/java/org/ test/java/org/apache/ test/java/org/apache/jackrabbit/ test/java/org/apache/jackrabb...

Author: stefan
Date: Wed Mar 14 16:38:46 2012
New Revision: 1300627

URL: http://svn.apache.org/viewvc?rev=1300627&view=rev
Log:
experimental mongo-based pm & ds

Added:
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/resources/
Modified:
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java
    jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java

Added: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java?rev=1300627&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java (added)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java Wed Mar 14 16:38:46 2012
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.mongo;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but
+ * switches to file buffering once the data reaches a configurable size.
+ *
+ * Copied from {@code com.google.common.io.FileBackedOutputStream} and
+ * adapted where needed.
+ */
+public final class BufferingOutputStream extends OutputStream {
+
+    private final int fileThreshold;
+
+    private OutputStream out;
+    private MemoryOutput memory;
+    private File file;
+
+    /** ByteArrayOutputStream that exposes its internals. */
+    private static class MemoryOutput extends ByteArrayOutputStream {
+        byte[] getBuffer() {
+            return buf;
+        }
+
+        int getCount() {
+            return count;
+        }
+    }
+
+    /** Returns the file holding the data (possibly null). */
+    public File getFile() {
+        return file;
+    }
+
+    /**
+     * Creates a new instance that uses the given file threshold.
+     * Equivalent to {@code ThresholdOutputStream(fileThreshold, false)}.
+     *
+     * @param fileThreshold the number of bytes before the stream should
+     *     switch to buffering to a file
+     */
+    public BufferingOutputStream(int fileThreshold) {
+        this.fileThreshold = fileThreshold;
+        memory = new MemoryOutput();
+        out = memory;
+    }
+
+    public InputStream openStream() throws IOException {
+        if (file != null) {
+            return new FileInputStream(file);
+        } else {
+            return new ByteArrayInputStream(
+                    memory.getBuffer(), 0, memory.getCount());
+        }
+    }
+
+    /**
+     * Calls {@link #close} if not already closed, and then resets this
+     * object back to its initial state, for reuse. If data was buffered
+     * to a file, it will be deleted.
+     *
+     * @throws IOException if an I/O error occurred while deleting the file buffer
+     */
+    public void reset() throws IOException {
+        try {
+            close();
+        } finally {
+            if (memory == null) {
+                memory = new MemoryOutput();
+            } else {
+                memory.reset();
+            }
+            out = memory;
+            if (file != null) {
+                File deleteMe = file;
+                file = null;
+                if (!deleteMe.delete()) {
+                    throw new IOException("Could not delete: " + deleteMe);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        update(1);
+        out.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len)
+            throws IOException {
+        update(len);
+        out.write(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        out.close();
+    }
+
+    @Override
+    public void flush() throws IOException {
+        out.flush();
+    }
+
+    /**
+     * Checks if writing {@code len} bytes would go over threshold, and
+     * switches to file buffering if so.
+     */
+    private void update(int len) throws IOException {
+        if (file == null && (memory.getCount() + len > fileThreshold)) {
+            File temp = File.createTempFile("FileBackedOutputStream", null);
+
+            FileOutputStream transfer = new FileOutputStream(temp);
+            transfer.write(memory.getBuffer(), 0, memory.getCount());
+            transfer.flush();
+
+            // We've successfully transferred the data; switch to writing to file
+            out = transfer;
+            file = temp;
+            memory = null;
+        }
+    }
+}

Modified: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java?rev=1300627&r1=1300626&r2=1300627&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java (original)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java Wed Mar 14 16:38:46 2012
@@ -45,26 +45,25 @@ public class MongoDataRecord extends Abs
 
     @Override
     public long getLength() throws DataStoreException {
-        // TODO touch remote resource?
-/*
-        resource.put("uploadDate", new Date());
-        resource.save();
-*/
+        //touch();
         return resource.getLength();
     }
 
     @Override
     public InputStream getStream() throws DataStoreException {
-        // TODO touch remote resource?
-/*
-        resource.put("uploadDate", new Date());
-        resource.save();
-*/
-        return new BufferedInputStream(resource.getInputStream());
+        //touch();
+        return resource.getInputStream();
     }
 
     @Override
     public long getLastModified() {
         return resource.getUploadDate().getTime();
     }
+
+    private long touch() {
+        Date now = new Date();
+        resource.put("uploadDate", now);
+        resource.save();
+        return now.getTime();
+    }
 }

Modified: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java?rev=1300627&r1=1300626&r2=1300627&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java (original)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java Wed Mar 14 16:38:46 2012
@@ -37,6 +37,9 @@ import javax.jcr.RepositoryException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.UnknownHostException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Iterator;
 
@@ -278,20 +281,40 @@ public class MongoDataStore implements D
             throw new IllegalStateException("not initialized");
         }
 
-        GridFSInputFile f = fs.createFile(stream);
+        MessageDigest digest;
         try {
-            // saveChunks() computes the md5 but doesn't persist the data
-            f.saveChunks();
+            digest = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            // should never get here...
+            throw new DataStoreException("MD5 digest not supported", e);
+        }
+
+        // spool stream to buffer and compute md5 hash
+        DigestInputStream din = new DigestInputStream(stream, digest);
+        BufferingOutputStream bout = new BufferingOutputStream(0x10000);
+
+        byte[] buf = new byte[4096];
+        int read = 0;
+        try {
+            while (-1 != (read = din.read(buf))) {
+                bout.write(buf, 0, read);
+            }
         } catch (IOException e) {
             throw new DataStoreException("failed to read from stream", e);
         }
-        String md5 = f.getMD5();
+
+        String md5 = new DataIdentifier(digest.digest()).toString();
         
         // check server before uploading new file
         GridFSDBFile dbf = fs.findOne(new BasicDBObject("md5", md5));
         if (dbf == null) {
             // persist the data
-            f.save();
+            try {
+                GridFSInputFile f = fs.createFile(bout.openStream());
+                f.save();
+            } catch (IOException e) {
+                throw new DataStoreException("failed to create data record", e);
+            }
             dbf = fs.findOne(new BasicDBObject("md5", md5));
         }
         return new MongoDataRecord(new DataIdentifier(dbf.getMD5()), dbf);

Added: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java?rev=1300627&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java (added)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java Wed Mar 14 16:38:46 2012
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.mongo;
+
+import junit.framework.TestCase;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Random;
+
+/**
+ * Test the Database Data Store.
+ */
+public class MongoDataStoreTest extends TestCase {
+
+    private MongoDataStore store = new MongoDataStore();
+
+    private byte[] data = new byte[0x100000];
+
+    private DataIdentifier identifier;
+
+    protected void setUp() throws Exception {
+        // Initialize the data store
+        store.setDbName("test");
+        store.init("");
+
+        // Initialize random test data
+        new Random(1234567890).nextBytes(data);
+
+        // Add a test record
+        identifier =
+                store.addRecord(new ByteArrayInputStream(data)).getIdentifier();
+    }
+
+    protected void tearDown() {
+
+        try {
+            store.close();
+        } catch (DataStoreException expected) {
+            // ignore
+        }
+    }
+
+    public void testGetRecord() throws Exception {
+        DataRecord record = store.getRecord(identifier);
+        assertNotNull(record);
+        assertEquals(identifier, record.getIdentifier());
+        assertEquals(data.length, record.getLength());
+
+        // read the stream twice to make sure that it can be re-read
+        for (int i = 0; i < 2; i++) {
+            InputStream stream = record.getStream();
+            try {
+                assertNotNull(stream);
+                for (int j = 0; j < data.length; j++) {
+                    assertEquals((data[j]) & 0xff, stream.read());
+                }
+                assertEquals(-1, stream.read());
+            } finally {
+                stream.close();
+            }
+        }
+    }
+
+    public void testConcurrentRead() throws Exception {
+        InputStream[] streams = new InputStream[10];
+
+        // retrieve many copies of the same record
+        for (int i = 0; i < streams.length; i++) {
+            streams[i] = store.getRecord(identifier).getStream();
+        }
+
+        // verify the contents of all the streams, reading them in parallel
+        for (int i = 0; i < data.length; i++) {
+            for (int j = 0; j < streams.length; j++) {
+                assertEquals((data[i]) & 0xff, streams[j].read());
+            }
+        }
+
+        // close all streams
+        for (int i = 0; i < streams.length; i++) {
+            assertEquals(-1, streams[i].read());
+            streams[i].close();
+        }
+    }
+
+}
\ No newline at end of file