You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/12/14 17:01:24 UTC
svn commit: r1774293 - in
/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment:
SegmentNodeStoreFactory.java SegmentNodeStoreService.java
Author: frm
Date: Wed Dec 14 17:01:24 2016
New Revision: 1774293
URL: http://svn.apache.org/viewvc?rev=1774293&view=rev
Log:
OAK-5303 - Improve readability of SegmentNodeStoreService
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java?rev=1774293&r1=1774292&r2=1774293&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreFactory.java Wed Dec 14 17:01:24 2016
@@ -16,10 +16,10 @@
*/
package org.apache.jackrabbit.oak.segment;
-import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.asCloseable;
-import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.property;
+import static org.apache.jackrabbit.oak.osgi.OsgiUtil.lookupConfigurationThenFramework;
import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
+import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -39,6 +39,7 @@ import org.apache.jackrabbit.oak.osgi.Os
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
@@ -137,4 +138,20 @@ public class SegmentNodeStoreFactory {
}
}
+ private static Closeable asCloseable(final Registration r) {
+ return new Closeable() {
+
+ @Override
+ public void close() {
+ r.unregister();
+ }
+
+ };
+ }
+
+ static String property(String name, ComponentContext context) {
+ return lookupConfigurationThenFramework(context, name);
+ }
+
+
}
Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java?rev=1774293&r1=1774292&r2=1774293&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreService.java Wed Dec 14 17:01:24 2016
@@ -16,9 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.jackrabbit.oak.segment;
-import static java.util.Collections.emptyMap;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
@@ -28,6 +30,30 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_STRING_CACHE_MB;
import static org.apache.jackrabbit.oak.segment.CachingSegmentReader.DEFAULT_TEMPLATE_CACHE_MB;
import static org.apache.jackrabbit.oak.segment.SegmentCache.DEFAULT_SEGMENT_CACHE_MB;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.BACKUP_DIRECTORY;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.COMPACTION_DISABLE_ESTIMATION;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.COMPACTION_FORCE_TIMEOUT;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.COMPACTION_RETRY_COUNT;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.COMPACTION_SIZE_DELTA_ESTIMATION;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.CUSTOM_BLOB_STORE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.DEFAULT_BLOB_GC_MAX_AGE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.DEFAULT_BLOB_SNAPSHOT_INTERVAL;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.DIRECTORY;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.GC_PROGRESS_LOG;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.MEMORY_THRESHOLD;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.MODE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.NODE_DEDUPLICATION_CACHE_SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.PAUSE_COMPACTION;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.PROP_BLOB_GC_MAX_AGE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.PROP_BLOB_SNAPSHOT_INTERVAL;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.RETAINED_GENERATIONS;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.SEGMENT_CACHE_SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.STANDBY;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.STRING_CACHE_SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.STRING_DEDUPLICATION_CACHE_SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.TEMPLATE_CACHE_SIZE;
+import static org.apache.jackrabbit.oak.segment.SegmentNodeStoreService.TEMPLATE_DEDUPLICATION_CACHE_SIZE;
import static org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener.IGNORE_SNFE;
import static org.apache.jackrabbit.oak.segment.WriterCacheManager.DEFAULT_NODE_CACHE_SIZE_OSGi;
import static org.apache.jackrabbit.oak.segment.WriterCacheManager.DEFAULT_STRING_CACHE_SIZE_OSGi;
@@ -43,23 +69,19 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.DEFAULT_MAX_FILE_SIZE;
import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder;
import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
-import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import com.google.common.base.Strings;
import com.google.common.base.Supplier;
-import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -77,7 +99,6 @@ import org.apache.jackrabbit.oak.api.jmx
import org.apache.jackrabbit.oak.api.jmx.FileStoreBackupRestoreMBean;
import org.apache.jackrabbit.oak.backup.impl.FileStoreBackupRestoreImpl;
import org.apache.jackrabbit.oak.cache.CacheStats;
-import org.apache.jackrabbit.oak.commons.IOUtils;
import org.apache.jackrabbit.oak.osgi.ObserverTracker;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
@@ -105,9 +126,10 @@ import org.apache.jackrabbit.oak.spi.sta
import org.apache.jackrabbit.oak.spi.state.RevisionGC;
import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
import org.apache.jackrabbit.oak.spi.whiteboard.AbstractServiceTracker;
-import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
+import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
+import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.util.GenericDescriptors;
@@ -134,14 +156,14 @@ public class SegmentNodeStoreService {
@Property(
label = "Directory",
- description="Directory location used to store the segment tar files. If not specified then looks " +
+ description = "Directory location used to store the segment tar files. If not specified then looks " +
"for framework property 'repository.home' otherwise use a subdirectory with name 'tarmk'"
)
public static final String DIRECTORY = "repository.home";
@Property(
label = "Mode",
- description="TarMK mode (64 for memory mapping, 32 for normal file access)"
+ description = "TarMK mode (64 for memory mapping, 32 for normal file access)"
)
public static final String MODE = "tarmk.mode";
@@ -273,7 +295,7 @@ public class SegmentNodeStoreService {
@Property(
label = "Backup Directory",
- description="Directory location for storing repository backups. If not set, defaults to" +
+ description = "Directory location for storing repository backups. If not set, defaults to" +
" 'segmentstore-backup' subdirectory under 'repository.home'."
)
public static final String BACKUP_DIRECTORY = "repository.backup.dir";
@@ -289,14 +311,14 @@ public class SegmentNodeStoreService {
@Reference
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
- private Closer registrations = Closer.create();
+ private Closer closer;
/**
* Blob modified before this time duration would be considered for Blob GC
*/
- private static final long DEFAULT_BLOB_GC_MAX_AGE = 24 * 60 * 60;
+ static final long DEFAULT_BLOB_GC_MAX_AGE = 24 * 60 * 60;
- @Property (longValue = DEFAULT_BLOB_GC_MAX_AGE,
+ @Property(longValue = DEFAULT_BLOB_GC_MAX_AGE,
label = "Blob GC Max Age (in secs)",
description = "Blob Garbage Collector (GC) logic will only consider those blobs for GC which " +
"are not accessed recently (currentTime - lastModifiedTime > blobGcMaxAgeInSecs). For " +
@@ -308,9 +330,9 @@ public class SegmentNodeStoreService {
/**
* Default interval for taking snapshots of locally tracked blob ids.
*/
- private static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
+ static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
- @Property (longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
+ @Property(longValue = DEFAULT_BLOB_SNAPSHOT_INTERVAL,
label = "Blob tracking snapshot interval (in secs)",
description = "This is the default interval in which the snapshots of locally tracked blob ids will"
+ "be taken and synchronized with the blob store. This should be configured to be less than the "
@@ -321,13 +343,14 @@ public class SegmentNodeStoreService {
@Activate
public void activate(ComponentContext context) throws IOException {
- if (blobStore == null && hasCustomBlobStore(context)) {
+ Configuration configuration = new Configuration(context);
+ if (blobStore == null && configuration.hasCustomBlobStore()) {
log.info("BlobStore use enabled. SegmentNodeStore would be initialized when BlobStore would be available");
return;
}
- registrations = Closer.create();
+ closer = Closer.create();
OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
- registerSegmentStore(context, blobStore, statisticsProvider, registrations, whiteboard, null, true);
+ registerSegmentStore(context, blobStore, statisticsProvider, closer, whiteboard, null, true);
}
/**
@@ -340,11 +363,11 @@ public class SegmentNodeStoreService {
* @param blobStore An instance of {@link BlobStore}. It can be
* {@code null}.
* @param statisticsProvider An instance of {@link StatisticsProvider}.
- * @param registrations An instance of {@link Closer}. It will be used
+ * @param closer An instance of {@link Closer}. It will be used
* to track every registered service or
* component.
- * @param whiteboard An instance of {@link OsgiWhiteboard}. It will
- * be used to register services in the OSGi
+ * @param whiteboard An instance of {@link Whiteboard}. It will be
+ * used to register services in the OSGi
* framework.
* @param role The role of this component. It can be {@code
* null}.
@@ -358,38 +381,52 @@ public class SegmentNodeStoreService {
@Nonnull ComponentContext context,
@Nullable BlobStore blobStore,
@Nonnull StatisticsProvider statisticsProvider,
- @Nonnull Closer registrations,
- @Nonnull OsgiWhiteboard whiteboard,
+ @Nonnull Closer closer,
+ @Nonnull Whiteboard whiteboard,
@Nullable String role,
boolean descriptors
) throws IOException {
+ Configuration configuration = new Configuration(context, role);
+ Closeables closeables = new Closeables(closer);
+ Registrations registrations = new Registrations(whiteboard, role);
+
// Listen for GCMonitor services
GCMonitorTracker gcMonitor = new GCMonitorTracker();
gcMonitor.start(whiteboard);
+ closeables.add(gcMonitor);
// Create the gc options
- SegmentGCOptions gcOptions = newGCOptions(context);
+ if (configuration.getCompactionGainThreshold() != null) {
+ log.warn("Deprecated property compaction.gainThreshold was detected. In order to configure compaction please use the new property "
+ + "compaction.sizeDeltaEstimation. For turning off estimation, the new property compaction.disableEstimation should be used.");
+ }
+ SegmentGCOptions gcOptions = new SegmentGCOptions(configuration.getPauseCompaction(), configuration.getRetryCount(), configuration.getForceCompactionTimeout())
+ .setRetainedGenerations(configuration.getRetainedGenerations())
+ .setGcSizeDeltaEstimation(configuration.getSizeDeltaEstimation())
+ .setMemoryThreshold(configuration.getMemoryThreshold())
+ .setEstimationDisabled(configuration.getDisableEstimation())
+ .withGCNodeWriteMonitor(configuration.getGCProcessLog());
// Build the FileStore
- FileStoreBuilder builder = fileStoreBuilder(getDirectory(context, role))
- .withSegmentCacheSize(getSegmentCacheSize(context))
- .withStringCacheSize(getStringCacheSize(context))
- .withTemplateCacheSize(getTemplateCacheSize(context))
- .withStringDeduplicationCacheSize(getStringDeduplicationCacheSize(context))
- .withTemplateDeduplicationCacheSize(getTemplateDeduplicationCacheSize(context))
- .withNodeDeduplicationCacheSize(getNodeDeduplicationCacheSize(context))
- .withMaxFileSize(getMaxFileSize(context))
- .withMemoryMapping(getMode(context).equals("64"))
+ FileStoreBuilder builder = fileStoreBuilder(configuration.getDirectory())
+ .withSegmentCacheSize(configuration.getSegmentCacheSize())
+ .withStringCacheSize(configuration.getStringCacheSize())
+ .withTemplateCacheSize(configuration.getTemplateCacheSize())
+ .withStringDeduplicationCacheSize(configuration.getStringDeduplicationCacheSize())
+ .withTemplateDeduplicationCacheSize(configuration.getTemplateDeduplicationCacheSize())
+ .withNodeDeduplicationCacheSize(configuration.getNodeDeduplicationCacheSize())
+ .withMaxFileSize(configuration.getMaxFileSize())
+ .withMemoryMapping(configuration.getMemoryMapping())
.withGCMonitor(gcMonitor)
.withStatisticsProvider(statisticsProvider)
.withGCOptions(gcOptions);
- if (hasCustomBlobStore(context) && blobStore != null) {
+ if (configuration.hasCustomBlobStore() && blobStore != null) {
log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore);
builder.withBlobStore(blobStore);
}
- if (!isStandbyInstance(context)) {
+ if (!configuration.isStandbyInstance()) {
builder.withSnfeListener(IGNORE_SNFE);
}
@@ -401,95 +438,82 @@ public class SegmentNodeStoreService {
return null;
}
// store should be closed last
- registrations.register(store);
- registrations.register(asCloseable(gcMonitor));
+ closeables.add(store);
// Listen for Executor services on the whiteboard
WhiteboardExecutor executor = new WhiteboardExecutor();
executor.start(whiteboard);
- registrations.register(asCloseable(executor));
-
- List<Registration> mbeans = Lists.newArrayList();
+ closeables.add(executor);
// Expose stats about the segment cache
CacheStatsMBean segmentCacheStats = store.getSegmentCacheStats();
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
segmentCacheStats,
CacheStats.TYPE,
- appendRole(segmentCacheStats.getName(), role),
- withRole(properties(), role)
+ segmentCacheStats.getName()
));
// Expose stats about the string and template caches
CacheStatsMBean stringCacheStats = store.getStringCacheStats();
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
- stringCacheStats, CacheStats.TYPE,
- appendRole(stringCacheStats.getName(), role),
- withRole(properties(), role)
+ stringCacheStats,
+ CacheStats.TYPE,
+ stringCacheStats.getName()
));
CacheStatsMBean templateCacheStats = store.getTemplateCacheStats();
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
- templateCacheStats, CacheStats.TYPE,
- appendRole(templateCacheStats.getName(), role),
- withRole(properties(), role)
+ templateCacheStats,
+ CacheStats.TYPE,
+ templateCacheStats.getName()
));
CacheStatsMBean stringDeduplicationCacheStats = store.getStringDeduplicationCacheStats();
if (stringDeduplicationCacheStats != null) {
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
- stringDeduplicationCacheStats, CacheStats.TYPE,
- appendRole(stringDeduplicationCacheStats.getName(), role),
- withRole(properties(), role)
+ stringDeduplicationCacheStats,
+ CacheStats.TYPE,
+ stringDeduplicationCacheStats.getName()
));
}
CacheStatsMBean templateDeduplicationCacheStats = store.getTemplateDeduplicationCacheStats();
if (templateDeduplicationCacheStats != null) {
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
- templateDeduplicationCacheStats, CacheStats.TYPE,
- appendRole(templateDeduplicationCacheStats.getName(), role),
- withRole(properties(), role)
+ templateDeduplicationCacheStats,
+ CacheStats.TYPE,
+ templateDeduplicationCacheStats.getName()
));
}
CacheStatsMBean nodeDeduplicationCacheStats = store.getNodeDeduplicationCacheStats();
if (nodeDeduplicationCacheStats != null) {
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
CacheStatsMBean.class,
- nodeDeduplicationCacheStats, CacheStats.TYPE,
- appendRole(nodeDeduplicationCacheStats.getName(), role),
- withRole(properties(), role)
+ nodeDeduplicationCacheStats,
+ CacheStats.TYPE,
+ nodeDeduplicationCacheStats.getName()
));
}
// Expose an MBean to managing and monitoring garbage collection
final FileStoreGCMonitor fsgcm = new FileStoreGCMonitor(Clock.SIMPLE);
- mbeans.add(new CompositeRegistration(
- whiteboard.register(GCMonitor.class, fsgcm, emptyMap()),
- registerMBean(
- whiteboard,
- SegmentRevisionGC.class,
- new SegmentRevisionGCMBean(store, gcOptions, fsgcm),
- SegmentRevisionGC.TYPE,
- appendRole("Segment node store revision garbage collection", role),
- withRole(properties(), role)
- )));
+ closeables.add(registrations.register(GCMonitor.class, fsgcm));
+ closeables.add(registrations.registerMBean(
+ SegmentRevisionGC.class,
+ new SegmentRevisionGCMBean(store, gcOptions, fsgcm),
+ SegmentRevisionGC.TYPE,
+ "Segment node store revision garbage collection"
+ ));
Runnable cancelGC = new Runnable() {
@@ -505,46 +529,41 @@ public class SegmentNodeStoreService {
return fsgcm.getStatus();
}
};
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
RevisionGCMBean.class,
new RevisionGC(store.getGCRunner(), cancelGC, statusMessage, executor),
RevisionGCMBean.TYPE,
- appendRole("Revision garbage collection", role),
- withRole(properties(), role)
+ "Revision garbage collection"
));
// Expose statistics about the FileStore
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
FileStoreStatsMBean.class,
store.getStats(),
FileStoreStatsMBean.TYPE,
- appendRole("FileStore statistics", role),
- withRole(properties(), role)
+ "FileStore statistics"
));
// register segment node store
- SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store)
- .withStatisticsProvider(statisticsProvider);
- if (isStandbyInstance(context) || !isPrimarySegmentStore(role)) {
+ SegmentNodeStore.SegmentNodeStoreBuilder segmentNodeStoreBuilder = SegmentNodeStoreBuilders.builder(store).withStatisticsProvider(statisticsProvider);
+ if (configuration.isStandbyInstance() || !configuration.isPrimarySegmentStore()) {
segmentNodeStoreBuilder.dispatchChanges(false);
}
SegmentNodeStore segmentNodeStore = segmentNodeStoreBuilder.build();
- if (isPrimarySegmentStore(role)) {
+ if (configuration.isPrimarySegmentStore()) {
ObserverTracker observerTracker = new ObserverTracker(segmentNodeStore);
observerTracker.start(context.getBundleContext());
- registrations.register(asCloseable(observerTracker));
+ closeables.add(observerTracker);
}
- if (isPrimarySegmentStore(role)) {
- mbeans.add(registerMBean(
- whiteboard,
+ if (configuration.isPrimarySegmentStore()) {
+ closeables.add(registrations.registerMBean(
CheckpointMBean.class,
- new SegmentCheckpointMBean(segmentNodeStore), CheckpointMBean.TYPE,
+ new SegmentCheckpointMBean(segmentNodeStore),
+ CheckpointMBean.TYPE,
"Segment node store checkpoint management"
));
}
@@ -559,22 +578,13 @@ public class SegmentNodeStoreService {
true,
false
);
- mbeans.add(whiteboard.register(
- Descriptors.class,
- clusterIdDesc,
- withRole(properties(), role)
- ));
-
+ closeables.add(registrations.register(Descriptors.class, clusterIdDesc));
// Register "discovery lite" descriptors
- mbeans.add(whiteboard.register(
- Descriptors.class,
- new SegmentDiscoveryLiteDescriptors(segmentNodeStore),
- withRole(properties(), role)
- ));
+ closeables.add(registrations.register(Descriptors.class, new SegmentDiscoveryLiteDescriptors(segmentNodeStore)));
}
// If a shared data store register the repo id in the data store
- if (isPrimarySegmentStore(role) && isShared(blobStore)) {
+ if (configuration.isPrimarySegmentStore() && isShared(blobStore)) {
SharedDataStore sharedDataStore = (SharedDataStore) blobStore;
try {
sharedDataStore.addMetadataRecord(new ByteArrayInputStream(new byte[0]), SharedStoreRecordType.REPOSITORY.getNameFromId(getOrCreateId(segmentNodeStore)));
@@ -582,29 +592,23 @@ public class SegmentNodeStoreService {
throw new IOException("Could not register a unique repositoryId", e);
}
if (blobStore instanceof BlobTrackingStore) {
- long trackSnapshotInterval = toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL, context), DEFAULT_BLOB_SNAPSHOT_INTERVAL);
- String root = property(DIRECTORY, context);
- if (Strings.isNullOrEmpty(root)) {
- root = "repository";
- }
BlobTrackingStore trackingStore = (BlobTrackingStore) blobStore;
if (trackingStore.getTracker() != null) {
trackingStore.getTracker().close();
}
- trackingStore.addTracker(new BlobIdTracker(root, getOrCreateId(segmentNodeStore), trackSnapshotInterval, sharedDataStore));
+ trackingStore.addTracker(new BlobIdTracker(configuration.getRootDirectory(), getOrCreateId(segmentNodeStore), configuration.getBlobSnapshotInterval(), sharedDataStore));
}
}
- if (isPrimarySegmentStore(role) && isGarbageCollectable(blobStore)) {
+ if (configuration.isPrimarySegmentStore() && blobStore instanceof GarbageCollectableBlobStore) {
BlobGarbageCollector gc = new MarkSweepGarbageCollector(
new SegmentBlobReferenceRetriever(store),
(GarbageCollectableBlobStore) blobStore,
executor,
- TimeUnit.SECONDS.toMillis(getBlobGcMaxAge(context)),
+ TimeUnit.SECONDS.toMillis(configuration.getBlobGcMaxAge()),
getOrCreateId(segmentNodeStore)
);
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
BlobGCMBean.class,
new BlobGC(gc, executor),
BlobGCMBean.TYPE,
@@ -614,56 +618,49 @@ public class SegmentNodeStoreService {
// Expose an MBean for backup/restore operations
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
FileStoreBackupRestoreMBean.class,
new FileStoreBackupRestoreImpl(
segmentNodeStore,
store.getRevisions(),
store.getReader(),
- getBackupDirectory(context, role),
+ configuration.getBackupDirectory(),
executor
),
FileStoreBackupRestoreMBean.TYPE,
- appendRole("Segment node store backup/restore", role),
- withRole(properties(), role)
+ "Segment node store backup/restore"
));
// Expose statistics about the SegmentNodeStore
- mbeans.add(registerMBean(
- whiteboard,
+ closeables.add(registrations.registerMBean(
SegmentNodeStoreStatsMBean.class,
segmentNodeStore.getStats(),
SegmentNodeStoreStatsMBean.TYPE,
- appendRole("SegmentNodeStore statistics", role),
- withRole(properties(), role)
+ "SegmentNodeStore statistics"
));
- if (isPrimarySegmentStore(role)) {
+ if (configuration.isPrimarySegmentStore()) {
log.info("Primary SegmentNodeStore initialized");
} else {
log.info("Secondary SegmentNodeStore initialized, role={}", role);
}
// Register a factory service to expose the FileStore
- registrations.register(asCloseable(whiteboard.register(
+ closeables.add(registrations.register(
SegmentStoreProvider.class,
- new DefaultSegmentStoreProvider(store),
- withRole(properties(), role)
- )));
-
- registrations.register(asCloseable(new CompositeRegistration(mbeans)));
+ new DefaultSegmentStoreProvider(store)
+ ));
- if (isStandbyInstance(context)) {
+ if (configuration.isStandbyInstance()) {
return segmentNodeStore;
}
- if (isPrimarySegmentStore(role)) {
+ if (configuration.isPrimarySegmentStore()) {
Map<String, Object> props = new HashMap<String, Object>();
props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
props.put("oak.nodestore.description", new String[] {"nodeStoreType=segment"});
- registrations.register(asCloseable(whiteboard.register(NodeStore.class, segmentNodeStore, props)));
+ closeables.add(registrations.register(NodeStore.class, segmentNodeStore, props));
}
return segmentNodeStore;
@@ -671,187 +668,292 @@ public class SegmentNodeStoreService {
@Deactivate
public void deactivate() {
- if (registrations != null) {
- IOUtils.closeQuietly(registrations);
- registrations = null;
- }
+ closeQuietly(closer);
+ closer = null;
}
- private static Map<String, String> properties() {
- return new HashMap<>();
+}
+
+/**
+ * Encapsulates a {@link Closer} and makes it easier to track the lifecycle
+ * of entities that can be disposed.
+ */
+class Closeables implements Closeable {
+
+ private final Closer closer;
+
+ Closeables(Closer closer) {
+ this.closer = closer;
}
- private static Map<String, String> withRole(Map<String, String> map, String role) {
- if (role != null) {
- map.put("role", sanitizeRole(role));
- }
- return map;
+ void add(Closeable c) {
+ closer.register(c);
}
- private static boolean isGarbageCollectable(BlobStore store) {
- return store instanceof GarbageCollectableBlobStore;
+ void add(final AbstractServiceTracker<?> t) {
+ add(new Closeable() {
+
+ @Override
+ public void close() throws IOException {
+ t.stop();
+ }
+
+ });
}
- private static SegmentGCOptions newGCOptions(ComponentContext context) {
- boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION, context), PAUSE_DEFAULT);
- int retryCount = toInteger(property(COMPACTION_RETRY_COUNT, context), RETRY_COUNT_DEFAULT);
- int forceTimeout = toInteger(property(COMPACTION_FORCE_TIMEOUT, context), FORCE_TIMEOUT_DEFAULT);
- int retainedGenerations = toInteger(property(RETAINED_GENERATIONS, context), RETAINED_GENERATIONS_DEFAULT);
+ void add(final Registration r) {
+ add(new Closeable() {
- long sizeDeltaEstimation = toLong(property(COMPACTION_SIZE_DELTA_ESTIMATION, context), SIZE_DELTA_ESTIMATION_DEFAULT);
- int memoryThreshold = toInteger(property(MEMORY_THRESHOLD, context), MEMORY_THRESHOLD_DEFAULT);
- boolean disableEstimation = toBoolean(property(COMPACTION_DISABLE_ESTIMATION, context), DISABLE_ESTIMATION_DEFAULT);
+ @Override
+ public void close() throws IOException {
+ r.unregister();
+ }
- if (property("compaction.gainThreshold", context) != null) {
- log.warn("Deprecated property compaction.gainThreshold was detected. In order to configure compaction please use the new property "
- + "compaction.sizeDeltaEstimation. For turning off estimation, the new property compaction.disableEstimation should be used.");
- }
- long gcProgressLog = toLong(property(GC_PROGRESS_LOG, context), GC_PROGRESS_LOG_DEFAULT);
+ });
+ }
+
+ void add(final ObserverTracker t) {
+ add(new Closeable() {
- return new SegmentGCOptions(pauseCompaction, retryCount, forceTimeout)
- .setRetainedGenerations(retainedGenerations)
- .setGcSizeDeltaEstimation(sizeDeltaEstimation)
- .setMemoryThreshold(memoryThreshold)
- .setEstimationDisabled(disableEstimation)
- .withGCNodeWriteMonitor(gcProgressLog);
+ @Override
+ public void close() throws IOException {
+ t.stop();
+ }
+
+ });
}
- private static boolean isStandbyInstance(ComponentContext context) {
- return Boolean.parseBoolean(property(STANDBY, context));
+ @Override
+ public void close() throws IOException {
+ closer.close();
}
- private static boolean hasCustomBlobStore(ComponentContext context) {
- return Boolean.parseBoolean(property(CUSTOM_BLOB_STORE, context));
+}
+
+/**
+ * Allows simple access to the configuration of this component. Provides
+ * default values for unspecified properties and type conversion.
+ */
+class Configuration {
+
+ private static int roundToNextPowerOfTwo(int size) {
+ return 1 << (32 - Integer.numberOfLeadingZeros(Math.max(0, size - 1)));
}
- private static File getBaseDirectory(ComponentContext context) {
- String directory = property(DIRECTORY, context);
+ private final ComponentContext context;
- if (directory != null) {
- return new File(directory);
- }
+ private final String role;
- return new File("tarmk");
+ Configuration(ComponentContext context) {
+ this(context, null);
+ }
+
+ Configuration(ComponentContext context, String role) {
+ this.context = context;
+ this.role = role;
}
- private static File getDirectory(ComponentContext context, String role) {
- return new File(getBaseDirectory(context), appendRole("segmentstore", role));
+ String property(String name) {
+ return lookupConfigurationThenFramework(context, name);
+ }
+
+ String getRootDirectory() {
+ String root = property(DIRECTORY);
+ if (isNullOrEmpty(root)) {
+ return "repository";
+ }
+ return root;
}
- private static File getBackupDirectory(ComponentContext context, String role) {
- String backupDirectory = property(BACKUP_DIRECTORY, context);
+ File getDirectory() {
+ return new File(getBaseDirectory(), appendRole("segmentstore"));
+ }
+
+ File getBackupDirectory() {
+ String backupDirectory = property(BACKUP_DIRECTORY);
if (backupDirectory != null) {
return new File(backupDirectory);
}
- return new File(getBaseDirectory(context), appendRole("segmentstore-backup", role));
+ return new File(getBaseDirectory(), appendRole("segmentstore-backup"));
}
- private static String getMode(ComponentContext context) {
- String mode = property(MODE, context);
+ int getSegmentCacheSize() {
+ return toInteger(getCacheSize(SEGMENT_CACHE_SIZE), DEFAULT_SEGMENT_CACHE_MB);
+ }
- if (mode != null) {
- return mode;
- }
+ int getStringCacheSize() {
+ return toInteger(getCacheSize(STRING_CACHE_SIZE), DEFAULT_STRING_CACHE_MB);
+ }
- return System.getProperty(MODE, System.getProperty("sun.arch.data.model", "32"));
+ int getTemplateCacheSize() {
+ return toInteger(getCacheSize(TEMPLATE_CACHE_SIZE), DEFAULT_TEMPLATE_CACHE_MB);
}
- private static String getCacheSize(String propertyName, ComponentContext context) {
- String cacheSize = property(propertyName, context);
+ int getStringDeduplicationCacheSize() {
+ return toInteger(getCacheSize(STRING_DEDUPLICATION_CACHE_SIZE), DEFAULT_STRING_CACHE_SIZE_OSGi);
+ }
- if (cacheSize != null) {
- return cacheSize;
- }
+ int getTemplateDeduplicationCacheSize() {
+ return toInteger(getCacheSize(TEMPLATE_DEDUPLICATION_CACHE_SIZE), DEFAULT_TEMPLATE_CACHE_SIZE_OSGi);
+ }
- return System.getProperty(propertyName);
+ int getNodeDeduplicationCacheSize() {
+ return roundToNextPowerOfTwo(toInteger(getCacheSize(NODE_DEDUPLICATION_CACHE_SIZE), DEFAULT_NODE_CACHE_SIZE_OSGi));
}
- private static long getBlobGcMaxAge(ComponentContext context) {
- return toLong(property(PROP_BLOB_GC_MAX_AGE, context), DEFAULT_BLOB_GC_MAX_AGE);
+ boolean getPauseCompaction() {
+ return toBoolean(property(PAUSE_COMPACTION), PAUSE_DEFAULT);
}
- private static int getSegmentCacheSize(ComponentContext context) {
- return toInteger(getCacheSize(SEGMENT_CACHE_SIZE, context), DEFAULT_SEGMENT_CACHE_MB);
+ int getRetryCount() {
+ return toInteger(property(COMPACTION_RETRY_COUNT), RETRY_COUNT_DEFAULT);
}
- private static int getStringCacheSize(ComponentContext context) {
- return toInteger(getCacheSize(STRING_CACHE_SIZE, context), DEFAULT_STRING_CACHE_MB);
+ int getForceCompactionTimeout() {
+ return toInteger(property(COMPACTION_FORCE_TIMEOUT), FORCE_TIMEOUT_DEFAULT);
}
- private static int getTemplateCacheSize(ComponentContext context) {
- return toInteger(getCacheSize(TEMPLATE_CACHE_SIZE, context), DEFAULT_TEMPLATE_CACHE_MB);
+ int getRetainedGenerations() {
+ return toInteger(property(RETAINED_GENERATIONS), RETAINED_GENERATIONS_DEFAULT);
}
- private static int getStringDeduplicationCacheSize(ComponentContext context) {
- return toInteger(getCacheSize(STRING_DEDUPLICATION_CACHE_SIZE, context), DEFAULT_STRING_CACHE_SIZE_OSGi);
+ long getSizeDeltaEstimation() {
+ return toLong(property(COMPACTION_SIZE_DELTA_ESTIMATION), SIZE_DELTA_ESTIMATION_DEFAULT);
}
- private static int getTemplateDeduplicationCacheSize(ComponentContext context) {
- return toInteger(getCacheSize(TEMPLATE_DEDUPLICATION_CACHE_SIZE, context), DEFAULT_TEMPLATE_CACHE_SIZE_OSGi);
+ int getMemoryThreshold() {
+ return toInteger(property(MEMORY_THRESHOLD), MEMORY_THRESHOLD_DEFAULT);
}
- private static int getNodeDeduplicationCacheSize(ComponentContext context) {
- // Round to the next power of 2
- int size = Math.max(1,
- toInteger(getCacheSize(NODE_DEDUPLICATION_CACHE_SIZE, context), DEFAULT_NODE_CACHE_SIZE_OSGi));
- return 1 << (32 - Integer.numberOfLeadingZeros(size - 1));
+ boolean getDisableEstimation() {
+ return toBoolean(property(COMPACTION_DISABLE_ESTIMATION), DISABLE_ESTIMATION_DEFAULT);
}
- private static int getMaxFileSize(ComponentContext context) {
- return toInteger(property(SIZE, context), DEFAULT_MAX_FILE_SIZE);
+ String getCompactionGainThreshold() {
+ return property("compaction.gainThreshold");
}
- static String property(String name, ComponentContext context) {
- return lookupConfigurationThenFramework(context, name);
+ long getGCProcessLog() {
+ return toLong(property(GC_PROGRESS_LOG), GC_PROGRESS_LOG_DEFAULT);
}
- private static String sanitizeRole(String role) {
- return role.replaceAll(":", "-");
+ int getMaxFileSize() {
+ return toInteger(property(SIZE), DEFAULT_MAX_FILE_SIZE);
+ }
+
+ String getMode() {
+ String mode = property(MODE);
+ if (mode != null) {
+ return mode;
+ }
+ return System.getProperty(MODE, System.getProperty("sun.arch.data.model", "32"));
+ }
+
+ boolean getMemoryMapping() {
+ return getMode().equals("64");
+ }
+
+ long getBlobSnapshotInterval() {
+ return toLong(property(PROP_BLOB_SNAPSHOT_INTERVAL), DEFAULT_BLOB_SNAPSHOT_INTERVAL);
+ }
+
+ boolean isStandbyInstance() {
+ return toBoolean(property(STANDBY), false);
+ }
+
+ boolean hasCustomBlobStore() {
+ return toBoolean(property(CUSTOM_BLOB_STORE), false);
+ }
+
+ long getBlobGcMaxAge() {
+ return toLong(property(PROP_BLOB_GC_MAX_AGE), DEFAULT_BLOB_GC_MAX_AGE);
}
- private static String appendRole(@Nonnull String name, @Nullable String role) {
+ boolean isPrimarySegmentStore() {
+ return role == null;
+ }
+
+ private String appendRole(String name) {
if (role == null) {
return name;
} else {
- return name + "-" + sanitizeRole(role);
+ return name + "-" + role;
}
}
- static Closeable asCloseable(final Registration r) {
- return new Closeable() {
+ private File getBaseDirectory() {
+ String directory = property(DIRECTORY);
+ if (directory != null) {
+ return new File(directory);
+ }
+ return new File("tarmk");
+ }
- @Override
- public void close() {
- r.unregister();
- }
+ private String getCacheSize(String propertyName) {
+ String cacheSize = property(propertyName);
+ if (cacheSize != null) {
+ return cacheSize;
+ }
+ return System.getProperty(propertyName);
+ }
- };
+}
+
+/**
+ * Performs registrations of services and MBean in a uniform way. Augments
+ * the metadata of services and MBeans with an optionally provided role
+ * name.
+ */
+class Registrations {
+
+ private final Whiteboard whiteboard;
+
+ private final String role;
+
+ Registrations(Whiteboard whiteboard, String role) {
+ this.whiteboard = whiteboard;
+ this.role = role;
}
- private static Closeable asCloseable(final AbstractServiceTracker<?> t) {
- return new Closeable() {
+ <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name) {
+ return registerMBean(clazz, bean, type, name, new HashMap<String, String>());
+ }
- @Override
- public void close() {
- t.stop();
- }
+ <T> Registration registerMBean(Class<T> clazz, T bean, String type, String name, Map<String, String> attributes) {
+ return WhiteboardUtils.registerMBean(whiteboard, clazz, bean, type, maybeAppendRole(name), maybePutRoleAttribute(attributes));
+ }
- };
+ <T> Registration register(Class<T> clazz, T service) {
+ return whiteboard.register(clazz, service, new HashMap<>());
}
- private static Closeable asCloseable(final ObserverTracker t) {
- return new Closeable() {
+ <T> Registration register(Class<T> clazz, T service, Map<String, Object> properties) {
+ return whiteboard.register(clazz, service, maybePutRoleProperty(properties));
+ }
- @Override
- public void close() {
- t.stop();
- }
+ private String maybeAppendRole(String name) {
+ if (role != null) {
+ return name + " - " + role;
+ }
+ return name;
+ }
- };
+ private String jmxRole() {
+ return role.replaceAll(":", "-");
}
- private static boolean isPrimarySegmentStore(String role) {
- return role == null;
+ private Map<String, String> maybePutRoleAttribute(Map<String, String> attributes) {
+ if (role != null) {
+ attributes.put("role", jmxRole());
+ }
+ return attributes;
}
-}
+ private Map<String, Object> maybePutRoleProperty(Map<String, Object> attributes) {
+ if (role != null) {
+ attributes.put("role", role);
+ }
+ return attributes;
+ }
+
+}
\ No newline at end of file