You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by mr...@apache.org on 2016/12/14 08:48:03 UTC

svn commit: r1774141 - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java

Author: mreutegg
Date: Wed Dec 14 08:48:03 2016
New Revision: 1774141

URL: http://svn.apache.org/viewvc?rev=1774141&view=rev
Log:
OAK-5299: Introduce BlobFactory in OakDirectory

Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java?rev=1774141&r1=1774140&r2=1774141&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectory.java Wed Dec 14 08:48:03 2016
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import com.google.common.collect.ImmutableSet;
@@ -40,6 +41,7 @@ import org.apache.jackrabbit.oak.api.Typ
 import org.apache.jackrabbit.oak.commons.StringUtils;
 import org.apache.jackrabbit.oak.plugins.blob.BlobStoreBlob;
 import org.apache.jackrabbit.oak.spi.blob.BlobOptions;
+import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.util.PerfLogger;
@@ -92,20 +94,25 @@ public class OakDirectory extends Direct
     private final Set<String> fileNamesAtStart;
     private final boolean activeDeleteEnabled;
     private final String indexName;
-    @Nullable
-    private final GarbageCollectableBlobStore blobStore;
+    private final BlobFactory blobFactory;
     private volatile boolean dirty;
 
     public OakDirectory(NodeBuilder builder, IndexDefinition definition, boolean readOnly) {
-        this(builder, INDEX_DATA_CHILD_NAME, definition, readOnly, null);
+        this(builder, INDEX_DATA_CHILD_NAME, definition, readOnly);
     }
 
     public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition, boolean readOnly) {
-        this(builder, dataNodeName, definition, readOnly, null);
+        this(builder, dataNodeName, definition, readOnly, new NodeBuilderBlobFactory(builder));
     }
 
     public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
-        boolean readOnly, @Nullable GarbageCollectableBlobStore blobStore) {
+                        boolean readOnly, @Nullable GarbageCollectableBlobStore blobStore) {
+        this(builder, dataNodeName, definition, readOnly,
+                blobStore != null ? new BlobStoreBlobFactory(blobStore) : new NodeBuilderBlobFactory(builder));
+    }
+
+    public OakDirectory(NodeBuilder builder, String dataNodeName, IndexDefinition definition,
+        boolean readOnly, BlobFactory blobFactory) {
         this.lockFactory = NoLockFactory.getNoLockFactory();
         this.builder = builder;
         this.directoryBuilder = readOnly ? builder.getChildNode(dataNodeName) : builder.child(dataNodeName);
@@ -115,7 +122,7 @@ public class OakDirectory extends Direct
         this.fileNamesAtStart = ImmutableSet.copyOf(this.fileNames);
         this.activeDeleteEnabled = definition.getActiveDeleteEnabled();
         this.indexName = definition.getIndexName();
-        this.blobStore =  blobStore;
+        this.blobFactory = blobFactory;
     }
 
     @Override
@@ -161,7 +168,7 @@ public class OakDirectory extends Direct
     @Override
     public long fileLength(String name) throws IOException {
         NodeBuilder file = directoryBuilder.getChildNode(name);
-        OakIndexInput input = new OakIndexInput(name, file, indexName, blobStore);
+        OakIndexInput input = new OakIndexInput(name, file, indexName, blobFactory);
         try {
             return input.length();
         } finally {
@@ -186,7 +193,7 @@ public class OakDirectory extends Direct
         }
         fileNames.add(name);
         markDirty();
-        return new OakIndexOutput(name, file, indexName, blobStore);
+        return new OakIndexOutput(name, file, indexName, blobFactory);
     }
 
 
