You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/03 14:02:41 UTC

svn commit: r1689003 - in /jackrabbit/oak/trunk/oak-lucene/src: main/java/org/apache/jackrabbit/oak/plugins/index/lucene/ test/java/org/apache/jackrabbit/oak/plugins/index/lucene/

Author: chetanm
Date: Fri Jul  3 12:02:41 2015
New Revision: 1689003

URL: http://svn.apache.org/r1689003
Log:
OAK-3069 - Provide option to eagerly copy the new index files in CopyOnRead
OAK-3068 - Lucene IndexCopier improved logs around the CopyOnRead feature

IndexCopier now provides a prefetch option that copies new index files locally before the index is opened. The option is controlled via the OSGi configuration property prefetchIndexFiles. This commit also includes the affected parts of OAK-3068 by Alex.

Modified:
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
    jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
    jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java Fri Jul  3 12:02:41 2015
@@ -21,12 +21,17 @@ package org.apache.jackrabbit.oak.plugin
 
 import javax.management.openmbean.TabularData;
 
+import aQute.bnd.annotation.ProviderType;
+
 @SuppressWarnings("UnusedDeclaration")
+@ProviderType
 public interface CopyOnReadStatsMBean {
     String TYPE = "IndexCopierStats";
 
     TabularData getIndexPathMapping();
 
+    boolean isPrefetchEnabled();
+
     int getReaderLocalReadCount();
 
     int getReaderRemoteReadCount();

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Fri Jul  3 12:02:41 2015
@@ -111,16 +111,23 @@ public class IndexCopier implements Copy
     private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
     private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
     private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
+    private final boolean prefetchEnabled;
 
     public IndexCopier(Executor executor, File indexRootDir) throws IOException {
+        this(executor, indexRootDir, false);
+    }
+
+    public IndexCopier(Executor executor, File indexRootDir, boolean prefetchEnabled) throws IOException {
         this.executor = executor;
         this.indexRootDir = indexRootDir;
+        this.prefetchEnabled = prefetchEnabled;
         this.indexWorkDir = initializerWorkDir(indexRootDir);
     }
 
-    public Directory wrapForRead(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
+    public Directory wrapForRead(String indexPath, IndexDefinition definition,
+            Directory remote) throws IOException {
         Directory local = createLocalDirForIndexReader(indexPath, definition);
-        return new CopyOnReadDirectory(remote, local);
+        return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath);
     }
 
     public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
@@ -241,6 +248,7 @@ public class IndexCopier implements Copy
     private class CopyOnReadDirectory extends FilterDirectory {
         private final Directory remote;
         private final Directory local;
+        private final String indexPath;
 
         private final ConcurrentMap<String, CORFileReference> files = newConcurrentMap();
         /**
@@ -249,11 +257,15 @@ public class IndexCopier implements Copy
          */
         private final Set<String> localFileNames = Sets.newConcurrentHashSet();
 
-        public CopyOnReadDirectory(Directory remote, Directory local) throws IOException {
+        public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch, String indexPath) throws IOException {
             super(remote);
             this.remote = remote;
             this.local = local;
+            this.indexPath = indexPath;
             this.localFileNames.addAll(Arrays.asList(local.listAll()));
+            if (prefetch) {
+                prefetchIndexFiles();
+            }
         }
 
         @Override
@@ -269,15 +281,20 @@ public class IndexCopier implements Copy
         @Override
         public IndexInput openInput(String name, IOContext context) throws IOException {
             if (REMOTE_ONLY.contains(name)) {
+                log.trace("[{}] opening remote only file {}", indexPath, name);
                 return remote.openInput(name, context);
             }
 
             CORFileReference ref = files.get(name);
             if (ref != null) {
                 if (ref.isLocalValid()) {
+                    log.trace("[{}] opening existing local file {}", indexPath, name);
                     return files.get(name).openLocalInput(context);
                 } else {
                     readerRemoteReadCount.incrementAndGet();
+                    log.trace(
+                            "[{}] opening existing remote file as local version is not valid {}",
+                            indexPath, name);
                     return remote.openInput(name, context);
                 }
             }
@@ -285,14 +302,17 @@ public class IndexCopier implements Copy
             CORFileReference toPut = new CORFileReference(name);
             CORFileReference old = files.putIfAbsent(name, toPut);
             if (old == null) {
+                log.trace("[{}] scheduled local copy for {}", indexPath, name);
                 copy(toPut);
             }
 
             //If immediate executor is used the result would be ready right away
             if (toPut.isLocalValid()) {
+                log.trace("[{}] opening new local file {}", indexPath, name);
                 return toPut.openLocalInput(context);
             }
 
+            log.trace("[{}] opening new remote file {}", indexPath, name);
             readerRemoteReadCount.incrementAndGet();
             return remote.openInput(name, context);
         }
@@ -302,55 +322,89 @@ public class IndexCopier implements Copy
             executor.execute(new Runnable() {
                 @Override
                 public void run() {
-                    String name = reference.name;
-                    boolean success = false;
-                    boolean copyAttempted = false;
+                    scheduledForCopyCount.decrementAndGet();
+                    copyFilesToLocal(reference, true);
+                }
+            });
+        }
+
+        private void prefetchIndexFiles() throws IOException { // eagerly copies all non remote-only files; invoked from the constructor when prefetch is on
+            long start = PERF_LOGGER.start();
+            long totalSize = 0;
+            int copyCount = 0;
+            for (String name : remote.listAll()) {
+                if (REMOTE_ONLY.contains(name)) {
+                    continue;
+                }
+                CORFileReference fileRef = new CORFileReference(name);
+                CORFileReference registered = files.putIfAbsent(name, fileRef); // non-null if a reference for this name already exists
+                long fileSize = copyFilesToLocal(registered != null ? registered : fileRef, false); // always mark the reference that openInput will see
+                if (fileSize > 0) {
+                    copyCount++;
+                    totalSize += fileSize;
+                }
+            }
+            PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}", indexPath, copyCount, humanReadableByteCount(totalSize));
+        }
+
+        private long copyFilesToLocal(CORFileReference reference, boolean logDuration) { // returns bytes newly copied; 0 if the file was already local or the copy failed
+            String name = reference.name;
+            boolean success = false;
+            boolean copyAttempted = false;
+            long fileSize = 0;
+            try {
+                if (!local.fileExists(name)) {
+                    long perfStart = -1;
+                    if (logDuration) {
+                        perfStart = PERF_LOGGER.start();
+                    }
+
+                    fileSize = remote.fileLength(name);
+                    LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
+                    long start = startCopy(file);
+                    copyAttempted = true;
+
+                    remote.copy(local, name, name, IOContext.READ);
+                    reference.markValid();
+
+                    doneCopy(file, start);
+                    if (logDuration) {
+                        PERF_LOGGER.end(perfStart, 0,
+                                "[{}] Copied file {} of size {}", indexPath,
+                                name, humanReadableByteCount(fileSize));
+                    }
+                } else {
+                    long localLength = local.fileLength(name);
+                    long remoteLength = remote.fileLength(name);
+
+                    //Do a simple consistency check. Ideally Lucene index files are never
+                    //updated but still do a check if the copy is consistent
+                    if (localLength != remoteLength) {
+                        log.warn("[{}] Found local copy for {} in {} but size of local {} differs from remote {}. " +
+                                        "Content would be read from remote file only",
+                                indexPath, name, local, localLength, remoteLength);
+                        invalidFileCount.incrementAndGet();
+                    } else {
+                        reference.markValid();
+                    }
+                }
+                success = true;
+            } catch (IOException e) {
+                //TODO In case of exception there would not be any other attempt
+                //to download the file. Look into support for retry
+                log.warn("[{}] Error occurred while copying file [{}] from {} to {}", indexPath, name, remote, local, e);
+            } finally {
+                if (copyAttempted && !success){
                     try {
-                        scheduledForCopyCount.decrementAndGet();
-                        if (!local.fileExists(name)) {
-                            long fileSize = remote.fileLength(name);
-                            LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
-                            long start = startCopy(file);
-                            copyAttempted = true;
-
-                            remote.copy(local, name, name, IOContext.READ);
-                            reference.markValid();
-
-                            doneCopy(file, start);
-                        } else {
-                            long localLength = local.fileLength(name);
-                            long remoteLength = remote.fileLength(name);
-
-                            //Do a simple consistency check. Ideally Lucene index files are never
-                            //updated but still do a check if the copy is consistent
-                            if (localLength != remoteLength) {
-                                log.warn("Found local copy for {} in {} but size of local {} differs from remote {}. " +
-                                                "Content would be read from remote file only",
-                                        name, local, localLength, remoteLength);
-                                invalidFileCount.incrementAndGet();
-                            } else {
-                                reference.markValid();
-                            }
+                        if (local.fileExists(name)) {
+                            local.deleteFile(name);
                         }
-                        success = true;
                     } catch (IOException e) {
-                        //TODO In case of exception there would not be any other attempt
-                        //to download the file. Look into support for retry
-                        log.warn("Error occurred while copying file [{}] " +
-                                "from {} to {}", name, remote, local, e);
-                    } finally {
-                        if (copyAttempted && !success){
-                            try {
-                                if (local.fileExists(name)) {
-                                    local.deleteFile(name);
-                                }
-                            } catch (IOException e) {
-                                log.warn("Error occurred while deleting corrupted file [{}] from [{}]", name, local, e);
-                            }
-                        }
+                        log.warn("[{}] Error occurred while deleting corrupted file [{}] from [{}]", indexPath, name, local, e);
                     }
                 }
-            });
+            }
+            return success ? fileSize : 0; // fileSize is set before remote.copy, so a failed copy must not be reported as copied bytes
         }
 
         /**
@@ -381,8 +435,9 @@ public class IndexCopier implements Copy
                     try{
                         removeDeletedFiles();
                     } catch (IOException e) {
-                        log.warn("Error occurred while removing deleted files from Local {}, " +
-                                "Remote {}", local, remote, e);
+                        log.warn(
+                                "[{}] Error occurred while removing deleted files from Local {}, Remote {}",
+                                indexPath, local, remote, e);
                     }
 
                     try {
@@ -392,7 +447,9 @@ public class IndexCopier implements Copy
                         local.close();
                         remote.close();
                     } catch (IOException e) {
-                        log.warn("Error occurred while closing directory ", e);
+                        log.warn(
+                                "[{}] Error occurred while closing directory ",
+                                indexPath, e);
                     }
                 }
             });
@@ -1068,6 +1125,11 @@ public class IndexCopier implements Copy
     }
 
     @Override
+    public boolean isPrefetchEnabled() { // reports the final flag passed to the constructor; it cannot change afterwards
+        return prefetchEnabled;
+    }
+
+    @Override
     public int getReaderLocalReadCount() {
         return readerLocalReadCount.get();
     }

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Fri Jul  3 12:02:41 2015
@@ -128,6 +128,15 @@ public class LuceneIndexProviderService
     )
     private static final String PROP_THREAD_POOL_SIZE = "threadPoolSize";
 
+    private static final boolean PROP_PREFETCH_INDEX_FILES_DEFAULT = false;
+    @Property(
+            boolValue = PROP_PREFETCH_INDEX_FILES_DEFAULT,
+            label = "Prefetch Index Files",
+            description = "Prefetch the index files when CopyOnRead is enabled. When enabled all new Lucene" +
+                    " index files would be copied locally before the index is made available to QueryEngine"
+    )
+    private static final String PROP_PREFETCH_INDEX_FILES = "prefetchIndexFiles";
+
     private Whiteboard whiteboard;
 
     private BackgroundObserver backgroundObserver;
@@ -192,6 +201,10 @@ public class LuceneIndexProviderService
         InfoStream.setDefault(InfoStream.NO_OUTPUT);
     }
 
+    IndexCopier getIndexCopier() { // package-private accessor so LuceneIndexProviderServiceTest can inspect the configured copier
+        return indexCopier;
+    }
+
     private void initialize(){
         if(indexProvider == null){
             return;
@@ -242,6 +255,8 @@ public class LuceneIndexProviderService
             return;
         }
         String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
+        boolean prefetchEnabled = PropertiesUtil.toBoolean(config.get(PROP_PREFETCH_INDEX_FILES),
+                PROP_PREFETCH_INDEX_FILES_DEFAULT);
         if (Strings.isNullOrEmpty(indexDirPath)) {
             String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
             if (repoHome != null){
@@ -252,8 +267,12 @@ public class LuceneIndexProviderService
         checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
                 "directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
 
+        if (prefetchEnabled){
+            log.info("Prefetching of index files enabled. Index would be opened after copying all new files locally");
+        }
+
         indexDir = new File(indexDirPath);
-        indexCopier = new IndexCopier(getExecutorService(), indexDir);
+        indexCopier = new IndexCopier(getExecutorService(), indexDir, prefetchEnabled);
 
         oakRegs.add(registerMBean(whiteboard,
                 CopyOnReadStatsMBean.class,

Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java Fri Jul  3 12:02:41 2015
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-@Version("2.0.0")
+@Version("2.1.0")
 @Export(optional = "provide:=true")
 package org.apache.jackrabbit.oak.plugins.index.lucene;
 

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Fri Jul  3 12:02:41 2015
@@ -111,6 +111,33 @@ public class IndexCopierTest {
     }
 
     @Test
+    public void basicTestWithPrefetch() throws Exception{
+        Directory baseDir = new RAMDirectory();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir(), true);
+
+        Directory remote = new RAMDirectory();
+
+        byte[] t1 = writeFile(remote, "t1");
+        byte[] t2 = writeFile(remote, "t2");
+
+        Directory wrapped = c1.wrapForRead("/foo", defn, remote);
+        assertEquals(2, wrapped.listAll().length);
+
+        assertTrue(wrapped.fileExists("t1"));
+        assertTrue(wrapped.fileExists("t2"));
+
+        assertTrue(baseDir.fileExists("t1"));
+        assertTrue(baseDir.fileExists("t2"));
+
+        assertEquals(t1.length, wrapped.fileLength("t1"));
+        assertEquals(t2.length, wrapped.fileLength("t2"));
+
+        readAndAssert(wrapped, "t1", t1);
+        readAndAssert(wrapped, "t2", t2);
+    }
+
+    @Test
     public void basicTestWithFS() throws Exception{
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
         IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
@@ -849,11 +876,16 @@ public class IndexCopierTest {
     private class RAMIndexCopier extends IndexCopier {
         final Directory baseDir;
 
-        public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) throws IOException {
-            super(executor, indexRootDir);
+        public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir,
+                              boolean prefetchEnabled) throws IOException {
+            super(executor, indexRootDir, prefetchEnabled);
             this.baseDir = baseDir;
         }
 
+        public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) throws IOException {
+            this(baseDir, executor, indexRootDir, false);
+        }
+
         @Override
         protected Directory createLocalDirForIndexReader(String indexPath, IndexDefinition definition) throws IOException {
             return baseDir;

Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java Fri Jul  3 12:02:41 2015
@@ -34,11 +34,17 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class LuceneIndexProviderServiceTest {
+    /*
+        The test case uses raw config names rather than accessing them via
+         the constants in LuceneIndexProviderService, so that any change
+         to the names is detected
+     */
 
     @Rule
     public final TemporaryFolder folder = new TemporaryFolder();
