You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by fr...@apache.org on 2016/03/07 13:27:52 UTC
svn commit: r1733913 - in
/jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment:
SegmentNodeStoreBuilder.java SegmentNodeStoreService.java
Author: frm
Date: Mon Mar 7 12:27:52 2016
New Revision: 1733913
URL: http://svn.apache.org/viewvc?rev=1733913&view=rev
Log:
OAK-4089 - Don't start the observation subsystem when starting the NodeStore in standby mode
Modified:
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java
jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
Modified: jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java?rev=1733913&r1=1733912&r2=1733913&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java (original)
+++ jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreBuilder.java Mon Mar 7 12:27:52 2016
@@ -34,18 +34,9 @@ public class SegmentNodeStoreBuilder {
private boolean isCreated;
- private boolean hasCompactionStrategy;
- private boolean pauseCompaction;
- private boolean cloneBinaries;
- private String cleanup;
- private long cleanupTs;
- private byte memoryThreshold;
- private int lockWaitTime;
- private int retryCount;
- private boolean forceAfterFail;
- private boolean persistCompactionMap;
- private byte gainThreshold;
- private CompactionStrategy compactionStrategy;
+ private CompactionStrategy compactionStrategy = NO_COMPACTION;
+
+ private volatile SegmentNodeStore segmentNodeStore;
static SegmentNodeStoreBuilder newSegmentNodeStore(SegmentStore store) {
return new SegmentNodeStoreBuilder(store);
@@ -65,21 +56,44 @@ public class SegmentNodeStoreBuilder {
forceAfterFail, persistCompactionMap, GAIN_THRESHOLD_DEFAULT);
}
+ public SegmentNodeStoreBuilder withCompactionStrategy(CompactionStrategy compactionStrategy) {
+ this.compactionStrategy = compactionStrategy;
+ return this;
+ }
+
public SegmentNodeStoreBuilder withCompactionStrategy(
- boolean pauseCompaction, boolean cloneBinaries, String cleanup,
- long cleanupTs, byte memoryThreshold, final int lockWaitTime,
- int retryCount, boolean forceAfterFail, boolean persistCompactionMap, byte gainThreshold) {
- this.hasCompactionStrategy = true;
- this.pauseCompaction = pauseCompaction;
- this.cloneBinaries = cloneBinaries;
- this.cleanup = cleanup;
- this.cleanupTs = cleanupTs;
- this.memoryThreshold = memoryThreshold;
- this.lockWaitTime = lockWaitTime;
- this.retryCount = retryCount;
- this.forceAfterFail = forceAfterFail;
- this.persistCompactionMap = persistCompactionMap;
- this.gainThreshold = gainThreshold;
+ boolean pauseCompaction,
+ boolean cloneBinaries,
+ String cleanup,
+ long cleanupTs,
+ byte memoryThreshold,
+ final int lockWaitTime,
+ int retryCount,
+ boolean forceAfterFail,
+ boolean persistCompactionMap,
+ byte gainThreshold) {
+
+ compactionStrategy = new CompactionStrategy(
+ pauseCompaction,
+ cloneBinaries,
+ CleanupType.valueOf(cleanup),
+ cleanupTs,
+ memoryThreshold) {
+
+ @Override
+ public boolean compacted(Callable<Boolean> setHead) throws Exception {
+ // Need to guard against concurrent commits to avoid
+ // mixed segments. See OAK-2192.
+ return segmentNodeStore.locked(setHead, lockWaitTime, SECONDS);
+ }
+
+ };
+
+ compactionStrategy.setRetryCount(retryCount);
+ compactionStrategy.setForceAfterFail(forceAfterFail);
+ compactionStrategy.setPersistCompactionMap(persistCompactionMap);
+ compactionStrategy.setGainThreshold(gainThreshold);
+
return this;
}
@@ -92,28 +106,8 @@ public class SegmentNodeStoreBuilder {
public SegmentNodeStore create() {
checkState(!isCreated);
isCreated = true;
- final SegmentNodeStore segmentStore = new SegmentNodeStore(store, true);
- if (hasCompactionStrategy) {
- compactionStrategy = new CompactionStrategy(pauseCompaction,
- cloneBinaries, CleanupType.valueOf(cleanup), cleanupTs,
- memoryThreshold) {
-
- @Override
- public boolean compacted(Callable<Boolean> setHead)
- throws Exception {
- // Need to guard against concurrent commits to avoid
- // mixed segments. See OAK-2192.
- return segmentStore.locked(setHead, lockWaitTime, SECONDS);
- }
- };
- compactionStrategy.setRetryCount(retryCount);
- compactionStrategy.setForceAfterFail(forceAfterFail);
- compactionStrategy.setPersistCompactionMap(persistCompactionMap);
- compactionStrategy.setGainThreshold(gainThreshold);
- } else {
- compactionStrategy = NO_COMPACTION;
- }
- return segmentStore;
+ segmentNodeStore = new SegmentNodeStore(store, true);
+ return segmentNodeStore;
}
}
Modified: jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java?rev=1733913&r1=1733912&r2=1733913&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java (original)
+++ jackrabbit/oak/trunk/oak-segment/src/main/java/org/apache/jackrabbit/oak/plugins/segment/SegmentNodeStoreService.java Mon Mar 7 12:27:52 2016
@@ -18,7 +18,7 @@ package org.apache.jackrabbit.oak.plugin
import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.emptyMap;
-import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toBoolean;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toInteger;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
@@ -32,6 +32,7 @@ import static org.apache.jackrabbit.oak.
import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.PERSIST_COMPACTION_MAP_DEFAULT;
import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.RETRY_COUNT_DEFAULT;
import static org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.TIMESTAMP_DEFAULT;
+import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
@@ -42,9 +43,9 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FilenameUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.ConfigurationPolicy;
@@ -70,6 +71,7 @@ import org.apache.jackrabbit.oak.plugins
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils.SharedStoreRecordType;
import org.apache.jackrabbit.oak.plugins.identifier.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy;
+import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategy.CleanupType;
import org.apache.jackrabbit.oak.plugins.segment.compaction.CompactionStrategyMBean;
import org.apache.jackrabbit.oak.plugins.segment.compaction.DefaultCompactionStrategyMBean;
import org.apache.jackrabbit.oak.plugins.segment.file.FileStore;
@@ -248,7 +250,7 @@ public class SegmentNodeStoreService ext
private FileStore store;
- private volatile SegmentNodeStore delegate;
+ private volatile SegmentNodeStore segmentNodeStore;
private ObserverTracker observerTracker;
@@ -256,6 +258,8 @@ public class SegmentNodeStoreService ext
private ComponentContext context;
+ private CompactionStrategy compactionStrategy;
+
@Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
policy = ReferencePolicy.DYNAMIC, target = ONLY_STANDALONE_TARGET)
private volatile BlobStore blobStore;
@@ -291,12 +295,12 @@ public class SegmentNodeStoreService ext
@Override
protected SegmentNodeStore getNodeStore() {
- checkState(delegate != null, "service must be activated when used");
- return delegate;
+ checkState(segmentNodeStore != null, "service must be activated when used");
+ return segmentNodeStore;
}
@Activate
- private void activate(ComponentContext context) throws IOException {
+ public void activate(ComponentContext context) throws IOException {
this.context = context;
this.customBlobStore = Boolean.parseBoolean(property(CUSTOM_BLOB_STORE));
@@ -307,164 +311,250 @@ public class SegmentNodeStoreService ext
}
}
- public void registerNodeStore() throws IOException {
+ protected void bindBlobStore(BlobStore blobStore) throws IOException {
+ this.blobStore = blobStore;
+ registerNodeStore();
+ }
+
+ protected void unbindBlobStore(BlobStore blobStore){
+ this.blobStore = null;
+ unregisterNodeStore();
+ }
+
+ @Deactivate
+ public void deactivate() {
+ unregisterNodeStore();
+
+ synchronized (this) {
+ if (observerTracker != null) {
+ observerTracker.stop();
+ }
+ if (gcMonitor != null) {
+ gcMonitor.stop();
+ }
+ segmentNodeStore = null;
+
+ if (store != null) {
+ store.close();
+ store = null;
+ }
+ }
+ }
+
+ private synchronized void registerNodeStore() throws IOException {
if (registerSegmentStore()) {
- boolean standby = toBoolean(property(STANDBY), false);
- providerRegistration = context.getBundleContext().registerService(
- SegmentStoreProvider.class.getName(), this, null);
- if (!standby) {
+ if (toBoolean(property(STANDBY), false)) {
+ return;
+ }
+
+ if (registerSegmentNodeStore()) {
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(Constants.SERVICE_PID, SegmentNodeStore.class.getName());
props.put("oak.nodestore.description", new String[]{"nodeStoreType=segment"});
- storeRegistration = context.getBundleContext().registerService(
- NodeStore.class.getName(), this, props);
+ storeRegistration = context.getBundleContext().registerService(NodeStore.class.getName(), this, props);
}
}
}
- public synchronized boolean registerSegmentStore() throws IOException {
+ private boolean registerSegmentStore() throws IOException {
if (context == null) {
log.info("Component still not activated. Ignoring the initialization call");
return false;
}
- Dictionary<?, ?> properties = context.getProperties();
- name = String.valueOf(properties.get(NAME));
-
- String directory = property(DIRECTORY);
- if (directory == null) {
- directory = "tarmk";
- }else{
- directory = FilenameUtils.concat(directory, "segmentstore");
- }
- String mode = property(MODE);
- if (mode == null) {
- mode = System.getProperty(MODE,
- System.getProperty("sun.arch.data.model", "32"));
- }
-
- String size = property(SIZE);
- if (size == null) {
- size = System.getProperty(SIZE, "256");
- }
-
- String cache = property(CACHE);
- if (cache == null) {
- cache = System.getProperty(CACHE);
- }
-
- boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION),
- PAUSE_DEFAULT);
- boolean cloneBinaries = toBoolean(
- property(COMPACTION_CLONE_BINARIES),
- CLONE_BINARIES_DEFAULT);
- long cleanupTs = toLong(property(COMPACTION_CLEANUP_TIMESTAMP),
- TIMESTAMP_DEFAULT);
- int retryCount = toInteger(property(COMPACTION_RETRY_COUNT),
- RETRY_COUNT_DEFAULT);
- boolean forceCommit = toBoolean(property(COMPACTION_FORCE_AFTER_FAIL),
- FORCE_AFTER_FAIL_DEFAULT);
- final int lockWaitTime = toInteger(property(COMPACTION_LOCK_WAIT_TIME),
- COMPACTION_LOCK_WAIT_TIME_DEFAULT);
- boolean persistCompactionMap = toBoolean(property(PERSIST_COMPACTION_MAP),
- PERSIST_COMPACTION_MAP_DEFAULT);
- String cleanup = property(COMPACTION_CLEANUP);
- if (cleanup == null) {
- cleanup = CLEANUP_DEFAULT.toString();
- }
-
- String memoryThresholdS = property(COMPACTION_MEMORY_THRESHOLD);
- byte memoryThreshold = MEMORY_THRESHOLD_DEFAULT;
- if (memoryThresholdS != null) {
- memoryThreshold = Byte.valueOf(memoryThresholdS);
- }
-
- String gainThresholdS = property(COMPACTION_GAIN_THRESHOLD);
- byte gainThreshold = GAIN_THRESHOLD_DEFAULT;
- if (gainThresholdS != null) {
- gainThreshold = Byte.valueOf(gainThresholdS);
- }
+ OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
- final long blobGcMaxAgeInSecs = toLong(property(PROP_BLOB_GC_MAX_AGE), DEFAULT_BLOB_GC_MAX_AGE);
+ // Listen for GCMonitor services
- OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
gcMonitor = new GCMonitorTracker();
gcMonitor.start(whiteboard);
- Builder storeBuilder = FileStore.newFileStore(new File(directory))
- .withCacheSize(Integer.parseInt(cache))
- .withMaxFileSize(Integer.parseInt(size))
- .withMemoryMapping("64".equals(mode))
+
+ // Build the FileStore
+
+ Builder builder = FileStore.newFileStore(getDirectory())
+ .withCacheSize(getCacheSize())
+ .withMaxFileSize(getMaxFileSize())
+ .withMemoryMapping(getMode().equals("64"))
.withGCMonitor(gcMonitor)
.withStatisticsProvider(statisticsProvider);
+
if (customBlobStore) {
log.info("Initializing SegmentNodeStore with BlobStore [{}]", blobStore);
- store = storeBuilder.withBlobStore(blobStore).create();
- } else {
- store = storeBuilder.create();
+ builder.withBlobStore(blobStore);
}
- SegmentNodeStoreBuilder nodeStoreBuilder = SegmentNodeStore
- .newSegmentNodeStore(store);
- nodeStoreBuilder.withCompactionStrategy(pauseCompaction, cloneBinaries,
- cleanup, cleanupTs, memoryThreshold, lockWaitTime, retryCount,
- forceCommit, persistCompactionMap, gainThreshold);
- delegate = nodeStoreBuilder.create();
- CompactionStrategy compactionStrategy = nodeStoreBuilder
- .getCompactionStrategy();
+ store = builder.create();
+
+ // Create a compaction strategy
+
+ compactionStrategy = newCompactionStrategy();
+
+ // Expose an MBean to provide information about the compaction strategy
+
+ compactionStrategyRegistration = registerMBean(
+ whiteboard,
+ CompactionStrategyMBean.class,
+ new DefaultCompactionStrategyMBean(compactionStrategy),
+ CompactionStrategyMBean.TYPE,
+ "Segment node store compaction strategy settings"
+ );
+
+ // Let the FileStore be aware of the compaction strategy
+
store.setCompactionStrategy(compactionStrategy);
+ // Expose stats about the segment cache
+
CacheStats segmentCacheStats = store.getTracker().getSegmentCacheStats();
- segmentCacheMBean = registerMBean(whiteboard, CacheStatsMBean.class,
+
+ segmentCacheMBean = registerMBean(
+ whiteboard,
+ CacheStatsMBean.class,
segmentCacheStats,
- CacheStats.TYPE, segmentCacheStats.getName());
+ CacheStats.TYPE,
+ segmentCacheStats.getName()
+ );
+
+ // Expose stats about the string cache, if available
CacheStats stringCacheStats = store.getTracker().getStringCacheStats();
+
if (stringCacheStats != null) {
- stringCacheMBean = registerMBean(whiteboard, CacheStatsMBean.class,
- stringCacheStats,
- CacheStats.TYPE, stringCacheStats.getName());
+ stringCacheMBean = registerMBean(
+ whiteboard,
+ CacheStatsMBean.class,
+ stringCacheStats,CacheStats.TYPE,
+ stringCacheStats.getName()
+ );
}
- FileStoreGCMonitor fsgcMonitor = new FileStoreGCMonitor(Clock.SIMPLE);
- fsgcMonitorMBean = new CompositeRegistration(
- whiteboard.register(GCMonitor.class, fsgcMonitor, emptyMap()),
- registerMBean(whiteboard, GCMonitorMBean.class, fsgcMonitor, GCMonitorMBean.TYPE,
- "File Store garbage collection monitor"),
- scheduleWithFixedDelay(whiteboard, fsgcMonitor, 1));
-
- observerTracker = new ObserverTracker(delegate);
- observerTracker.start(context.getBundleContext());
+ // Listen for Executor services on the whiteboard
executor = new WhiteboardExecutor();
executor.start(whiteboard);
- checkpointRegistration = registerMBean(whiteboard, CheckpointMBean.class, new SegmentCheckpointMBean(delegate),
- CheckpointMBean.TYPE, "Segment node store checkpoint management");
+ // Expose an MBean to trigger garbage collection
+
+ Runnable triggerGarbageCollection = new Runnable() {
- RevisionGC revisionGC = new RevisionGC(new Runnable() {
@Override
public void run() {
store.gc();
}
- }, executor);
- revisionGCRegistration = registerMBean(whiteboard, RevisionGCMBean.class, revisionGC,
- RevisionGCMBean.TYPE, "Segment node store revision garbage collection");
- // ensure a clusterId is initialized
+ };
+
+ revisionGCRegistration = registerMBean(
+ whiteboard,
+ RevisionGCMBean.class,
+ new RevisionGC(triggerGarbageCollection, executor),
+ RevisionGCMBean.TYPE,
+ "Segment node store revision garbage collection"
+ );
+
+ // Expose statistics about the FileStore
+
+ fileStoreStatsMBean = registerMBean(
+ whiteboard,
+ FileStoreStatsMBean.class,
+ store.getStats(),
+ FileStoreStatsMBean.TYPE,
+ "FileStore statistics"
+ );
+
+ // Register a monitor for the garbage collection of the FileStore
+
+ FileStoreGCMonitor fsgcm = new FileStoreGCMonitor(Clock.SIMPLE);
+
+ fsgcMonitorMBean = new CompositeRegistration(
+ whiteboard.register(GCMonitor.class, fsgcm, emptyMap()),
+ registerMBean(
+ whiteboard,
+ GCMonitorMBean.class,
+ fsgcm,
+ GCMonitorMBean.TYPE,
+ "File Store garbage collection monitor"
+ ),
+ scheduleWithFixedDelay(whiteboard, fsgcm, 1)
+ );
+
+ // Register a factory service to expose the FileStore
+
+ providerRegistration = context.getBundleContext().registerService(SegmentStoreProvider.class.getName(), this, null);
+
+ return true;
+ }
+
+ private CompactionStrategy newCompactionStrategy() {
+ boolean pauseCompaction = toBoolean(property(PAUSE_COMPACTION), PAUSE_DEFAULT);
+ boolean cloneBinaries = toBoolean(property(COMPACTION_CLONE_BINARIES), CLONE_BINARIES_DEFAULT);
+ long cleanupTs = toLong(property(COMPACTION_CLEANUP_TIMESTAMP), TIMESTAMP_DEFAULT);
+ int retryCount = toInteger(property(COMPACTION_RETRY_COUNT), RETRY_COUNT_DEFAULT);
+ boolean forceAfterFail = toBoolean(property(COMPACTION_FORCE_AFTER_FAIL), FORCE_AFTER_FAIL_DEFAULT);
+ final int lockWaitTime = toInteger(property(COMPACTION_LOCK_WAIT_TIME), COMPACTION_LOCK_WAIT_TIME_DEFAULT);
+ boolean persistCompactionMap = toBoolean(property(PERSIST_COMPACTION_MAP), PERSIST_COMPACTION_MAP_DEFAULT);
+
+ CleanupType cleanupType = getCleanUpType();
+ byte memoryThreshold = getMemoryThreshold();
+ byte gainThreshold = getGainThreshold();
+
+ // This is indeed a dirty hack, but it's needed to break a circular
+ // dependency between different components. The FileStore needs the
+ // CompactionStrategy, the CompactionStrategy needs the
+ // SegmentNodeStore, and the SegmentNodeStore needs the FileStore.
+
+ CompactionStrategy compactionStrategy = new CompactionStrategy(pauseCompaction, cloneBinaries, cleanupType, cleanupTs, memoryThreshold) {
+
+ @Override
+ public boolean compacted(Callable<Boolean> setHead) throws Exception {
+ // Need to guard against concurrent commits to avoid
+ // mixed segments. See OAK-2192.
+ return segmentNodeStore.locked(setHead, lockWaitTime, SECONDS);
+ }
+
+ };
+
+ compactionStrategy.setRetryCount(retryCount);
+ compactionStrategy.setForceAfterFail(forceAfterFail);
+ compactionStrategy.setPersistCompactionMap(persistCompactionMap);
+ compactionStrategy.setGainThreshold(gainThreshold);
+
+ return compactionStrategy;
+ }
+
+ private boolean registerSegmentNodeStore() throws IOException {
+ Dictionary<?, ?> properties = context.getProperties();
+ name = String.valueOf(properties.get(NAME));
+
+ final long blobGcMaxAgeInSecs = toLong(property(PROP_BLOB_GC_MAX_AGE), DEFAULT_BLOB_GC_MAX_AGE);
+
+ OsgiWhiteboard whiteboard = new OsgiWhiteboard(context.getBundleContext());
+
+ SegmentNodeStoreBuilder nodeStoreBuilder = SegmentNodeStore.newSegmentNodeStore(store);
+ nodeStoreBuilder.withCompactionStrategy(compactionStrategy);
+ segmentNodeStore = nodeStoreBuilder.create();
+
+ observerTracker = new ObserverTracker(segmentNodeStore);
+ observerTracker.start(context.getBundleContext());
+
+ checkpointRegistration = registerMBean(whiteboard, CheckpointMBean.class, new SegmentCheckpointMBean(segmentNodeStore),
+ CheckpointMBean.TYPE, "Segment node store checkpoint management");
+
+ // ensure a clusterId is initialized
// and expose it as 'oak.clusterid' repository descriptor
GenericDescriptors clusterIdDesc = new GenericDescriptors();
- clusterIdDesc.put(ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
+ clusterIdDesc.put(ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
new SimpleValueFactory().createValue(
- ClusterRepositoryInfo.getOrCreateId(delegate)), true, false);
+ ClusterRepositoryInfo.getOrCreateId(segmentNodeStore)), true, false);
whiteboard.register(Descriptors.class, clusterIdDesc, Collections.emptyMap());
-
+
// If a shared data store register the repo id in the data store
String repoId = "";
if (SharedDataStoreUtils.isShared(blobStore)) {
try {
- repoId = ClusterRepositoryInfo.getOrCreateId(delegate);
+ repoId = ClusterRepositoryInfo.getOrCreateId(segmentNodeStore);
((SharedDataStore) blobStore).addMetadataRecord(new ByteArrayInputStream(new byte[0]),
- SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
+ SharedStoreRecordType.REPOSITORY.getNameFromId(repoId));
} catch (Exception e) {
throw new IOException("Could not register a unique repositoryId", e);
}
@@ -472,65 +562,26 @@ public class SegmentNodeStoreService ext
if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
BlobGarbageCollector gc = new MarkSweepGarbageCollector(
- new SegmentBlobReferenceRetriever(store.getTracker()),
- (GarbageCollectableBlobStore) store.getBlobStore(),
- executor, TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs),
- repoId);
-
- blobGCRegistration = registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
- BlobGCMBean.TYPE, "Segment node store blob garbage collection");
+ new SegmentBlobReferenceRetriever(store.getTracker()),
+ (GarbageCollectableBlobStore) store.getBlobStore(),
+ executor,
+ TimeUnit.SECONDS.toMillis(blobGcMaxAgeInSecs),
+ repoId
+ );
+
+ blobGCRegistration = registerMBean(
+ whiteboard,
+ BlobGCMBean.class,
+ new BlobGC(gc, executor),
+ BlobGCMBean.TYPE,
+ "Segment node store blob garbage collection"
+ );
}
- compactionStrategyRegistration = registerMBean(whiteboard,
- CompactionStrategyMBean.class,
- new DefaultCompactionStrategyMBean(compactionStrategy),
- CompactionStrategyMBean.TYPE,
- "Segment node store compaction strategy settings");
-
- fileStoreStatsMBean = registerMBean(whiteboard,
- FileStoreStatsMBean.class,
- store.getStats(),
- FileStoreStatsMBean.TYPE,
- "FileStore statistics");
-
log.info("SegmentNodeStore initialized");
return true;
}
- private String property(String name) {
- return lookupConfigurationThenFramework(context, name);
- }
-
- @Deactivate
- public void deactivate() {
- unregisterNodeStore();
-
- synchronized (this) {
- if (observerTracker != null) {
- observerTracker.stop();
- }
- if (gcMonitor != null) {
- gcMonitor.stop();
- }
- delegate = null;
-
- if (store != null) {
- store.close();
- store = null;
- }
- }
- }
-
- protected void bindBlobStore(BlobStore blobStore) throws IOException {
- this.blobStore = blobStore;
- registerNodeStore();
- }
-
- protected void unbindBlobStore(BlobStore blobStore){
- this.blobStore = null;
- unregisterNodeStore();
- }
-
private void unregisterNodeStore() {
if (segmentCacheMBean != null) {
segmentCacheMBean.unregister();
@@ -578,6 +629,92 @@ public class SegmentNodeStoreService ext
}
}
+ private File getBaseDirectory() {
+ String directory = property(DIRECTORY);
+
+ if (directory != null) {
+ return new File(directory);
+ }
+
+ return new File("tarmk");
+ }
+
+ private File getDirectory() {
+ return new File(getBaseDirectory(), "segmentstore");
+ }
+
+ private String getMode() {
+ String mode = property(MODE);
+
+ if (mode != null) {
+ return mode;
+ }
+
+ return System.getProperty(MODE, System.getProperty("sun.arch.data.model", "32"));
+ }
+
+ private String getCacheSizeProperty() {
+ String cache = property(CACHE);
+
+ if (cache != null) {
+ return cache;
+ }
+
+ return System.getProperty(CACHE);
+ }
+
+ private int getCacheSize() {
+ return Integer.parseInt(getCacheSizeProperty());
+ }
+
+ private String getMaxFileSizeProperty() {
+ String size = property(SIZE);
+
+ if (size != null) {
+ return size;
+ }
+
+ return System.getProperty(SIZE, "256");
+ }
+
+ private int getMaxFileSize() {
+ return Integer.parseInt(getMaxFileSizeProperty());
+ }
+
+ private CleanupType getCleanUpType() {
+ String cleanupType = property(COMPACTION_CLEANUP);
+
+ if (cleanupType == null) {
+ return CLEANUP_DEFAULT;
+ }
+
+ return CleanupType.valueOf(cleanupType);
+ }
+
+ private byte getMemoryThreshold() {
+ String mt = property(COMPACTION_MEMORY_THRESHOLD);
+
+ if (mt == null) {
+ return MEMORY_THRESHOLD_DEFAULT;
+ }
+
+ return Byte.valueOf(mt);
+ }
+
+ private byte getGainThreshold() {
+ String gt = property(COMPACTION_GAIN_THRESHOLD);
+
+ if (gt == null) {
+ return GAIN_THRESHOLD_DEFAULT;
+ }
+
+ return Byte.valueOf(gt);
+ }
+
+ private String property(String name) {
+ return lookupConfigurationThenFramework(context, name);
+ }
+
/**
* needed for situations where you have to unwrap the
* SegmentNodeStoreService, to get the SegmentStore, like the failover
@@ -598,6 +735,6 @@ public class SegmentNodeStoreService ext
@Override
public String toString() {
- return name + ": " + delegate;
+ return name + ": " + segmentNodeStore;
}
}