@@ -195,7 +202,7 @@ public class OakDirectory extends Direct
             throws IOException {
         NodeBuilder file = directoryBuilder.getChildNode(name);
         if (file.exists()) {
-            return new OakIndexInput(name, file, indexName, blobStore);
+            return new OakIndexInput(name, file, indexName, blobFactory);
         } else {
             String msg = String.format("[%s] %s", indexName, name);
             throw new FileNotFoundException(msg);
@@ -340,17 +347,17 @@ public class OakDirectory extends Direct
 
         private final String dirDetails;
 
-        private final GarbageCollectableBlobStore blobStore;
+        private final BlobFactory blobFactory;
 
         public OakIndexFile(String name, NodeBuilder file, String dirDetails,
-            @Nullable GarbageCollectableBlobStore blobStore) {
+            @Nonnull BlobFactory blobFactory) {
             this.name = name;
             this.file = file;
             this.dirDetails = dirDetails;
             this.blobSize = determineBlobSize(file);
             this.uniqueKey = readUniqueKey(file);
             this.blob = new byte[blobSize];
-            this.blobStore = blobStore;
+            this.blobFactory = checkNotNull(blobFactory);
 
             PropertyState property = file.getProperty(JCR_DATA);
             if (property != null && property.getType() == BINARIES) {
@@ -381,7 +388,7 @@ public class OakDirectory extends Direct
             this.length = that.length;
             this.data = newArrayList(that.data);
             this.dataModified = that.dataModified;
-            this.blobStore = that.blobStore;
+            this.blobFactory = that.blobFactory;
         }
 
         private void loadBlob(int i) throws IOException {
@@ -410,7 +417,7 @@ public class OakDirectory extends Direct
                             new ByteArrayInputStream(uniqueKey));
                 }
 
-                Blob b = writeBlob(in);
+                Blob b = blobFactory.createBlob(in);
                 if (index < data.size()) {
                     data.set(index, b);
                 } else {
@@ -422,25 +429,6 @@ public class OakDirectory extends Direct
             }
         }
 
-        /**
-         * Writes the blob to the blobstore directly if available.
-         *
-         * @param in input stream
-         * @return
-         * @throws IOException
-         */
-        private Blob writeBlob(InputStream in) throws IOException {
-            if (blobStore != null) {
-                if (!ENABLE_AYNC_DS) {
-                    return new BlobStoreBlob(blobStore,
-                        blobStore.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS)));
-                } else {
-                    return new BlobStoreBlob(blobStore, blobStore.writeBlob(in));
-                }
-            }
-            return file.createBlob(in);
-        }
-
         public void seek(long pos) throws IOException {
             // seek() may be called with pos == length
             // see https://issues.apache.org/jira/browse/LUCENE-1196
@@ -558,10 +546,10 @@ public class OakDirectory extends Direct
         private final String dirDetails;
 
         public OakIndexInput(String name, NodeBuilder file, String dirDetails,
-            @Nullable GarbageCollectableBlobStore blobStore) {
+            BlobFactory blobFactory) {
             super(name);
             this.dirDetails = dirDetails;
-            this.file = new OakIndexFile(name, file, dirDetails, blobStore);
+            this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
             clones = WeakIdentityMap.newConcurrentHashMap();
         }
 
@@ -641,9 +629,10 @@ public class OakDirectory extends Direct
         private final String dirDetails;
         private final OakIndexFile file;
 
-        public OakIndexOutput(String name, NodeBuilder file, String dirDetails, GarbageCollectableBlobStore blobStore) throws IOException {
+        public OakIndexOutput(String name, NodeBuilder file, String dirDetails,
+                              BlobFactory blobFactory) throws IOException {
             this.dirDetails = dirDetails;
-            this.file = new OakIndexFile(name, file, dirDetails, blobStore);
+            this.file = new OakIndexFile(name, file, dirDetails, blobFactory);
         }
 
         @Override
@@ -699,4 +688,42 @@ public class OakDirectory extends Direct
 
     }
 
+    public interface BlobFactory {
+
+        Blob createBlob(InputStream in) throws IOException;
+    }
+
+    public static final class NodeBuilderBlobFactory implements BlobFactory {
+
+        private final NodeBuilder builder;
+
+        public NodeBuilderBlobFactory(NodeBuilder builder) {
+            this.builder = builder;
+        }
+
+        @Override
+        public Blob createBlob(InputStream in) throws IOException {
+            return builder.createBlob(in);
+        }
+    }
+
+    public static final class BlobStoreBlobFactory implements BlobFactory {
+
+        private final BlobStore store;
+
+        public BlobStoreBlobFactory(BlobStore store) {
+            this.store = store;
+        }
+
+        @Override
+        public Blob createBlob(InputStream in) throws IOException {
+            String blobId;
+            if (!ENABLE_AYNC_DS) {
+                blobId = store.writeBlob(in, new BlobOptions().setUpload(SYNCHRONOUS));
+            } else {
+                blobId = store.writeBlob(in);
+            }
+            return new BlobStoreBlob(store, blobId);
+        }
+    }
 }

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java?rev=1774141&r1=1774140&r2=1774141&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/OakDirectoryTest.java Wed Dec 14 08:48:03 2016
@@ -27,6 +27,7 @@ import static org.apache.jackrabbit.JcrC
 import static org.apache.jackrabbit.oak.api.Type.BINARIES;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.INDEX_DATA_CHILD_NAME;
 import static org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.PROP_BLOB_SIZE;
+import static org.apache.jackrabbit.oak.plugins.index.lucene.OakDirectory.UNIQUE_KEY_SIZE;
 import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
@@ -36,6 +37,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -44,12 +46,12 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.NullInputStream;
 import org.apache.jackrabbit.oak.api.Blob;
 import org.apache.jackrabbit.oak.api.Type;
-import org.apache.jackrabbit.oak.plugins.index.IndexConstants;
 import org.apache.jackrabbit.oak.plugins.memory.ArrayBasedBlob;
 import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState;
 import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
@@ -188,7 +190,7 @@ public class OakDirectoryTest {
         assertEquals(blobSize, testNode.getProperty(PROP_BLOB_SIZE).getValue(Type.LONG).longValue());
 
         List<Blob> blobs = newArrayList(testNode.getProperty(JCR_DATA).getValue(BINARIES));
-        assertEquals(blobSize + OakDirectory.UNIQUE_KEY_SIZE, blobs.get(0).length());
+        assertEquals(blobSize + UNIQUE_KEY_SIZE, blobs.get(0).length());
 
         return data;
     }
@@ -456,6 +458,29 @@ public class OakDirectoryTest {
         dir.close();
     }
 
+    @Test
+    public void blobFactory() throws Exception {
+        final AtomicInteger numBlobs = new AtomicInteger();
+        final int fileSize = 1024;
+        IndexDefinition def = new IndexDefinition(root, builder.getNodeState(), "/foo");
+        OakDirectory.BlobFactory factory = new OakDirectory.BlobFactory() {
+            @Override
+            public Blob createBlob(InputStream in) throws IOException {
+                ByteArrayOutputStream out = new ByteArrayOutputStream();
+                IOUtils.copy(in, out);
+                byte[] data = out.toByteArray();
+                assertEquals(fileSize + UNIQUE_KEY_SIZE, data.length);
+                numBlobs.incrementAndGet();
+                return new ArrayBasedBlob(data);
+            }
+        };
+        OakDirectory dir = new OakDirectory(builder, INDEX_DATA_CHILD_NAME, def, false, factory);
+        numBlobs.set(0);
+        writeFile(dir, "file", fileSize);
+        assertEquals(1, numBlobs.get());
+        dir.close();
+    }
+
     private static void readInputToEnd(long expectedSize, IndexInput input) throws IOException {
         int COPY_BUFFER_SIZE = 16384;
         byte[] copyBuffer = new byte[(int) ONE_MB];