@@ -60,6 +66,10 @@ public class LuceneIndexProviderServiceT
                 (LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
         assertNull(editorProvider.getIndexCopier());
 
+        IndexCopier indexCopier = service.getIndexCopier();
+        assertNotNull("IndexCopier should be initialized as CopyOnRead is enabled by default", indexCopier);
+        assertFalse(indexCopier.isPrefetchEnabled());
+
         assertNotNull("CopyOnRead should be enabled by default", context.getService(CopyOnReadStatsMBean.class));
 
         assertTrue(context.getService(Observer.class) instanceof BackgroundObserver);
@@ -93,6 +103,18 @@ public class LuceneIndexProviderServiceT
 
         MockOsgi.deactivate(service);
     }
+
+    @Test
+    public void enablePrefetchIndexFiles() throws Exception{
+        Map<String,Object> config = getDefaultConfig();
+        config.put("prefetchIndexFiles", true);
+        MockOsgi.activate(service, context.bundleContext(), config);
+
+        IndexCopier indexCopier = service.getIndexCopier();
+        assertNotNull("IndexCopier should be initialized when prefetch is configured", indexCopier);
+        assertTrue(indexCopier.isPrefetchEnabled());
+        MockOsgi.deactivate(service);
+    }
 
     @Test
     public void debugLogging() throws Exception{