You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@jackrabbit.apache.org by st...@apache.org on 2012/03/14 17:38:46 UTC
svn commit: r1300627 - in
/jackrabbit/sandbox/jackrabbit-mongo-persistence/src:
main/java/org/apache/jackrabbit/core/persistence/mongo/ test/ test/java/
test/java/org/ test/java/org/apache/ test/java/org/apache/jackrabbit/
test/java/org/apache/jackrabb...
Author: stefan
Date: Wed Mar 14 16:38:46 2012
New Revision: 1300627
URL: http://svn.apache.org/viewvc?rev=1300627&view=rev
Log:
experimental mongo-based pm & ds
Added:
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/resources/
Modified:
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java
jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java
Added: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java?rev=1300627&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java (added)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/BufferingOutputStream.java Wed Mar 14 16:38:46 2012
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.mongo;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An {@link OutputStream} that starts buffering to a byte array, but
+ * switches to file buffering once the data reaches a configurable size.
+ *
+ * Copied from {@code com.google.common.io.FileBackedOutputStream} and
+ * adapted where needed.
+ */
+public final class BufferingOutputStream extends OutputStream {
+
+ private final int fileThreshold;
+
+ private OutputStream out;
+ private MemoryOutput memory;
+ private File file;
+
+ /** ByteArrayOutputStream that exposes its internals. */
+ private static class MemoryOutput extends ByteArrayOutputStream {
+ byte[] getBuffer() {
+ return buf;
+ }
+
+ int getCount() {
+ return count;
+ }
+ }
+
+ /** Returns the file holding the data (possibly null). */
+ public File getFile() {
+ return file;
+ }
+
+ /**
+ * Creates a new instance that uses the given file threshold.
+ * Equivalent to {@code ThresholdOutputStream(fileThreshold, false)}.
+ *
+ * @param fileThreshold the number of bytes before the stream should
+ * switch to buffering to a file
+ */
+ public BufferingOutputStream(int fileThreshold) {
+ this.fileThreshold = fileThreshold;
+ memory = new MemoryOutput();
+ out = memory;
+ }
+
+ public InputStream openStream() throws IOException {
+ if (file != null) {
+ return new FileInputStream(file);
+ } else {
+ return new ByteArrayInputStream(
+ memory.getBuffer(), 0, memory.getCount());
+ }
+ }
+
+ /**
+ * Calls {@link #close} if not already closed, and then resets this
+ * object back to its initial state, for reuse. If data was buffered
+ * to a file, it will be deleted.
+ *
+ * @throws IOException if an I/O error occurred while deleting the file buffer
+ */
+ public void reset() throws IOException {
+ try {
+ close();
+ } finally {
+ if (memory == null) {
+ memory = new MemoryOutput();
+ } else {
+ memory.reset();
+ }
+ out = memory;
+ if (file != null) {
+ File deleteMe = file;
+ file = null;
+ if (!deleteMe.delete()) {
+ throw new IOException("Could not delete: " + deleteMe);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ update(1);
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ throws IOException {
+ update(len);
+ out.write(b, off, len);
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ /**
+ * Checks if writing {@code len} bytes would go over threshold, and
+ * switches to file buffering if so.
+ */
+ private void update(int len) throws IOException {
+ if (file == null && (memory.getCount() + len > fileThreshold)) {
+ File temp = File.createTempFile("FileBackedOutputStream", null);
+
+ FileOutputStream transfer = new FileOutputStream(temp);
+ transfer.write(memory.getBuffer(), 0, memory.getCount());
+ transfer.flush();
+
+ // We've successfully transferred the data; switch to writing to file
+ out = transfer;
+ file = temp;
+ memory = null;
+ }
+ }
+}
Modified: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java?rev=1300627&r1=1300626&r2=1300627&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java (original)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataRecord.java Wed Mar 14 16:38:46 2012
@@ -45,26 +45,25 @@ public class MongoDataRecord extends Abs
@Override
public long getLength() throws DataStoreException {
- // TODO touch remote resource?
-/*
- resource.put("uploadDate", new Date());
- resource.save();
-*/
+ //touch();
return resource.getLength();
}
@Override
public InputStream getStream() throws DataStoreException {
- // TODO touch remote resource?
-/*
- resource.put("uploadDate", new Date());
- resource.save();
-*/
- return new BufferedInputStream(resource.getInputStream());
+ //touch();
+ return resource.getInputStream();
}
@Override
public long getLastModified() {
return resource.getUploadDate().getTime();
}
+
+ private long touch() {
+ Date now = new Date();
+ resource.put("uploadDate", now);
+ resource.save();
+ return now.getTime();
+ }
}
Modified: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java?rev=1300627&r1=1300626&r2=1300627&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java (original)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/main/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStore.java Wed Mar 14 16:38:46 2012
@@ -37,6 +37,9 @@ import javax.jcr.RepositoryException;
import java.io.IOException;
import java.io.InputStream;
import java.net.UnknownHostException;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -278,20 +281,40 @@ public class MongoDataStore implements D
throw new IllegalStateException("not initialized");
}
- GridFSInputFile f = fs.createFile(stream);
+ MessageDigest digest;
try {
- // saveChunks() computes the md5 but doesn't persist the data
- f.saveChunks();
+ digest = MessageDigest.getInstance("MD5");
+ } catch (NoSuchAlgorithmException e) {
+ // should never get here...
+ throw new DataStoreException("MD5 digest not supported", e);
+ }
+
+ // spool stream to buffer and compute md5 hash
+ DigestInputStream din = new DigestInputStream(stream, digest);
+ BufferingOutputStream bout = new BufferingOutputStream(0x10000);
+
+ byte[] buf = new byte[4096];
+ int read = 0;
+ try {
+ while (-1 != (read = din.read(buf))) {
+ bout.write(buf, 0, read);
+ }
} catch (IOException e) {
throw new DataStoreException("failed to read from stream", e);
}
- String md5 = f.getMD5();
+
+ String md5 = new DataIdentifier(digest.digest()).toString();
// check server before uploading new file
GridFSDBFile dbf = fs.findOne(new BasicDBObject("md5", md5));
if (dbf == null) {
// persist the data
- f.save();
+ try {
+ GridFSInputFile f = fs.createFile(bout.openStream());
+ f.save();
+ } catch (IOException e) {
+ throw new DataStoreException("failed to create data record", e);
+ }
dbf = fs.findOne(new BasicDBObject("md5", md5));
}
return new MongoDataRecord(new DataIdentifier(dbf.getMD5()), dbf);
Added: jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java?rev=1300627&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java (added)
+++ jackrabbit/sandbox/jackrabbit-mongo-persistence/src/test/java/org/apache/jackrabbit/core/persistence/mongo/MongoDataStoreTest.java Wed Mar 14 16:38:46 2012
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.core.persistence.mongo;
+
+import junit.framework.TestCase;
+import org.apache.jackrabbit.core.data.DataIdentifier;
+import org.apache.jackrabbit.core.data.DataRecord;
+import org.apache.jackrabbit.core.data.DataStoreException;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Random;
+
+/**
+ * Test the Database Data Store.
+ */
+public class MongoDataStoreTest extends TestCase {
+
+ private MongoDataStore store = new MongoDataStore();
+
+ private byte[] data = new byte[0x100000];
+
+ private DataIdentifier identifier;
+
+ protected void setUp() throws Exception {
+ // Initialize the data store
+ store.setDbName("test");
+ store.init("");
+
+ // Initialize random test data
+ new Random(1234567890).nextBytes(data);
+
+ // Add a test record
+ identifier =
+ store.addRecord(new ByteArrayInputStream(data)).getIdentifier();
+ }
+
+ protected void tearDown() {
+
+ try {
+ store.close();
+ } catch (DataStoreException expected) {
+ // ignore
+ }
+ }
+
+ public void testGetRecord() throws Exception {
+ DataRecord record = store.getRecord(identifier);
+ assertNotNull(record);
+ assertEquals(identifier, record.getIdentifier());
+ assertEquals(data.length, record.getLength());
+
+ // read the stream twice to make sure that it can be re-read
+ for (int i = 0; i < 2; i++) {
+ InputStream stream = record.getStream();
+ try {
+ assertNotNull(stream);
+ for (int j = 0; j < data.length; j++) {
+ assertEquals((data[j]) & 0xff, stream.read());
+ }
+ assertEquals(-1, stream.read());
+ } finally {
+ stream.close();
+ }
+ }
+ }
+
+ public void testConcurrentRead() throws Exception {
+ InputStream[] streams = new InputStream[10];
+
+ // retrieve many copies of the same record
+ for (int i = 0; i < streams.length; i++) {
+ streams[i] = store.getRecord(identifier).getStream();
+ }
+
+ // verify the contents of all the streams, reading them in parallel
+ for (int i = 0; i < data.length; i++) {
+ for (int j = 0; j < streams.length; j++) {
+ assertEquals((data[i]) & 0xff, streams[j].read());
+ }
+ }
+
+ // close all streams
+ for (int i = 0; i < streams.length; i++) {
+ assertEquals(-1, streams[i].read());
+ streams[i].close();
+ }
+ }
+
+}
\ No newline at end of file