You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ch...@apache.org on 2015/07/03 14:02:41 UTC
svn commit: r1689003 - in /jackrabbit/oak/trunk/oak-lucene/src:
main/java/org/apache/jackrabbit/oak/plugins/index/lucene/
test/java/org/apache/jackrabbit/oak/plugins/index/lucene/
Author: chetanm
Date: Fri Jul 3 12:02:41 2015
New Revision: 1689003
URL: http://svn.apache.org/r1689003
Log:
OAK-3069 - Provide option to eagerly copy the new index files in CopyOnRead
OAK-3068 - Lucene IndexCopier improved logs around the CopyOnRead feature
IndexCopier now provides a prefetch option that copies new index files locally before the index is opened. The option is controlled via OSGi config. This commit also includes the related parts of OAK-3068 by Alex.
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java Fri Jul 3 12:02:41 2015
@@ -21,12 +21,17 @@ package org.apache.jackrabbit.oak.plugin
import javax.management.openmbean.TabularData;
+import aQute.bnd.annotation.ProviderType;
+
@SuppressWarnings("UnusedDeclaration")
+@ProviderType
public interface CopyOnReadStatsMBean {
String TYPE = "IndexCopierStats";
TabularData getIndexPathMapping();
+ boolean isPrefetchEnabled();
+
int getReaderLocalReadCount();
int getReaderRemoteReadCount();
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java Fri Jul 3 12:02:41 2015
@@ -111,16 +111,23 @@ public class IndexCopier implements Copy
private final Map<String, String> indexPathVersionMapping = newConcurrentMap();
private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = newConcurrentMap();
private final Set<LocalIndexFile> copyInProgressFiles = Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
+ private final boolean prefetchEnabled;
public IndexCopier(Executor executor, File indexRootDir) throws IOException {
+ this(executor, indexRootDir, false);
+ }
+
+ /**
+ * Creates an IndexCopier.
+ *
+ * @param executor executor on which background copy tasks are run
+ * @param indexRootDir local root directory for index copies; the work dir is
+ * derived from it via initializerWorkDir
+ * @param prefetchEnabled if true, every directory returned by wrapForRead copies
+ * all remote index files (except REMOTE_ONLY ones) to the local directory
+ * synchronously, before the wrapped directory is returned. The
+ * two-argument constructor passes false.
+ */
+ public IndexCopier(Executor executor, File indexRootDir, boolean prefetchEnabled) throws IOException {
this.executor = executor;
this.indexRootDir = indexRootDir;
+ this.prefetchEnabled = prefetchEnabled;
this.indexWorkDir = initializerWorkDir(indexRootDir);
}
- public Directory wrapForRead(String indexPath, IndexDefinition definition, Directory remote) throws IOException {
+ public Directory wrapForRead(String indexPath, IndexDefinition definition,
+ Directory remote) throws IOException {
Directory local = createLocalDirForIndexReader(indexPath, definition);
- return new CopyOnReadDirectory(remote, local);
+ return new CopyOnReadDirectory(remote, local, prefetchEnabled, indexPath);
}
public Directory wrapForWrite(IndexDefinition definition, Directory remote, boolean reindexMode) throws IOException {
@@ -241,6 +248,7 @@ public class IndexCopier implements Copy
private class CopyOnReadDirectory extends FilterDirectory {
private final Directory remote;
private final Directory local;
+ private final String indexPath;
private final ConcurrentMap<String, CORFileReference> files = newConcurrentMap();
/**
@@ -249,11 +257,15 @@ public class IndexCopier implements Copy
*/
private final Set<String> localFileNames = Sets.newConcurrentHashSet();
- public CopyOnReadDirectory(Directory remote, Directory local) throws IOException {
+ public CopyOnReadDirectory(Directory remote, Directory local, boolean prefetch, String indexPath) throws IOException {
super(remote);
this.remote = remote;
this.local = local;
+ this.indexPath = indexPath;
this.localFileNames.addAll(Arrays.asList(local.listAll()));
+ if (prefetch) {
+ prefetchIndexFiles();
+ }
}
@Override
@@ -269,15 +281,20 @@ public class IndexCopier implements Copy
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (REMOTE_ONLY.contains(name)) {
+ log.trace("[{}] opening remote only file {}", indexPath, name);
return remote.openInput(name, context);
}
CORFileReference ref = files.get(name);
if (ref != null) {
if (ref.isLocalValid()) {
+ log.trace("[{}] opening existing local file {}", indexPath, name);
return files.get(name).openLocalInput(context);
} else {
readerRemoteReadCount.incrementAndGet();
+ log.trace(
+ "[{}] opening existing remote file as local version is not valid {}",
+ indexPath, name);
return remote.openInput(name, context);
}
}
@@ -285,14 +302,17 @@ public class IndexCopier implements Copy
CORFileReference toPut = new CORFileReference(name);
CORFileReference old = files.putIfAbsent(name, toPut);
if (old == null) {
+ log.trace("[{}] scheduled local copy for {}", indexPath, name);
copy(toPut);
}
//If immediate executor is used the result would be ready right away
if (toPut.isLocalValid()) {
+ log.trace("[{}] opening new local file {}", indexPath, name);
return toPut.openLocalInput(context);
}
+ log.trace("[{}] opening new remote file {}", indexPath, name);
readerRemoteReadCount.incrementAndGet();
return remote.openInput(name, context);
}
@@ -302,55 +322,89 @@ public class IndexCopier implements Copy
executor.execute(new Runnable() {
@Override
public void run() {
- String name = reference.name;
- boolean success = false;
- boolean copyAttempted = false;
+ scheduledForCopyCount.decrementAndGet();
+ copyFilesToLocal(reference, true);
+ }
+ });
+ }
+
+ /**
+ * Eagerly copies every remote index file, except the REMOTE_ONLY ones, to the
+ * local directory. Called from the CopyOnReadDirectory constructor when prefetch
+ * is enabled, so the copy completes before the directory is handed out.
+ *
+ * The summary log counts only files actually copied by this call. Files already
+ * present locally, and copies that failed, are excluded.
+ *
+ * @throws IOException if the remote directory cannot be listed
+ */
+ private void prefetchIndexFiles() throws IOException {
+ long start = PERF_LOGGER.start();
+ long totalSize = 0;
+ int copyCount = 0;
+ for (String name : remote.listAll()) {
+ if (REMOTE_ONLY.contains(name)) {
+ continue;
+ }
+ CORFileReference fileRef = new CORFileReference(name);
+ files.putIfAbsent(name, fileRef);
+ long fileSize = copyFilesToLocal(fileRef, false);
+ //copyFilesToLocal returns the remote size even when the copy itself failed
+ //(the IOException is logged and swallowed), so only count files whose
+ //local copy was actually marked valid
+ if (fileSize > 0 && fileRef.isLocalValid()) {
+ copyCount++;
+ totalSize += fileSize;
+ }
+ }
+ PERF_LOGGER.end(start, -1, "[{}] Copied {} files totaling {}", indexPath, copyCount, humanReadableByteCount(totalSize));
+ }
+
+ private long copyFilesToLocal(CORFileReference reference, boolean logDuration) {
+ String name = reference.name;
+ boolean success = false;
+ boolean copyAttempted = false;
+ long fileSize = 0;
+ try {
+ if (!local.fileExists(name)) {
+ long perfStart = -1;
+ if (logDuration) {
+ perfStart = PERF_LOGGER.start();
+ }
+
+ fileSize = remote.fileLength(name);
+ LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
+ long start = startCopy(file);
+ copyAttempted = true;
+
+ remote.copy(local, name, name, IOContext.READ);
+ reference.markValid();
+
+ doneCopy(file, start);
+ if (logDuration) {
+ PERF_LOGGER.end(perfStart, 0,
+ "[{}] Copied file {} of size {}", indexPath,
+ name, humanReadableByteCount(fileSize));
+ }
+ } else {
+ long localLength = local.fileLength(name);
+ long remoteLength = remote.fileLength(name);
+
+ //Do a simple consistency check. Ideally Lucene index files are never
+ //updated but still do a check if the copy is consistent
+ if (localLength != remoteLength) {
+ log.warn("[{}] Found local copy for {} in {} but size of local {} differs from remote {}. " +
+ "Content would be read from remote file only",
+ indexPath, name, local, localLength, remoteLength);
+ invalidFileCount.incrementAndGet();
+ } else {
+ reference.markValid();
+ }
+ }
+ success = true;
+ } catch (IOException e) {
+ //TODO In case of exception there would not be any other attempt
+ //to download the file. Look into support for retry
+ log.warn("[{}] Error occurred while copying file [{}] from {} to {}", indexPath, name, remote, local, e);
+ } finally {
+ if (copyAttempted && !success){
try {
- scheduledForCopyCount.decrementAndGet();
- if (!local.fileExists(name)) {
- long fileSize = remote.fileLength(name);
- LocalIndexFile file = new LocalIndexFile(local, name, fileSize, true);
- long start = startCopy(file);
- copyAttempted = true;
-
- remote.copy(local, name, name, IOContext.READ);
- reference.markValid();
-
- doneCopy(file, start);
- } else {
- long localLength = local.fileLength(name);
- long remoteLength = remote.fileLength(name);
-
- //Do a simple consistency check. Ideally Lucene index files are never
- //updated but still do a check if the copy is consistent
- if (localLength != remoteLength) {
- log.warn("Found local copy for {} in {} but size of local {} differs from remote {}. " +
- "Content would be read from remote file only",
- name, local, localLength, remoteLength);
- invalidFileCount.incrementAndGet();
- } else {
- reference.markValid();
- }
+ if (local.fileExists(name)) {
+ local.deleteFile(name);
}
- success = true;
} catch (IOException e) {
- //TODO In case of exception there would not be any other attempt
- //to download the file. Look into support for retry
- log.warn("Error occurred while copying file [{}] " +
- "from {} to {}", name, remote, local, e);
- } finally {
- if (copyAttempted && !success){
- try {
- if (local.fileExists(name)) {
- local.deleteFile(name);
- }
- } catch (IOException e) {
- log.warn("Error occurred while deleting corrupted file [{}] from [{}]", name, local, e);
- }
- }
+ log.warn("[{}] Error occurred while deleting corrupted file [{}] from [{}]", indexPath, name, local, e);
}
}
- });
+ }
+ return fileSize;
}
/**
@@ -381,8 +435,9 @@ public class IndexCopier implements Copy
try{
removeDeletedFiles();
} catch (IOException e) {
- log.warn("Error occurred while removing deleted files from Local {}, " +
- "Remote {}", local, remote, e);
+ log.warn(
+ "[{}] Error occurred while removing deleted files from Local {}, Remote {}",
+ indexPath, local, remote, e);
}
try {
@@ -392,7 +447,9 @@ public class IndexCopier implements Copy
local.close();
remote.close();
} catch (IOException e) {
- log.warn("Error occurred while closing directory ", e);
+ log.warn(
+ "[{}] Error occurred while closing directory ",
+ indexPath, e);
}
}
});
@@ -1068,6 +1125,11 @@ public class IndexCopier implements Copy
}
+ /**
+ * Reports whether index files are prefetched. Returns the flag supplied to
+ * the IndexCopier constructor; false when the two-argument constructor was used.
+ */
@Override
+ public boolean isPrefetchEnabled() {
+ return prefetchEnabled;
+ }
+
+ @Override
public int getReaderLocalReadCount() {
return readerLocalReadCount.get();
}
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderService.java Fri Jul 3 12:02:41 2015
@@ -128,6 +128,15 @@ public class LuceneIndexProviderService
)
private static final String PROP_THREAD_POOL_SIZE = "threadPoolSize";
+ private static final boolean PROP_PREFETCH_INDEX_FILES_DEFAULT = false;
+ @Property(
+ boolValue = PROP_PREFETCH_INDEX_FILES_DEFAULT,
+ label = "Prefetch Index Files",
+ description = "Prefetch the index files when CopyOnRead is enabled. When enabled all new Lucene" +
+ " index files would be copied locally before the index is made available to QueryEngine"
+ )
+ private static final String PROP_PREFETCH_INDEX_FILES = "prefetchIndexFiles";
+
private Whiteboard whiteboard;
private BackgroundObserver backgroundObserver;
@@ -192,6 +201,10 @@ public class LuceneIndexProviderService
InfoStream.setDefault(InfoStream.NO_OUTPUT);
}
+ /**
+ * Returns the IndexCopier created during activation when CopyOnRead is enabled.
+ * Package-private; used by LuceneIndexProviderServiceTest to inspect its
+ * configuration. NOTE(review): presumably null when CopyOnRead is disabled or
+ * the service is not yet activated; confirm before relying on it elsewhere.
+ */
+ IndexCopier getIndexCopier() {
+ return indexCopier;
+ }
+
private void initialize(){
if(indexProvider == null){
return;
@@ -242,6 +255,8 @@ public class LuceneIndexProviderService
return;
}
String indexDirPath = PropertiesUtil.toString(config.get(PROP_LOCAL_INDEX_DIR), null);
+ boolean prefetchEnabled = PropertiesUtil.toBoolean(config.get(PROP_PREFETCH_INDEX_FILES),
+ PROP_PREFETCH_INDEX_FILES_DEFAULT);
if (Strings.isNullOrEmpty(indexDirPath)) {
String repoHome = bundleContext.getProperty(REPOSITORY_HOME);
if (repoHome != null){
@@ -252,8 +267,12 @@ public class LuceneIndexProviderService
checkNotNull(indexDirPath, "Index directory cannot be determined as neither index " +
"directory path [%s] nor repository home [%s] defined", PROP_LOCAL_INDEX_DIR, REPOSITORY_HOME);
+ if (prefetchEnabled){
+ log.info("Prefetching of index files enabled. Index would be opened after copying all new files locally");
+ }
+
indexDir = new File(indexDirPath);
- indexCopier = new IndexCopier(getExecutorService(), indexDir);
+ indexCopier = new IndexCopier(getExecutorService(), indexDir, prefetchEnabled);
oakRegs.add(registerMBean(whiteboard,
CopyOnReadStatsMBean.class,
Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/package-info.java Fri Jul 3 12:02:41 2015
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-@Version("2.0.0")
+@Version("2.1.0")
@Export(optional = "provide:=true")
package org.apache.jackrabbit.oak.plugins.index.lucene;
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java Fri Jul 3 12:02:41 2015
@@ -111,6 +111,33 @@ public class IndexCopierTest {
}
@Test
+ public void basicTestWithPrefetch() throws Exception{
+ Directory baseDir = new RAMDirectory();
+ IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+ IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir(), true);
+
+ Directory remote = new RAMDirectory();
+
+ byte[] t1 = writeFile(remote, "t1");
+ byte[] t2 = writeFile(remote, "t2");
+
+ //With prefetch enabled wrapForRead itself copies the remote files to the
+ //local directory, so they must be present locally before any openInput call
+ Directory wrapped = c1.wrapForRead("/foo", defn, remote);
+ assertEquals(2, wrapped.listAll().length);
+
+ assertTrue(wrapped.fileExists("t1"));
+ assertTrue(wrapped.fileExists("t2"));
+
+ assertTrue(baseDir.fileExists("t1"));
+ assertTrue(baseDir.fileExists("t2"));
+
+ assertEquals(t1.length, wrapped.fileLength("t1"));
+ assertEquals(t2.length, wrapped.fileLength("t2"));
+
+ //Verify the content of every prefetched file, not just the first one
+ readAndAssert(wrapped, "t1", t1);
+ readAndAssert(wrapped, "t2", t2);
+ }
+
+ @Test
public void basicTestWithFS() throws Exception{
IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
@@ -849,11 +876,16 @@ public class IndexCopierTest {
private class RAMIndexCopier extends IndexCopier {
final Directory baseDir;
- public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) throws IOException {
- super(executor, indexRootDir);
+ public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir,
+ boolean prefetchEnabled) throws IOException {
+ super(executor, indexRootDir, prefetchEnabled);
this.baseDir = baseDir;
}
+ public RAMIndexCopier(Directory baseDir, Executor executor, File indexRootDir) throws IOException {
+ this(baseDir, executor, indexRootDir, false);
+ }
+
@Override
protected Directory createLocalDirForIndexReader(String indexPath, IndexDefinition definition) throws IOException {
return baseDir;
Modified: jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java?rev=1689003&r1=1689002&r2=1689003&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java (original)
+++ jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/LuceneIndexProviderServiceTest.java Fri Jul 3 12:02:41 2015
@@ -34,11 +34,17 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class LuceneIndexProviderServiceTest {
+ /*
+ The test cases use the raw config names instead of accessing them via
+ the constants in LuceneIndexProviderService, so that any change in
+ the names is detected
+ */
@Rule
public final TemporaryFolder folder = new TemporaryFolder();
@@ -60,6 +66,10 @@ public class LuceneIndexProviderServiceT
(LuceneIndexEditorProvider) context.getService(IndexEditorProvider.class);
assertNull(editorProvider.getIndexCopier());
+ IndexCopier indexCopier = service.getIndexCopier();
+ assertNotNull("IndexCopier should be initialized as CopyOnRead is enabled by default", indexCopier);
+ assertFalse(indexCopier.isPrefetchEnabled());
+
assertNotNull("CopyOnRead should be enabled by default", context.getService(CopyOnReadStatsMBean.class));
assertTrue(context.getService(Observer.class) instanceof BackgroundObserver);
@@ -93,6 +103,18 @@ public class LuceneIndexProviderServiceT
MockOsgi.deactivate(service);
}
+
+ @Test
+ public void enablePrefetchIndexFiles() throws Exception{
+ Map<String,Object> config = getDefaultConfig();
+ config.put("prefetchIndexFiles", true);
+ MockOsgi.activate(service, context.bundleContext(), config);
+
+ //Fail with a clear message rather than an NPE if the copier was not created
+ IndexCopier indexCopier = service.getIndexCopier();
+ assertNotNull("IndexCopier should be initialized as CopyOnRead is enabled by default", indexCopier);
+ assertTrue(indexCopier.isPrefetchEnabled());
+
+ MockOsgi.deactivate(service);
+ }
@Test
public void debugLogging() throws Exception{