You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/19 13:10:24 UTC
[66/72] [abbrv] incubator-brooklyn git commit: BROOKLYN-162 - jclouds
last few package prefixes needed,
and tidy in core and elsewhere related (or observed in the process)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
new file mode 100644
index 0000000..10f349a
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynMementoPersisterToObjectStore.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.CatalogItem;
+import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler;
+import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento;
+import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.BrooklynObjectType;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.config.StringConfigMap;
+import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessor;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessorWithLock;
+import org.apache.brooklyn.core.mgmt.rebind.PeriodicDeltaChangeListener;
+import org.apache.brooklyn.core.mgmt.rebind.PersisterDeltaImpl;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoImpl;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoManifestImpl;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.xstream.XmlUtil;
+import org.apache.brooklyn.util.exceptions.CompoundRuntimeException;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
+ * {@link PersistenceObjectStore} such as a file system or a jclouds object store */
+public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister {
+
+ // TODO Crazy amount of duplication between handling entity, location, policy, enricher + feed;
+ // Need to remove that duplication.
+
+ // TODO Should stop() take a timeout, and shutdown the executor gracefully?
+
+ private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class);
+
+ public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey(
+ "persister.threadpool.maxSize",
+ "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)",
+ 10);
+
+ public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
+ "persister.maxSerializationAttempts",
+ "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)",
+ 5);
+
+ private final PersistenceObjectStore objectStore;
+ private final MementoSerializer<Object> serializerWithStandardClassLoader;
+
+ private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
+
+ private final ListeningExecutorService executor;
+
+ private volatile boolean writesAllowed = false;
+ private volatile boolean writesShuttingDown = false;
+ private StringConfigMap brooklynProperties;
+
+ private List<Delta> queuedDeltas = new CopyOnWriteArrayList<BrooklynMementoPersister.Delta>();
+
+ /**
+ * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
+ * for any concurrent call to complete.
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+ /**
+  * Creates a persister on top of the given object store.
+  * Builds the retrying XML serializer for the supplied class loader, creates the
+  * sub-paths used for each kind of persisted object, and starts the fixed-size
+  * thread pool used for persistence operations.
+  *
+  * @param objectStore the backing store; must not be null
+  * @param brooklynProperties source of the {@link #PERSISTER_MAX_SERIALIZATION_ATTEMPTS}
+  *        and {@link #PERSISTER_MAX_THREAD_POOL_SIZE} settings
+  * @param classLoader class loader handed to the XML serializer
+  */
+ public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, StringConfigMap brooklynProperties, ClassLoader classLoader) {
+     this.objectStore = checkNotNull(objectStore, "objectStore");
+     this.brooklynProperties = brooklynProperties;
+
+     final int serializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
+     this.serializerWithStandardClassLoader = new RetryingMementoSerializer<Object>(
+             new XmlMementoSerializer<Object>(classLoader), serializationAttempts);
+
+     final int poolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
+
+     // Sub-paths are created in this fixed order.
+     // FIXME does "plane" belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ?
+     final String[] subPaths = { "entities", "locations", "policies", "enrichers", "feeds", "catalog", "plane" };
+     for (String subPath : subPaths) {
+         objectStore.createSubPath(subPath);
+     }
+
+     ThreadFactory persisterThreads = new ThreadFactory() {
+         @Override public Thread newThread(Runnable r) {
+             // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator
+             return new Thread(r, "brooklyn-persister");
+         }
+     };
+     executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(poolSize, persisterThreads));
+ }
+
+ /** Public accessor for the serializer; delegates to {@link #getSerializerWithStandardClassLoader()}. */
+ public MementoSerializer<Object> getMementoSerializer() {
+ return getSerializerWithStandardClassLoader();
+ }
+
+ /** Returns the serializer that was built in the constructor from the class loader passed there. */
+ protected MementoSerializer<Object> getSerializerWithStandardClassLoader() {
+ return serializerWithStandardClassLoader;
+ }
+
+ /**
+  * Picks the serializer to use for one object: a serializer bound to the object's
+  * custom class loader when one can be resolved, otherwise the standard serializer.
+  */
+ protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, BrooklynObjectType type, String objectId) {
+     ClassLoader customLoader = getCustomClassLoaderForBrooklynObject(lookupContext, type, objectId);
+     return (customLoader != null)
+             ? getSerializerWithCustomClassLoader(lookupContext, customLoader)
+             : serializerWithStandardClassLoader;
+ }
+
+ /**
+  * Builds a new retrying XML serializer for the given class loader and attaches
+  * the lookup context to it. A fresh serializer is created on every call.
+  */
+ protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, ClassLoader classLoader) {
+     final int attempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
+     MementoSerializer<Object> retrying =
+             new RetryingMementoSerializer<Object>(new XmlMementoSerializer<Object>(classLoader), attempts);
+     retrying.setLookupContext(lookupContext);
+     return retrying;
+ }
+
+ /**
+  * Resolves the class loader derived from the catalog item of the given object.
+  *
+  * @return the catalog-based class loader, or null when the object is not known to the
+  *         lookup context, has no catalog item id, or its catalog item cannot be found
+  */
+ @Nullable protected ClassLoader getCustomClassLoaderForBrooklynObject(LookupContext lookupContext, BrooklynObjectType type, String objectId) {
+     BrooklynObject peeked = lookupContext.peek(type, objectId);
+     String catalogItemId = null;
+     if (peeked != null) {
+         catalogItemId = peeked.getCatalogItemId();
+     }
+     // TODO enrichers etc aren't yet known -- would need to backtrack to the entity to get them from bundles
+     if (catalogItemId == null) {
+         return null;
+     }
+
+     // See RebindIteration.BrooklynObjectInstantiator.load(), for handling where catalog item is missing;
+     // similar logic here.
+     CatalogItem<?, ?> found = CatalogUtils.getCatalogItemOptionalVersion(lookupContext.lookupManagementContext(), catalogItemId);
+     if (found != null) {
+         return ClassLoaderFromBrooklynClassLoadingContext.of(CatalogUtils.newClassLoadingContext(lookupContext.lookupManagementContext(), found));
+     }
+
+     // TODO do we need to only log once, rather than risk log.warn too often? I think this only happens on rebind, so ok.
+     LOG.warn("Unable to load catalog item "+catalogItemId+" for custom class loader of "+type+" "+objectId+"; will use default class loader");
+     return null;
+ }
+
+ /** Sets the flag that lets {@link #checkWritesAllowed()} pass, so checkpoint and delta calls are accepted. */
+ @Override public void enableWriteAccess() {
+ writesAllowed = true;
+ }
+
+ /**
+ * Turns off write access and blocks until outstanding writes have completed.
+ * While waiting, {@code writesShuttingDown} is true, which keeps
+ * {@link #checkWritesAllowed()} from rejecting writes during the shutdown window;
+ * the flag is always reset in the finally block.
+ * NOTE(review): the {@code graceful} argument is not consulted in this method.
+ */
+ @Override
+ public void disableWriteAccess(boolean graceful) {
+ writesShuttingDown = true;
+ try {
+ writesAllowed = false;
+ // a very long timeout to ensure we don't lose state.
+ // If persisting thousands of entities over slow network to Object Store, could take minutes.
+ waitForWritesCompleted(Duration.ONE_HOUR);
+
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ writesShuttingDown = false;
+ }
+ }
+
+ /**
+  * Disables writes and then shuts down the executor.
+  * A graceful stop waits up to one minute for the executor to terminate;
+  * a non-graceful stop calls {@code shutdownNow()}.
+  */
+ @Override
+ public void stop(boolean graceful) {
+     disableWriteAccess(graceful);
+
+     if (executor == null) {
+         return;
+     }
+     if (!graceful) {
+         executor.shutdownNow();
+         return;
+     }
+
+     executor.shutdown();
+     try {
+         // should be quick because we've just turned off writes, waiting for their completion
+         executor.awaitTermination(1, TimeUnit.MINUTES);
+     } catch (InterruptedException e) {
+         throw Exceptions.propagate(e);
+     }
+ }
+
+ /** Returns the object store supplied to the constructor. */
+ public PersistenceObjectStore getObjectStore() {
+ return objectStore;
+ }
+
+ /**
+  * Returns the locking accessor for the given path, creating and caching it on first use.
+  * The cache key is the last segment of the path (the text after the final '/'),
+  * and access to the cache is guarded by synchronizing on {@code writers}.
+  */
+ protected StoreObjectAccessorWithLock getWriter(String path) {
+     final String key = path.substring(path.lastIndexOf('/')+1);
+     synchronized (writers) {
+         StoreObjectAccessorWithLock existing = writers.get(key);
+         if (existing != null) {
+             return existing;
+         }
+         StoreObjectAccessorWithLock created = new StoreObjectAccessorLocking(objectStore.newAccessor(path));
+         writers.put(key, created);
+         return created;
+     }
+ }
+
+ /**
+  * Maps each sub-path to an id taken from its final path segment, treating both
+  * '/' and '\' as separators. Returns id -> original sub-path.
+  */
+ private Map<String,String> makeIdSubPathMap(Iterable<String> subPathLists) {
+     Map<String,String> idToSubPath = MutableMap.of();
+     for (String subpath: subPathLists) {
+         // Stripping after the last '/' and then after the last '\' of the remainder is the
+         // same as cutting after whichever of the two separators occurs last.
+         int lastSeparator = Math.max(subpath.lastIndexOf('/'), subpath.lastIndexOf('\\'));
+         String id = subpath.substring(lastSeparator+1);
+         // assumes id is the filename; should work even if not, as id is later read from xpath
+         // but you'll get warnings (and possibility of loss if there is a collision)
+         idToSubPath.put(id, subpath);
+     }
+     return idToSubPath;
+ }
+
+ /**
+  * Lists what is stored under each object type's sub-path and returns it as raw data
+  * whose values are the sub-paths (not yet the contents), keyed by id.
+  *
+  * @throws IllegalStateException if listing fails; the failure is first reported to the handler
+  */
+ protected BrooklynMementoRawData listMementoSubPathsAsData(final RebindExceptionHandler exceptionHandler) {
+     final BrooklynMementoRawData.Builder listing = BrooklynMementoRawData.builder();
+     final Stopwatch timer = Stopwatch.createStarted();
+
+     try {
+         for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
+             listing.putAll(type, makeIdSubPathMap(objectStore.listContentsWithSubPath(type.getSubPathName())));
+         }
+     } catch (Exception e) {
+         Exceptions.propagateIfFatal(e);
+         exceptionHandler.onLoadMementoFailed(BrooklynObjectType.UNKNOWN, "Failed to list files", e);
+         throw new IllegalStateException("Failed to list memento files in "+objectStore, e);
+     }
+
+     BrooklynMementoRawData listed = listing.build();
+     Object[] logArgs = new Object[]{
+             Time.makeTimeStringRounded(timer),
+             listed.getEntities().size(), listed.getLocations().size(), listed.getPolicies().size(), listed.getEnrichers().size(),
+             listed.getFeeds().size(), listed.getCatalogItems().size(),
+             objectStore.getSummaryName() };
+     LOG.debug("Loaded rebind lists; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", logArgs);
+
+     return listed;
+ }
+
+ /**
+  * Reads the contents of every persisted object and returns them as raw data keyed by
+  * the id found inside each document (xpath {@code /<type>/id}).
+  * <p>
+  * A read failure is reported once to the handler and that object is then skipped;
+  * previously the null contents were still passed to the xpath lookup, which failed
+  * again and was reported a second time under a different message.
+  *
+  * @param exceptionHandler receives read and listing failures
+  * @return the raw contents for all object types
+  */
+ public BrooklynMementoRawData loadMementoRawData(final RebindExceptionHandler exceptionHandler) {
+     BrooklynMementoRawData subPathData = listMementoSubPathsAsData(exceptionHandler);
+
+     final BrooklynMementoRawData.Builder builder = BrooklynMementoRawData.builder();
+
+     Visitor loaderVisitor = new Visitor() {
+         @Override
+         public void visit(BrooklynObjectType type, String id, String contentsSubpath) throws Exception {
+             String contents;
+             try {
+                 contents = read(contentsSubpath);
+             } catch (Exception e) {
+                 Exceptions.propagateIfFatal(e);
+                 exceptionHandler.onLoadMementoFailed(type, "memento "+id+" read error", e);
+                 // Nothing to parse; the failure has already been reported.
+                 return;
+             }
+             if (contents == null) {
+                 LOG.warn("No contents for "+type.toCamelCase()+" "+id+" at "+contentsSubpath+"; ignoring and continuing");
+                 return;
+             }
+
+             String xmlId = (String) XmlUtil.xpath(contents, "/"+type.toCamelCase()+"/id");
+             String safeXmlId = Strings.makeValidFilename(xmlId);
+             if (!Objects.equal(id, safeXmlId))
+                 LOG.warn("ID mismatch on "+type.toCamelCase()+", "+id+" from path, "+safeXmlId+" from xml");
+
+             builder.put(type, xmlId, contents);
+         }
+     };
+
+     Stopwatch stopwatch = Stopwatch.createStarted();
+
+     visitMemento("loading raw", subPathData, loaderVisitor, exceptionHandler);
+
+     BrooklynMementoRawData result = builder.build();
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Loaded rebind raw data; took {}; {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{
+                 Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntities().size(),
+                 result.getLocations().size(), result.getPolicies().size(), result.getEnrichers().size(),
+                 result.getFeeds().size(), result.getCatalogItems().size(),
+                 objectStore.getSummaryName() });
+     }
+
+     return result;
+ }
+
+ /**
+  * Builds the manifest (ids, types, parents, catalog item ids) from raw memento data.
+  * Entities, locations, policies, enrichers and feeds are read with xpath only;
+  * catalog items are fully deserialized with the standard class loader.
+  * <p>
+  * The catalog-item catch now calls {@code Exceptions.propagateIfFatal} before reporting,
+  * as the other load-path handlers in this class do, so that fatal conditions are not
+  * recorded as an ordinary per-item load failure.
+  *
+  * @param mementoData raw data to use, or null to load it from the object store
+  * @param exceptionHandler receives per-object load failures
+  */
+ @Override
+ public BrooklynMementoManifest loadMementoManifest(BrooklynMementoRawData mementoData, final RebindExceptionHandler exceptionHandler) throws IOException {
+     if (mementoData==null)
+         mementoData = loadMementoRawData(exceptionHandler);
+
+     final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
+
+     Visitor visitor = new Visitor() {
+         @Override
+         public void visit(BrooklynObjectType type, String objectId, final String contents) throws Exception {
+             final String prefix = "/"+type.toCamelCase()+"/";
+
+             // Small helper so each field lookup is relative to the document root element.
+             class XPathHelper {
+                 private String get(String innerPath) {
+                     return (String) XmlUtil.xpath(contents, prefix+innerPath);
+                 }
+             }
+             XPathHelper x = new XPathHelper();
+
+             switch (type) {
+                 case ENTITY:
+                     builder.entity(x.get("id"), x.get("type"),
+                         Strings.emptyToNull(x.get("parent")), Strings.emptyToNull(x.get("catalogItemId")));
+                     break;
+                 case LOCATION:
+                 case POLICY:
+                 case ENRICHER:
+                 case FEED:
+                     builder.putType(type, x.get("id"), x.get("type"));
+                     break;
+                 case CATALOG_ITEM:
+                     try {
+                         CatalogItemMemento memento = (CatalogItemMemento) getSerializerWithStandardClassLoader().fromString(contents);
+                         if (memento == null) {
+                             LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing");
+                         } else {
+                             builder.catalogItem(memento);
+                         }
+                     } catch (Exception e) {
+                         Exceptions.propagateIfFatal(e);
+                         exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" early catalog deserialization error", e);
+                     }
+                     break;
+                 default:
+                     throw new IllegalStateException("Unexpected brooklyn type: "+type);
+             }
+         }
+     };
+
+     Stopwatch stopwatch = Stopwatch.createStarted();
+
+     visitMemento("manifests", mementoData, visitor, exceptionHandler);
+
+     BrooklynMementoManifest result = builder.build();
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Loaded rebind manifests; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{
+                 Time.makeTimeStringRounded(stopwatch),
+                 result.getEntityIdToManifest().size(), result.getLocationIdToType().size(),
+                 result.getPolicyIdToType().size(), result.getEnricherIdToType().size(), result.getFeedIdToType().size(),
+                 result.getCatalogItemMementos().size(),
+                 objectStore.getSummaryName() });
+     }
+
+     return result;
+ }
+
+ /**
+  * Deserializes every raw memento into a {@link BrooklynMemento}, using a serializer
+  * bound to each object's custom class loader where one can be resolved.
+  * The standard serializer has the lookup context set for the duration of the visit
+  * and unset again afterwards.
+  * <p>
+  * The deserialization catch now calls {@code Exceptions.propagateIfFatal} before reporting,
+  * as the other load-path handlers in this class do.
+  *
+  * @param mementoData raw data to use, or null to load it from the object store
+  * @param lookupContext used to resolve references and custom class loaders
+  * @param exceptionHandler receives per-object deserialization failures
+  */
+ @Override
+ public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, final LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException {
+     if (mementoData==null)
+         mementoData = loadMementoRawData(exceptionHandler);
+
+     Stopwatch stopwatch = Stopwatch.createStarted();
+
+     final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
+
+     Visitor visitor = new Visitor() {
+         @Override
+         public void visit(BrooklynObjectType type, String objectId, String contents) throws Exception {
+             try {
+                 Memento memento = (Memento) getSerializerWithCustomClassLoader(lookupContext, type, objectId).fromString(contents);
+                 if (memento == null) {
+                     LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing");
+                 } else {
+                     builder.memento(memento);
+                 }
+             } catch (Exception e) {
+                 Exceptions.propagateIfFatal(e);
+                 exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" deserialization error", e);
+             }
+         }
+
+     };
+
+     // TODO not convinced this is single threaded on reads; maybe should get a new one each time?
+     getSerializerWithStandardClassLoader().setLookupContext(lookupContext);
+     try {
+         visitMemento("deserialization", mementoData, visitor, exceptionHandler);
+     } finally {
+         getSerializerWithStandardClassLoader().unsetLookupContext();
+     }
+
+     BrooklynMemento result = builder.build();
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Loaded rebind mementos; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{
+                 Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(),
+                 result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
+                 result.getFeedIds().size(), result.getCatalogItemIds().size(),
+                 objectStore.getSummaryName() });
+     }
+
+     return result;
+ }
+
+ /**
+ * Callback applied by {@link #visitMemento} to each (type, id, value) entry of a
+ * {@link BrooklynMementoRawData}. The third argument is whatever the raw data holds
+ * as the value: callers in this class pass either document contents or a sub-path.
+ */
+ protected interface Visitor {
+ public void visit(BrooklynObjectType type, String id, String contents) throws Exception;
+ }
+
+ /**
+  * Applies the visitor to every entry of the raw data, one executor task per entry,
+  * and waits for all of them.
+  * <p>
+  * Exceptions thrown by the visitor are reported to the handler inside each task; a task
+  * therefore fails only if that reporting (or a fatal error) throws. When that happens,
+  * the failures of completed tasks are collected and tasks that have not yet finished
+  * are cancelled. Previously {@code cancel(true)} was invoked only on tasks that were
+  * already done, where it has no effect.
+  *
+  * @param phase short label used in log and error messages
+  * @param rawData the entries to visit, grouped by object type
+  * @param visitor callback invoked for each entry
+  * @param exceptionHandler receives per-entry failures
+  * @throws CompoundRuntimeException if one or more tasks failed
+  */
+ protected void visitMemento(final String phase, final BrooklynMementoRawData rawData, final Visitor visitor, final RebindExceptionHandler exceptionHandler) {
+     List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+     class VisitorWrapper implements Runnable {
+         private final BrooklynObjectType type;
+         private final Map.Entry<String,String> objectIdAndData;
+         public VisitorWrapper(BrooklynObjectType type, Map.Entry<String,String> objectIdAndData) {
+             this.type = type;
+             this.objectIdAndData = objectIdAndData;
+         }
+         @Override
+         public void run() {
+             try {
+                 visitor.visit(type, objectIdAndData.getKey(), objectIdAndData.getValue());
+             } catch (Exception e) {
+                 Exceptions.propagateIfFatal(e);
+                 exceptionHandler.onLoadMementoFailed(type, "memento "+objectIdAndData.getKey()+" "+phase+" error", e);
+             }
+         }
+     }
+
+     for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
+         for (final Map.Entry<String,String> entry : rawData.getObjectsOfType(type).entrySet()) {
+             futures.add(executor.submit(new VisitorWrapper(type, entry)));
+         }
+     }
+
+     try {
+         // Wait for all, failing fast if any exceptions.
+         Futures.allAsList(futures).get();
+     } catch (Exception e) {
+         Exceptions.propagateIfFatal(e);
+
+         List<Exception> exceptions = Lists.newArrayList();
+
+         for (ListenableFuture<?> future : futures) {
+             if (future.isDone()) {
+                 try {
+                     future.get();
+                 } catch (InterruptedException e2) {
+                     throw Exceptions.propagate(e2);
+                 } catch (ExecutionException e2) {
+                     LOG.warn("Problem loading memento ("+phase+"): "+e2, e2);
+                     exceptions.add(e2);
+                 }
+             } else {
+                 // Still queued or running: we are about to fail, so stop the remaining work.
+                 future.cancel(true);
+             }
+         }
+         if (exceptions.isEmpty()) {
+             throw Exceptions.propagate(e);
+         } else {
+             // Normally there should be at least one failure; otherwise all.get() would not have failed.
+             throw new CompoundRuntimeException("Problem loading mementos ("+phase+")", exceptions);
+         }
+     }
+ }
+
+ /**
+  * Rejects the call unless writes are enabled or a shutdown is currently draining them.
+  *
+  * @throws IllegalStateException if neither flag is set
+  */
+ protected void checkWritesAllowed() {
+     if (writesAllowed || writesShuttingDown) {
+         return;
+     }
+     throw new IllegalStateException("Writes not allowed in "+this);
+ }
+
+ /**
+ * Writes every object in the given raw memento to the object store.
+ * Holds the write lock for the whole operation, so that
+ * {@link #waitForWritesCompleted(Duration)} blocks until it has finished.
+ * One asynchronous persist task is submitted per object; the method returns only
+ * after all tasks have completed, and rethrows if any of them failed.
+ * See {@link BrooklynPersistenceUtils} for conveniences for using this method.
+ *
+ * @throws IllegalStateException if writes are not currently allowed
+ */
+ @Override
+ @Beta
+ public void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler) {
+ checkWritesAllowed();
+ // Acquire outside the try/finally below so unlock only runs if the lock was obtained.
+ try {
+ lock.writeLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+
+ try {
+ objectStore.prepareForMasterUse();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
+ for (Map.Entry<String, String> entry : newMemento.getObjectsOfType(type).entrySet()) {
+ futures.add(asyncPersist(type.getSubPathName(), type, entry.getKey(), entry.getValue(), exceptionHandler));
+ }
+ }
+
+ try {
+ // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+ // But then propagate failure if any fail. (hence the two calls).
+ Futures.successfulAsList(futures).get();
+ Futures.allAsList(futures).get();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+ if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Persists the given delta, first draining any deltas previously added via
+ * {@link #queueDelta(Delta)} in the order they were queued.
+ *
+ * @throws IllegalStateException if writes are not currently allowed
+ */
+ @Override
+ public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) {
+ checkWritesAllowed();
+
+ // Queued deltas are applied oldest-first, before the delta passed in.
+ while (!queuedDeltas.isEmpty()) {
+ Delta extraDelta = queuedDeltas.remove(0);
+ doDelta(extraDelta, exceptionHandler, true);
+ }
+
+ doDelta(delta, exceptionHandler, false);
+ }
+
+ /**
+ * Applies one delta via {@link #deltaImpl} and logs, at debug level, how long it took
+ * and how many objects of each kind were updated and removed.
+ *
+ * @param previouslyQueued true if the delta came from the queue; only affects the log text
+ */
+ protected void doDelta(Delta delta, PersistenceExceptionHandler exceptionHandler, boolean previouslyQueued) {
+ Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
+
+ if (LOG.isDebugEnabled()) LOG.debug("Checkpointed "+(previouslyQueued ? "previously queued " : "")+"delta of memento in {}: "
+ + "updated {} entities, {} locations, {} policies, {} enrichers, {} catalog items; "
+ + "removed {} entities, {} locations, {} policies, {} enrichers, {} catalog items",
+ new Object[] {Time.makeTimeStringRounded(stopwatch),
+ delta.entities().size(), delta.locations().size(), delta.policies().size(), delta.enrichers().size(), delta.catalogItems().size(),
+ delta.removedEntityIds().size(), delta.removedLocationIds().size(), delta.removedPolicyIds().size(), delta.removedEnricherIds().size(), delta.removedCatalogItemIds().size()});
+ }
+
+ /** Stores the delta for later; it is written on the next call to {@link #delta}. No write-access check is made here. */
+ @Override
+ public void queueDelta(Delta delta) {
+ queuedDeltas.add(delta);
+ }
+
+ /**
+ * Writes the updated objects and deletes the removed ones for a single delta, holding
+ * the write lock throughout, and returns the stopwatch started once the lock was held.
+ * <p>
+ * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy").
+ * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
+ *
+ * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
+ * don't do unnecessary repeated writes of an entity.
+ */
+ private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
+ // Acquire outside the try/finally below so unlock only runs if the lock was obtained.
+ try {
+ lock.writeLock().lockInterruptibly();
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ try {
+ objectStore.prepareForMasterUse();
+
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ List<ListenableFuture<?>> futures = Lists.newArrayList();
+
+ // All persist tasks are submitted before any delete task.
+ for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
+ for (Memento entity : delta.getObjectsOfType(type)) {
+ futures.add(asyncPersist(type.getSubPathName(), entity, exceptionHandler));
+ }
+ }
+ for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
+ for (String id : delta.getRemovedIdsOfType(type)) {
+ futures.add(asyncDelete(type.getSubPathName(), id, exceptionHandler));
+ }
+ }
+
+ try {
+ // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+ // But then propagate failure if any fail. (hence the two calls).
+ Futures.successfulAsList(futures).get();
+ Futures.allAsList(futures).get();
+ } catch (Exception e) {
+ throw Exceptions.propagate(e);
+ }
+
+ return stopwatch;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+  * Blocks until no checkpoint or delta is in progress, then waits for each known
+  * writer to finish its current writes.
+  * <p>
+  * The read lock is released in a {@code finally} block so that it cannot be leaked
+  * if taking the snapshot of writers throws.
+  *
+  * @param timeout maximum time to wait for the lock; the same duration is then passed
+  *        to each writer in turn
+  * @throws TimeoutException if the lock cannot be obtained within the timeout
+  * @throws InterruptedException if interrupted while waiting
+  */
+ @Override
+ public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+     boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+     if (!locked) {
+         throw new TimeoutException("Timeout waiting for writes to "+objectStore);
+     }
+
+     ImmutableSet<StoreObjectAccessorWithLock> wc;
+     try {
+         synchronized (writers) {
+             wc = ImmutableSet.copyOf(writers.values());
+         }
+     } finally {
+         lock.readLock().unlock();
+     }
+
+     // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
+     // each writer is now synchronous.
+     for (StoreObjectAccessorWithLock writer : wc) {
+         writer.waitForCurrentWrites(timeout);
+     }
+ }
+
+ /** Fetches the contents stored at the given sub-path using a fresh, uncached accessor. */
+ private String read(String subPath) {
+     return objectStore.newAccessor(subPath).get();
+ }
+
+ /**
+  * Serializes the memento with the standard serializer and writes it under the sub-path.
+  * Any exception is handed to the handler rather than thrown.
+  */
+ private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) {
+     try {
+         // Writer is looked up before serializing, as in the original single expression.
+         StoreObjectAccessorWithLock target = getWriter(getPath(subPath, memento.getId()));
+         String serialized = getSerializerWithStandardClassLoader().toString(memento);
+         target.put(serialized);
+     } catch (Exception e) {
+         exceptionHandler.onPersistMementoFailed(memento, e);
+     }
+ }
+
+ /**
+  * Writes already-serialized content for the given id under the sub-path.
+  * Null content is logged as a warning but still written. Any exception is handed
+  * to the handler rather than thrown.
+  */
+ private void persist(String subPath, BrooklynObjectType type, String id, String content, PersistenceExceptionHandler exceptionHandler) {
+     try {
+         final boolean missingContent = (content == null);
+         if (missingContent) {
+             LOG.warn("Null content for "+type+" "+id);
+         }
+         StoreObjectAccessorWithLock target = getWriter(getPath(subPath, id));
+         target.put(content);
+     } catch (Exception e) {
+         exceptionHandler.onPersistRawMementoFailed(type, id, e);
+     }
+ }
+
+ /**
+  * Deletes the stored object for the given id and drops its cached writer.
+  * Any exception is handed to the handler rather than thrown.
+  * <p>
+  * The cache entry is removed under the same key {@link #getWriter(String)} stores it
+  * with: the last segment of the path, i.e. the filename-safe form of the id. Removing
+  * by the raw id left the entry behind whenever {@code Strings.makeValidFilename}
+  * altered the id.
+  */
+ private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) {
+     try {
+         String path = getPath(subPath, id);
+         StoreObjectAccessorWithLock w = getWriter(path);
+         w.delete();
+         String writerKey = path.substring(path.lastIndexOf('/')+1);
+         synchronized (writers) {
+             writers.remove(writerKey);
+         }
+     } catch (Exception e) {
+         exceptionHandler.onDeleteMementoFailed(id, e);
+     }
+ }
+
+ /** Submits a task to the executor that persists the memento; returns the task's future. */
+ private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
+     Runnable persistTask = new Runnable() {
+         @Override
+         public void run() {
+             persist(subPath, memento, exceptionHandler);
+         }
+     };
+     return executor.submit(persistTask);
+ }
+
+ /** Submits a task to the executor that persists the raw content; returns the task's future. */
+ private ListenableFuture<?> asyncPersist(final String subPath, final BrooklynObjectType type, final String id, final String content, final PersistenceExceptionHandler exceptionHandler) {
+     Runnable persistTask = new Runnable() {
+         @Override
+         public void run() {
+             persist(subPath, type, id, content, exceptionHandler);
+         }
+     };
+     return executor.submit(persistTask);
+ }
+
+ /** Submits a task to the executor that deletes the object with the given id; returns the task's future. */
+ private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) {
+     Runnable deleteTask = new Runnable() {
+         @Override
+         public void run() {
+             delete(subPath, id, exceptionHandler);
+         }
+     };
+     return executor.submit(deleteTask);
+ }
+
+ /** Builds the store path for an object: the sub-path, a '/', then the id passed through {@code Strings.makeValidFilename}. */
+ private String getPath(String subPath, String id) {
+ return subPath+"/"+Strings.makeValidFilename(id);
+ }
+
+ /** Returns the summary name reported by the underlying object store. */
+ @Override
+ public String getBackingStoreDescription() {
+ return getObjectStore().getSummaryName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java
new file mode 100644
index 0000000..b3db32c
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/BrooklynPersistenceUtils.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.CatalogItem;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.location.LocationSpec;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
+import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
+import org.apache.brooklyn.api.mgmt.ha.ManagementPlaneSyncRecord;
+import org.apache.brooklyn.api.mgmt.ha.MementoCopyMode;
+import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento;
+import org.apache.brooklyn.api.objs.BrooklynObject;
+import org.apache.brooklyn.api.objs.BrooklynObjectType;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.core.mgmt.ha.ManagementPlaneSyncRecordPersisterToObjectStore;
+import org.apache.brooklyn.core.mgmt.internal.LocalLocationManager;
+import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
+import org.apache.brooklyn.core.mgmt.rebind.PersistenceExceptionHandlerImpl;
+import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer;
+import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformerLoader;
+import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
+import org.apache.brooklyn.core.server.BrooklynServerConfig;
+import org.apache.brooklyn.core.server.BrooklynServerPaths;
+import org.apache.brooklyn.entity.core.Entities;
+import org.apache.brooklyn.entity.core.EntityInternal;
+import org.apache.brooklyn.location.localhost.LocalhostMachineProvisioningLocation;
+import org.apache.brooklyn.util.core.ResourceUtils;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+
+import com.google.common.annotations.Beta;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+
+public class BrooklynPersistenceUtils {
+
+ private static final Logger log = LoggerFactory.getLogger(BrooklynPersistenceUtils.class);
+
+ /**
+  * Immutable list of object types in the order entity, location, policy, enricher, feed, catalog item.
+  * NOTE(review): the name suggests this is the order in which types are persisted; no use is visible here, so confirm against callers.
+  */
+ @Beta
+ public static final List<BrooklynObjectType> STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER = ImmutableList.of(
+ BrooklynObjectType.ENTITY, BrooklynObjectType.LOCATION, BrooklynObjectType.POLICY,
+ BrooklynObjectType.ENRICHER, BrooklynObjectType.FEED, BrooklynObjectType.CATALOG_ITEM);
+
+ /**
+  * Creates a {@link PersistenceObjectStore} for general-purpose use by delegating to the
+  * five-argument overload with {@link PersistMode#AUTO} and {@link HighAvailabilityMode#STANDBY}.
+  */
+ public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
+         String locationSpec, String locationContainer) {
+     final PersistMode defaultPersistMode = PersistMode.AUTO;
+     final HighAvailabilityMode defaultHaMode = HighAvailabilityMode.STANDBY;
+     return newPersistenceObjectStore(managementContext, locationSpec, locationContainer,
+             defaultPersistMode, defaultHaMode);
+ }
+
+ /**
+  * Creates a {@link PersistenceObjectStore} for use with a specified set of modes.
+  * <p>
+  * The container path is resolved through {@code BrooklynServerPaths.newMainPersistencePathResolver}.
+  * A blank {@code locationSpec} uses a newly created, unmanaged localhost location; otherwise the spec
+  * is resolved through the location registry. The resulting store has the management context injected
+  * and is prepared for shared use before it is returned.
+  *
+  * @throws IllegalArgumentException if the resolved location is not a {@link LocationWithObjectStore}
+  */
+ public static PersistenceObjectStore newPersistenceObjectStore(ManagementContext managementContext,
+         String locationSpec, String locationContainer, PersistMode persistMode, HighAvailabilityMode highAvailabilityMode) {
+     final String resolvedContainer = BrooklynServerPaths.newMainPersistencePathResolver(managementContext)
+             .location(locationSpec).dir(locationContainer).resolve();
+
+     final Location location;
+     if (!Strings.isBlank(locationSpec)) {
+         location = managementContext.getLocationRegistry().resolve(locationSpec, false, null).get();
+         if (!(location instanceof LocationWithObjectStore)) {
+             throw new IllegalArgumentException("Destination location "+location+" does not offer a persistent store");
+         }
+     } else {
+         location = managementContext.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)
+                 .configure(LocalLocationManager.CREATE_UNMANAGED, true));
+     }
+
+     PersistenceObjectStore store = ((LocationWithObjectStore)location).newPersistenceObjectStore(resolvedContainer);
+     store.injectManagementContext(managementContext);
+     store.prepareForSharedUse(persistMode, highAvailabilityMode);
+     return store;
+ }
+
+ /**
+  * Checkpoints {@code memento} into {@code destinationObjectStore} using a new
+  * {@link BrooklynMementoPersisterToObjectStore} with write access enabled and a default-built exception handler.
+  * The management context must be a {@link ManagementContextInternal}.
+  */
+ public static void writeMemento(ManagementContext managementContext, BrooklynMementoRawData memento,
+         PersistenceObjectStore destinationObjectStore) {
+     ManagementContextInternal internalContext = (ManagementContextInternal) managementContext;
+     BrooklynMementoPersisterToObjectStore mementoPersister = new BrooklynMementoPersisterToObjectStore(
+             destinationObjectStore,
+             internalContext.getBrooklynProperties(),
+             managementContext.getCatalogClassLoader());
+     PersistenceExceptionHandler handler = PersistenceExceptionHandlerImpl.builder().build();
+
+     mementoPersister.enableWriteAccess();
+     mementoPersister.checkpoint(memento, handler);
+ }
+
+ /**
+  * Checkpoints the management plane record into {@code destinationObjectStore};
+  * does nothing when {@code optionalPlaneRecord} is null.
+  */
+ public static void writeManagerMemento(ManagementContext managementContext, ManagementPlaneSyncRecord optionalPlaneRecord,
+         PersistenceObjectStore destinationObjectStore) {
+     if (optionalPlaneRecord == null) {
+         return;
+     }
+     ManagementPlaneSyncRecordPersisterToObjectStore planePersister = new ManagementPlaneSyncRecordPersisterToObjectStore(
+             managementContext, destinationObjectStore, managementContext.getCatalogClassLoader());
+     planePersister.checkpoint(optionalPlaneRecord);
+ }
+
+ /**
+  * Loads a {@link CompoundTransformer} from the resource at {@code transformationsFileUrl},
+  * or returns {@link CompoundTransformer#NOOP} when the URL is blank.
+  */
+ public static CompoundTransformer loadTransformer(ResourceUtils resources, String transformationsFileUrl) {
+     if (Strings.isBlank(transformationsFileUrl)) {
+         return CompoundTransformer.NOOP;
+     }
+     String definition = resources.getResourceAsString(transformationsFileUrl);
+     return CompoundTransformerLoader.load(definition);
+ }
+
+ /** Returns the memento from the rebind support of {@code instance}, which must be a {@link BrooklynObjectInternal}. */
+ public static Memento newObjectMemento(BrooklynObject instance) {
+     BrooklynObjectInternal internal = (BrooklynObjectInternal) instance;
+     return internal.getRebindSupport().getMemento();
+ }
+
+ /**
+  * Produces raw memento data from the given source: {@code LOCAL} builds it from the objects currently
+  * known to {@code mgmt}, {@code REMOTE} retrieves it through the rebind manager.
+  *
+  * @throws IllegalStateException for {@code AUTO} or any other mode
+  */
+ public static BrooklynMementoRawData newStateMemento(ManagementContext mgmt, MementoCopyMode source) {
+     switch (source) {
+         case AUTO:
+             throw new IllegalStateException("Copy mode AUTO not supported here");
+         case LOCAL:
+             return newStateMementoFromLocal(mgmt);
+         case REMOTE:
+             return mgmt.getRebindManager().retrieveMementoRawData();
+         default:
+             throw new IllegalStateException("Should not come here, unknown mode "+source);
+     }
+ }
+
+ /**
+  * Obtains the management plane record from the HA manager: {@code LOCAL} returns the last known record,
+  * {@code REMOTE} calls {@code loadManagementPlaneSyncRecord(true)}.
+  *
+  * @throws IllegalStateException for {@code AUTO} or any other mode
+  */
+ public static ManagementPlaneSyncRecord newManagerMemento(ManagementContext mgmt, MementoCopyMode source) {
+     switch (source) {
+         case AUTO:
+             throw new IllegalStateException("Copy mode AUTO not supported here");
+         case LOCAL:
+             return mgmt.getHighAvailabilityManager().getLastManagementPlaneSyncRecord();
+         case REMOTE:
+             return mgmt.getHighAvailabilityManager().loadManagementPlaneSyncRecord(true);
+         default:
+             throw new IllegalStateException("Should not come here, unknown mode "+source);
+     }
+ }
+
+
+ /**
+  * Serializes the mementos of every location, entity (with its feeds, enrichers and policies)
+  * and catalog item known to {@code mgmt} into a new {@link BrooklynMementoRawData}, keyed by object id.
+  */
+ private static BrooklynMementoRawData newStateMementoFromLocal(ManagementContext mgmt) {
+     BrooklynMementoRawData.Builder rawData = BrooklynMementoRawData.builder();
+     MementoSerializer<Object> xmlSerializer = new XmlMementoSerializer<Object>(mgmt.getClass().getClassLoader());
+     RetryingMementoSerializer<Object> serializer = new RetryingMementoSerializer<Object>(xmlSerializer, 1);
+
+     for (Location location: mgmt.getLocationManager().getLocations()) {
+         rawData.location(location.getId(), serializer.toString(newObjectMemento(location)));
+     }
+
+     for (Entity managed: mgmt.getEntityManager().getEntities()) {
+         // work on the real entity rather than its proxy
+         Entity entity = Entities.deproxy(managed);
+         rawData.entity(entity.getId(), serializer.toString(newObjectMemento(entity)));
+         for (Feed feed: ((EntityInternal)entity).feeds().getFeeds()) {
+             rawData.feed(feed.getId(), serializer.toString(newObjectMemento(feed)));
+         }
+         for (Enricher enricher: entity.getEnrichers()) {
+             rawData.enricher(enricher.getId(), serializer.toString(newObjectMemento(enricher)));
+         }
+         for (Policy policy: entity.getPolicies()) {
+             rawData.policy(policy.getId(), serializer.toString(newObjectMemento(policy)));
+         }
+     }
+
+     for (CatalogItem<?,?> item: mgmt.getCatalog().getCatalogItems()) {
+         rawData.catalogItem(item.getId(), serializer.toString(newObjectMemento(item)));
+     }
+
+     return rawData.build();
+ }
+
+ /**
+  * Generates and writes mementos for the given mgmt context to the given targetStore.
+  * The data may be taken from {@link MementoCopyMode#LOCAL} current state or
+  * {@link MementoCopyMode#REMOTE} persisted state; for null or {@link MementoCopyMode#AUTO}
+  * the source is LOCAL when this node's state is MASTER and REMOTE otherwise.
+  */
+ public static void writeMemento(ManagementContext mgmt, PersistenceObjectStore targetStore, MementoCopyMode source) {
+     MementoCopyMode effectiveSource = source;
+     if (effectiveSource == null || effectiveSource == MementoCopyMode.AUTO) {
+         boolean isMaster = mgmt.getHighAvailabilityManager().getNodeState() == ManagementNodeState.MASTER;
+         effectiveSource = isMaster ? MementoCopyMode.LOCAL : MementoCopyMode.REMOTE;
+     }
+
+     Stopwatch timer = Stopwatch.createStarted();
+
+     BrooklynMementoRawData stateData = newStateMemento(mgmt, effectiveSource);
+     ManagementPlaneSyncRecord planeRecord = newManagerMemento(mgmt, effectiveSource);
+
+     writeMemento(mgmt, stateData, targetStore);
+     writeManagerMemento(mgmt, planeRecord, targetStore);
+
+     log.debug("Wrote full memento to "+targetStore+" in "+Time.makeTimeStringRounded(Duration.of(timer)));
+ }
+
+ /** Reason a back-up is being created; {@link #toString()} yields the lower-case constant name. */
+ public static enum CreateBackupMode {
+     PROMOTION, DEMOTION, CUSTOM;
+
+     /**
+      * Returns the constant name in lower case. A fixed locale is used so the result does not
+      * depend on the JVM default (a Turkish default would otherwise turn "I" into a dotless i),
+      * which matters because this string is passed on as a back-up sub-path.
+      */
+     @Override
+     public String toString() {
+         return super.toString().toLowerCase(java.util.Locale.ROOT);
+     }
+ }
+
+ /**
+  * Writes a back-up of the persisted state and, when it can be read, the management plane record
+  * to the configured back-up location.
+  * <p>
+  * When {@code source} is null or {@link MementoCopyMode#AUTO} it is derived from {@code mode}:
+  * PROMOTION reads {@link MementoCopyMode#REMOTE}, DEMOTION reads {@link MementoCopyMode#LOCAL}.
+  * If writing to a non-blank back-up spec other than "localhost" fails, the back-up is retried against
+  * "localhost". Any other failure is logged as a warning and ignored, once
+  * {@code Exceptions.propagateIfFatal} has had the chance to rethrow it.
+  *
+  * @throws IllegalArgumentException if the copy mode has to be detected and {@code mode} is neither PROMOTION nor DEMOTION
+  */
+ public static void createBackup(ManagementContext managementContext, CreateBackupMode mode, MementoCopyMode source) {
+     if (source==null || source==MementoCopyMode.AUTO) {
+         switch (mode) {
+             case PROMOTION: source = MementoCopyMode.REMOTE; break;
+             case DEMOTION: source = MementoCopyMode.LOCAL; break;
+             default:
+                 throw new IllegalArgumentException("Cannot detect copy mode for "+mode+"/"+source);
+         }
+     }
+     BrooklynMementoRawData memento = null;
+     ManagementPlaneSyncRecord planeState = null;
+
+     try {
+         log.debug("Loading persisted state on "+mode+" for backup purposes");
+         memento = newStateMemento(managementContext, source);
+         try {
+             planeState = newManagerMemento(managementContext, source);
+         } catch (Exception e) {
+             // the plane record is optional: carry on with the state back-up without it
+             Exceptions.propagateIfFatal(e);
+             log.warn("Unable to access management plane sync state on "+mode+" (ignoring): "+e, e);
+         }
+
+         PersistenceObjectStore destinationObjectStore = null;
+         String backupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_LOCATION_SPEC);
+         String nonBackupSpec = managementContext.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_LOCATION_SPEC);
+         try {
+             String backupContainer = BrooklynServerPaths.newBackupPersistencePathResolver(managementContext)
+                     .location(backupSpec).nonBackupLocation(nonBackupSpec).resolveWithSubpathFor(managementContext, mode.toString());
+             destinationObjectStore = BrooklynPersistenceUtils.newPersistenceObjectStore(managementContext, backupSpec, backupContainer);
+             log.debug("Backing up persisted state on "+mode+", to "+destinationObjectStore.getSummaryName());
+             BrooklynPersistenceUtils.writeMemento(managementContext, memento, destinationObjectStore);
+             BrooklynPersistenceUtils.writeManagerMemento(managementContext, planeState, destinationObjectStore);
+             if (!memento.isEmpty()) {
+                 log.info("Back-up of persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
+             } else {
+                 log.debug("Back-up of (empty) persisted state created on "+mode+", in "+destinationObjectStore.getSummaryName());
+             }
+
+         } catch (Exception e) {
+             Exceptions.propagateIfFatal(e);
+             if (Strings.isBlank(backupSpec) || "localhost".equals(backupSpec)) {
+                 // No alternative location to fall back to. Rethrow so the outer handler logs the
+                 // failure; previously it was dropped here without any trace.
+                 throw e;
+             }
+             PersistenceObjectStore failedStore = destinationObjectStore;
+             String failedSpec = backupSpec;
+             backupSpec = "localhost";
+             String backupContainer = BrooklynServerPaths.newBackupPersistencePathResolver(managementContext)
+                     .location(backupSpec).nonBackupLocation(nonBackupSpec).resolveWithSubpathFor(managementContext, mode.toString());
+             destinationObjectStore = BrooklynPersistenceUtils.newPersistenceObjectStore(managementContext, backupSpec, backupContainer);
+             log.warn("Persisted state back-up to "+(failedStore!=null ? failedStore.getSummaryName() : failedSpec)
+                     +" failed with "+e, e);
+
+             log.debug("Backing up persisted state on "+mode+", locally because remote failed, to "+destinationObjectStore.getSummaryName());
+             BrooklynPersistenceUtils.writeMemento(managementContext, memento, destinationObjectStore);
+             BrooklynPersistenceUtils.writeManagerMemento(managementContext, planeState, destinationObjectStore);
+             log.info("Back-up of persisted state created on "+mode+", locally because remote failed, in "+destinationObjectStore.getSummaryName());
+         }
+     } catch (Exception e) {
+         Exceptions.propagateIfFatal(e);
+         // covers loading the state as well as writing it, so the message names the whole back-up
+         log.warn("Unable to create back-up of persisted state on "+mode+" (ignoring): "+e, e);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java
new file mode 100644
index 0000000..ce6731b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/CatalogItemLibrariesConverter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.brooklyn.api.catalog.CatalogItem.CatalogBundle;
+import org.apache.brooklyn.api.catalog.CatalogItem.CatalogItemLibraries;
+import org.apache.brooklyn.core.catalog.internal.CatalogBundleDto;
+
+import com.thoughtworks.xstream.converters.Converter;
+import com.thoughtworks.xstream.converters.MarshallingContext;
+import com.thoughtworks.xstream.converters.UnmarshallingContext;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+
+/**
+ * Convert old-style rebind file formats to the latest version:
+ * a {@link CatalogItemLibraries} value is unmarshalled as a collection of {@link CatalogBundle}s,
+ * one per bundle URL.
+ * The code is needed only during transition to the new version, can be removed after a while.
+ */
+@Deprecated
+public class CatalogItemLibrariesConverter implements Converter {
+
+    /** Accepts {@link CatalogItemLibraries} types and any {@link Collection} type. */
+    @Override
+    public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
+        if (CatalogItemLibraries.class.isAssignableFrom(type)) {
+            return true;
+        }
+        return Collection.class.isAssignableFrom(type);
+    }
+
+    /** Writing needs no special handling; the source object is handed back to the context. */
+    @Override
+    public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) {
+        context.convertAnother(source);
+    }
+
+    /**
+     * Reads the value through the context. When the required type is a {@link CatalogItemLibraries},
+     * the result is replaced by a list of {@link CatalogBundleDto}s built from its bundle URLs
+     * (with null for the other two constructor arguments); otherwise it is returned as read.
+     */
+    @Override
+    public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) {
+        Object current = context.currentObject();
+        Object converted = context.convertAnother(current, context.getRequiredType());
+        if (!CatalogItemLibraries.class.isAssignableFrom(context.getRequiredType())) {
+            return converted;
+        }
+
+        Collection<String> bundleUrls = ((CatalogItemLibraries)converted).getBundles();
+        Collection<CatalogBundle> result = new ArrayList<CatalogBundle>(bundleUrls.size());
+        for (String bundleUrl : bundleUrls) {
+            result.add(new CatalogBundleDto(null, null, bundleUrl));
+        }
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java
new file mode 100644
index 0000000..cd54053
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedObjectStore.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.String.format;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
+import org.apache.brooklyn.core.server.BrooklynServerConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.core.internal.ssh.process.ProcessTool;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.FatalConfigurationRuntimeException;
+import org.apache.brooklyn.util.io.FileUtil;
+import org.apache.brooklyn.util.os.Os;
+import org.apache.brooklyn.util.os.Os.DeletionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * @author Andrea Turli
+ */
+public class FileBasedObjectStore implements PersistenceObjectStore {
+
+ private static final Logger log = LoggerFactory.getLogger(FileBasedObjectStore.class);
+
+ private static final int SHUTDOWN_TIMEOUT_MS = 10*1000;
+
+ private final File basedir;
+ private final ListeningExecutorService executor;
+ private ManagementContext mgmt;
+ private boolean prepared = false;
+ private boolean deferredBackupNeeded = false;
+ private AtomicBoolean doneFirstContentiousWrite = new AtomicBoolean(false);
+
+ /**
+  * Creates a store rooted at the given directory, backed by a cached thread pool.
+  * The directory only has to pass {@code checkPersistenceDirPlausible} at this point.
+  *
+  * @param basedir root directory of the store; must not be null
+  */
+ public FileBasedObjectStore(File basedir) {
+     File plausibleDir = checkPersistenceDirPlausible(basedir);
+     this.basedir = plausibleDir;
+     this.executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+     // don't check accessible yet, we do that when we prepare
+     log.debug("File-based objectStore will use directory {}", basedir);
+ }
+
+ /** Returns the absolute path of the base directory. */
+ @Override
+ public String getSummaryName() {
+     File root = getBaseDir();
+     return root.getAbsolutePath();
+ }
+
+ /** Returns the base directory this store was constructed with. */
+ public File getBaseDir() {
+ return basedir;
+ }
+
+ /**
+  * Performs the deferred back-up of the base directory, if one was requested, the first time it is called.
+  * The completion flag is checked before and again after taking the lock on this object, so later calls
+  * return immediately. Failures are passed through {@code Exceptions.propagate}, and the flag then stays unset.
+  */
+ public void prepareForMasterUse() {
+     if (!doneFirstContentiousWrite.get()) {
+         synchronized (this) {
+             if (doneFirstContentiousWrite.get()) {
+                 return;
+             }
+             try {
+                 if (deferredBackupNeeded) {
+                     // defer backup and path creation until first write
+                     // this way if node is standby or auto, the backup is not created superfluously
+                     File backupDir = backupDirByCopying(basedir);
+                     log.info("Persistence deferred backup, directory "+basedir+" backed up to "+backupDir.getAbsolutePath());
+                     deferredBackupNeeded = false;
+                 }
+             } catch (Exception failure) {
+                 throw Exceptions.propagate(failure);
+             }
+             doneFirstContentiousWrite.set(true);
+         }
+     }
+ }
+
+ /**
+  * Creates the directory {@code subPath} below the base directory if it does not exist yet,
+  * restricting a newly created directory to permissions 700 on a best-effort basis,
+  * and then runs {@code checkPersistenceDirAccessible} on it.
+  *
+  * @throws IllegalStateException if this store is not prepared, or the directory neither exists nor could be created
+  */
+ @Override
+ public void createSubPath(String subPath) {
+     if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);
+
+     File dir = new File(getBaseDir(), subPath);
+     if (dir.mkdir()) {
+         try {
+             FileUtil.setFilePermissionsTo700(dir);
+         } catch (IOException e) {
+             // best effort only, but keep the cause in the log so the failure can be diagnosed
+             log.warn("Unable to set sub-directory permissions to 700 (continuing): "+dir, e);
+         }
+     } else if (!dir.exists()) {
+         throw new IllegalStateException("Cannot create "+dir+"; call returned false");
+     }
+     checkPersistenceDirAccessible(dir);
+ }
+
+ /**
+  * Creates a file accessor for {@code path} below the base directory. The temp-file extension is
+  * {@code .tmp}, prefixed with {@code .<management node id>} when a management context with a node id is set.
+  *
+  * @throws IllegalStateException if this store is not prepared
+  */
+ @Override
+ public StoreObjectAccessor newAccessor(String path) {
+     if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);
+
+     final String tmpSuffix;
+     if (mgmt!=null && mgmt.getManagementNodeId()!=null) {
+         tmpSuffix = "."+mgmt.getManagementNodeId()+".tmp";
+     } else {
+         tmpSuffix = ".tmp";
+     }
+     File target = new File(Os.mergePaths(getBaseDir().getAbsolutePath(), path));
+     return new FileBasedStoreObjectAccessor(target, tmpSuffix);
+ }
+
+ /**
+  * Lists the entries directly under {@code parentSubPath}, each returned as {@code parentSubPath/name}.
+  * Entries whose name ends in {@code .tmp} or {@code .swp} are left out; the result is empty when
+  * the directory cannot be listed.
+  *
+  * @throws IllegalStateException if this store is not prepared
+  * @throws NullPointerException if {@code parentSubPath} is null
+  */
+ @Override
+ public List<String> listContentsWithSubPath(final String parentSubPath) {
+     if (!prepared) throw new IllegalStateException("Not yet prepared: "+this);
+
+     Preconditions.checkNotNull(parentSubPath);
+     File parentDir = new File(basedir, parentSubPath);
+
+     // An inclusion filter would be safer than exclusion
+     File[] children = parentDir.listFiles(new FileFilter() {
+         @Override public boolean accept(File candidate) {
+             String name = candidate.getName();
+             return !(name.endsWith(".tmp") || name.endsWith(".swp"));
+         }
+     });
+     if (children==null) return ImmutableList.<String>of();
+
+     ImmutableList.Builder<String> paths = ImmutableList.builder();
+     for (File child : children) {
+         paths.add(format("%s/%s", parentSubPath, child.getName()));
+     }
+     return paths.build();
+ }
+
+ /**
+  * Shuts the executor down and waits up to {@code SHUTDOWN_TIMEOUT_MS} milliseconds for it to terminate;
+  * an interruption while waiting is passed through {@code Exceptions.propagate}.
+  */
+ @Override
+ public void close() {
+     executor.shutdown();
+     try {
+         // the boolean outcome is not inspected: a timeout is treated the same as termination
+         executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+     } catch (InterruptedException interrupted) {
+         throw Exceptions.propagate(interrupted);
+     }
+ }
+
+ /** Renders this store via Guava's {@code toStringHelper} with a single {@code basedir} field. */
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("basedir", basedir).toString();
+ }
+
+ /**
+  * Sets the management context. Injecting a context equal to the current one is allowed.
+  *
+  * @throws IllegalStateException if a different context has already been injected
+  */
+ @Override
+ public void injectManagementContext(ManagementContext mgmt) {
+     boolean alreadyInjected = this.mgmt != null;
+     if (alreadyInjected && !this.mgmt.equals(mgmt)) {
+         throw new IllegalStateException("Cannot change mgmt context of "+this);
+     }
+     this.mgmt = mgmt;
+ }
+
+ /**
+  * Prepares the base directory according to {@code persistMode} and marks this store as prepared.
+  * <ul>
+  * <li>null or DISABLED: nothing on disk is touched.
+  * <li>CLEAN: an existing directory is moved aside (legacy backups on) or deleted.
+  * <li>REBIND: the directory is checked with {@code checkPersistenceDirAccessible} and must be non-empty;
+  *     with legacy backups on it is copied now when {@code haMode} is MASTER, otherwise the copy is deferred.
+  * <li>AUTO: like REBIND when the directory exists with content, otherwise treated as having no previous state.
+  * </ul>
+  * The directory is created with permissions 700 if it does not exist afterwards.
+  * Exceptions raised while preparing are passed through {@code Exceptions.propagate}.
+  *
+  * @throws NullPointerException if no management context has been injected
+  */
+ @Override
+ public void prepareForSharedUse(@Nullable PersistMode persistMode, HighAvailabilityMode haMode) {
+     if (mgmt==null) throw new NullPointerException("Must inject ManagementContext before preparing "+this);
+
+     if (persistMode==null || persistMode==PersistMode.DISABLED) {
+         // TODO is this check needed? shouldn't come here now without persistence on.
+         prepared = true;
+         return;
+     }
+
+     @SuppressWarnings("deprecation")
+     Boolean backups = mgmt.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED);
+     if (Boolean.TRUE.equals(backups)) {
+         log.warn("Using legacy backup for "+this+"; functionality will be removed in future versions, in favor of promotion/demotion-specific backups to a configurable backup location.");
+     }
+     // default backups behaviour here changed to false, Nov 2014, because these backups are now legacy;
+     // we prefer the backups made when persistence is enabled, using routines in BrooklynPersistenceUtils
+     if (backups==null) backups = false;
+
+     File dir = getBaseDir();
+     try {
+         String persistencePath = dir.getAbsolutePath();
+
+         switch (persistMode) {
+             case CLEAN:
+                 if (dir.exists()) {
+                     checkPersistenceDirAccessible(dir);
+                     try {
+                         if (backups) {
+                             File old = backupDirByMoving(dir);
+                             log.info("Persistence mode CLEAN, directory "+persistencePath+" backed up to "+old.getAbsolutePath());
+                         } else {
+                             deleteCompletely();
+                             log.info("Persistence mode CLEAN, directory "+persistencePath+" deleted");
+                         }
+                     } catch (IOException e) {
+                         throw new FatalConfigurationRuntimeException("Error using existing persistence directory "+dir.getAbsolutePath(), e);
+                     }
+                 } else {
+                     log.debug("Persistence mode CLEAN, directory "+persistencePath+", no previous state");
+                 }
+                 break;
+             case REBIND:
+                 checkPersistenceDirAccessible(dir);
+                 checkPersistenceDirNonEmpty(dir);
+                 try {
+                     if (backups) {
+                         if (haMode==HighAvailabilityMode.MASTER) {
+                             File backup = backupDirByCopying(dir);
+                             log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
+                         } else {
+                             // copied later, by prepareForMasterUse
+                             deferredBackupNeeded = true;
+                         }
+                     }
+                 } catch (IOException e) {
+                     throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
+                 }
+                 break;
+             case AUTO:
+                 if (dir.exists()) {
+                     checkPersistenceDirAccessible(dir);
+                 }
+                 if (dir.exists() && !isMementoDirExistButEmpty(dir)) {
+                     try {
+                         if (backups) {
+                             if (haMode==HighAvailabilityMode.MASTER) {
+                                 File backup = backupDirByCopying(dir);
+                                 // this branch handles AUTO, so the log names AUTO (it used to say REBIND)
+                                 log.info("Persistence mode AUTO, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
+                             } else {
+                                 // copied later, by prepareForMasterUse
+                                 deferredBackupNeeded = true;
+                             }
+                         }
+                     } catch (IOException e) {
+                         throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
+                     }
+                 } else {
+                     log.debug("Persistence mode AUTO, directory "+persistencePath+", no previous state");
+                 }
+                 break;
+             default:
+                 throw new FatalConfigurationRuntimeException("Unexpected persist mode "+persistMode+"; modified during initialization?!");
+         }
+
+         if (!dir.exists()) {
+             boolean success = dir.mkdirs();
+             if (success) {
+                 FileUtil.setFilePermissionsTo700(dir);
+             } else {
+                 throw new FatalConfigurationRuntimeException("Failed to create persistence directory "+dir);
+             }
+         }
+
+     } catch (Exception e) {
+         throw Exceptions.propagate(e);
+     }
+
+     prepared = true;
+ }
+
+ /**
+  * Checks that {@code dir} could serve as a persistence directory: a directory that does not exist yet
+  * is accepted as is, while an existing path must not be a file and must be readable and writable.
+  *
+  * @return {@code dir}
+  * @throws NullPointerException if {@code dir} is null
+  * @throws FatalConfigurationRuntimeException if {@code dir} exists but is a file, unreadable or unwritable
+  */
+ protected File checkPersistenceDirPlausible(File dir) {
+     checkNotNull(dir, "directory");
+     if (!dir.exists()) return dir;
+     // messages now have a space before the path, matching checkPersistenceDirAccessible
+     if (dir.isFile()) throw new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": must not be a file");
+     if (!(dir.canRead() && dir.canWrite())) throw new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " +
+             (!dir.canRead() ? "not readable" :
+                 (!dir.canWrite() ? "not writable" : "unknown reason")));
+     return dir;
+ }
+
+ /**
+  * Verifies that {@code dir} exists, is a directory, and is both readable and writable.
+  *
+  * @throws FatalConfigurationRuntimeException naming the first check that failed
+  */
+ protected void checkPersistenceDirAccessible(File dir) {
+     if (!(dir.exists() && dir.isDirectory() && dir.canRead() && dir.canWrite())) {
+         FatalConfigurationRuntimeException problem = new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " +
+                 (!dir.exists() ? "does not exist" :
+                     (!dir.isDirectory() ? "not a directory" :
+                         (!dir.canRead() ? "not readable" :
+                             (!dir.canWrite() ? "not writable" : "unknown reason")))));
+         log.debug("Invalid persistence directory "+dir+" (rethrowing): "+problem, problem);
+         // the problem used to be logged as "rethrowing" but never thrown, so the check had no effect
+         throw problem;
+     } else {
+         log.debug("Created dir {} for {}", dir, this);
+     }
+ }
+
+ /**
+  * Verifies that {@code persistenceDir} exists and is not empty in the sense of
+  * {@code isMementoDirExistButEmpty}.
+  *
+  * @throws FatalConfigurationRuntimeException if the directory is missing or empty
+  */
+ protected void checkPersistenceDirNonEmpty(File persistenceDir) {
+     FatalConfigurationRuntimeException problem = null;
+     if (!persistenceDir.exists()) {
+         problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory does not exist");
+     } else if (isMementoDirExistButEmpty(persistenceDir)) {
+         problem = new FatalConfigurationRuntimeException("Invalid persistence directory "+persistenceDir+" because directory is empty");
+     }
+     if (problem != null) {
+         log.debug("Invalid persistence directory "+persistenceDir+" (rethrowing): "+problem, problem);
+         throw problem;
+     }
+ }
+
+ /**
+  * Copies {@code dir} to a sibling directory named {@code <name>.<timestamp>.bak} and restricts the copy
+  * to permissions 700.
+  *
+  * @return the newly created back-up directory
+  */
+ protected File backupDirByCopying(File dir) throws IOException, InterruptedException {
+     File parentDir = dir.getParentFile();
+     String simpleName = dir.getName();
+     // HH is the 24-hour clock; the former hh (12-hour, no AM/PM marker) gave the same name
+     // to morning and afternoon back-ups and broke chronological sorting
+     String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmssSSS").format(new Date());
+     File backupDir = new File(parentDir, simpleName+"."+timestamp+".bak");
+
+     FileUtil.copyDir(dir, backupDir);
+     FileUtil.setFilePermissionsTo700(backupDir);
+
+     return backupDir;
+ }
+
+ /**
+  * Moves {@code dir} to a sibling directory named {@code <name>.<timestamp>.bak}.
+  *
+  * @return the directory the content was moved to
+  */
+ protected File backupDirByMoving(File dir) throws InterruptedException, IOException {
+     File parentDir = dir.getParentFile();
+     String simpleName = dir.getName();
+     // HH is the 24-hour clock; the former hh (12-hour, no AM/PM marker) gave the same name
+     // to morning and afternoon back-ups and broke chronological sorting
+     String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmssSSS").format(new Date());
+     File newDir = new File(parentDir, simpleName+"."+timestamp+".bak");
+
+     FileUtil.moveDir(dir, newDir);
+     return newDir;
+ }
+
+ private static boolean WARNED_ON_NON_ATOMIC_FILE_UPDATES = false;
+ /**
+  * Moves {@code srcFile} to {@code destFile}, trying in order: a plain java rename; on non-Windows
+  * systems an {@code mv} command; deleting the destination and renaming again; and finally copying
+  * the content and deleting the source. Only the first two are treated as atomic.
+  * Assumes files are on same mount point.
+  * <p>
+  * TODO Java 7 gives an atomic Files.move() which would be preferred.
+  *
+  * @throws IOException if every strategy, including the final copy, fails
+  */
+ static void moveFile(File srcFile, File destFile) throws IOException, InterruptedException {
+     // Try rename first - it is a *much* cheaper call than invoking a system call in Java.
+     // However, rename is not guaranteed cross platform to succeed if the destination exists,
+     // and not guaranteed to be atomic, but it usually seems to do the right thing...
+     boolean result;
+     result = srcFile.renameTo(destFile);
+     if (result) {
+         if (log.isTraceEnabled()) log.trace("java rename of {} to {} completed", srcFile, destFile);
+         return;
+     }
+
+     if (!Os.isMicrosoftWindows()) {
+         // this command, if it succeeds, is guaranteed to be atomic, and it will usually overwrite
+         // NOTE(review): the paths are wrapped in single quotes without escaping, so a path that itself
+         // contains a single quote would break this command
+         String cmd = "mv '"+srcFile.getAbsolutePath()+"' '"+destFile.getAbsolutePath()+"'";
+
+         // ProcessTool is preferred over Runtime.exec because it wraps the command in the appropriate bash
+         int exitStatus = new ProcessTool().execCommands(MutableMap.<String,String>of(), MutableList.of(cmd), null);
+
+         if (log.isTraceEnabled()) log.trace("FS move of {} to {} completed, code {}", new Object[] { srcFile, destFile, exitStatus });
+         if (exitStatus == 0) return;
+     }
+
+     // finally try a delete - but explicitly warn this is not going to be atomic
+     // so if another node reads it might see no master
+     if (!WARNED_ON_NON_ATOMIC_FILE_UPDATES) {
+         WARNED_ON_NON_ATOMIC_FILE_UPDATES = true;
+         log.warn("Unable to perform atomic file update ("+srcFile+" to "+destFile+"); file system not recommended for production HA/DR");
+     }
+     destFile.delete();
+     result = srcFile.renameTo(destFile);
+     if (log.isTraceEnabled()) log.trace("java delete and rename of {} to {} completed, code {}", new Object[] { srcFile, destFile, result });
+     if (result)
+         return;
+
+     // Last resort: copy then remove the source. This used to throw even after a successful copy,
+     // reporting a completed move as a failure (with source and destination swapped in the message).
+     try {
+         Files.copy(srcFile, destFile);
+     } catch (IOException e) {
+         throw new IOException("Could not move "+srcFile+" to "+destFile, e);
+     }
+     if (!srcFile.delete()) {
+         log.warn("Moved "+srcFile+" to "+destFile+" by copying, but could not delete the source file");
+     }
+ }
+
+ /**
+  * True if the directory at path {@code dir} exists, but is entirely empty, or only contains empty directories.
+  * Convenience overload that wraps the path in a {@link File}.
+  */
+ static boolean isMementoDirExistButEmpty(String dir) {
+     File asFile = new File(dir);
+     return isMementoDirExistButEmpty(asFile);
+ }
+
+ /**
+  * True if {@code dir} exists and contains no files, only (possibly zero) empty sub-directories.
+  * Returns false when {@code dir} does not exist, or when {@code dir} or one of its sub-directories
+  * cannot be listed, since emptiness cannot be confirmed in that case.
+  */
+ static boolean isMementoDirExistButEmpty(File dir) {
+     if (!dir.exists()) return false;
+     File[] contents = dir.listFiles();
+     if (contents == null) return false;
+
+     for (File sub : contents) {
+         if (sub.isFile()) return false;
+         if (sub.isDirectory()) {
+             // listFiles() returns null on an I/O error or if the directory vanished; guard against the NPE
+             File[] subContents = sub.listFiles();
+             if (subContents == null || subContents.length > 0) return false;
+         }
+     }
+     return true;
+ }
+
+ /** Deletes the base directory via the static {@code deleteCompletely(File)} overload. */
+ @Override
+ public void deleteCompletely() {
+     File root = getBaseDir();
+     deleteCompletely(root);
+ }
+
+ /** Recursively deletes {@code d}; a failure is logged as a warning and not thrown. */
+ public static void deleteCompletely(File d) {
+     boolean deleted = Os.deleteRecursively(d).wasSuccessful();
+     if (deleted) {
+         return;
+     }
+     log.warn("Unable to delete persistence dir "+d);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java
new file mode 100644
index 0000000..41aefc0
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/FileBasedStoreObjectAccessor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Date;
+
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.io.FileUtil;
+import org.apache.brooklyn.util.text.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.io.Files;
+
+/**
+ * Reads/writes to a file. This impl does it immediately, with no synchronisation.
+ * Callers should wrap in {@link StoreObjectAccessorLocking} if multiple threads may be accessing this.
+ *
+ * @author aled
+ */
+public class FileBasedStoreObjectAccessor implements PersistenceObjectStore.StoreObjectAccessor {
+ private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreObjectAccessor.class);
+
+ public FileBasedStoreObjectAccessor(File file, String tmpExtension) {
+ this.file = file;
+ this.tmpFile = new File(file.getParentFile(), file.getName()+(Strings.isBlank(tmpExtension) ? ".tmp" : tmpExtension));
+ }
+
+ private final File file;
+ private final File tmpFile;
+
+ @Override
+ public String get() {
+ try {
+ if (!exists()) return null;
+ return Files.asCharSource(file, Charsets.UTF_8).read();
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public byte[] getBytes() {
+ try {
+ if (!exists()) return null;
+ return Files.asByteSource(file).read();
+ } catch (IOException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public boolean exists() {
+ return file.exists();
+ }
+
+ // Setting permissions to 600 reduces objectAccessor.put performance from about 5000 per second to 3000 per second
+ // in java 6. With Java 7's Files.setPosixFilePermissions, this might well improve.
+ @Override
+ public void put(String val) {
+ try {
+ if (val==null) val = "";
+ FileUtil.setFilePermissionsTo600(tmpFile);
+ Files.write(val, tmpFile, Charsets.UTF_8);
+ FileBasedObjectStore.moveFile(tmpFile, file);
+ } catch (IOException e) {
+ throw Exceptions.propagate(e);
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void append(String val) {
+ try {
+ if (val==null) val = "";
+ FileUtil.setFilePermissionsTo600(file);
+ Files.append(val, file, Charsets.UTF_8);
+
+ } catch (IOException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void delete() {
+ if (!file.delete()) {
+ if (!file.exists()) {
+ LOG.debug("Unable to delete " + file.getAbsolutePath() + ". Probably did not exist.");
+ } else {
+ LOG.warn("Unable to delete " + file.getAbsolutePath() + ". Probably still locked.");
+ }
+ }
+ if (tmpFile.exists() && !tmpFile.delete()) {
+ // tmpFile is probably already deleted, so don't even log debug if it does not exist
+ LOG.warn("Unable to delete " + tmpFile.getAbsolutePath() + ". Probably still locked.");
+ }
+ }
+
+ @Override
+ public Date getLastModifiedDate() {
+ long result = file.lastModified();
+ if (result==0) return null;
+ return new Date(result);
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("file", file).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java
new file mode 100644
index 0000000..9a5c693
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/LocationWithObjectStore.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+/**
+ * Implemented by locations which can create a {@link PersistenceObjectStore}.
+ */
+public interface LocationWithObjectStore {
+
+    /**
+     * Creates a {@link PersistenceObjectStore} pointed at the given container/directory.
+     *
+     * @param container the container/directory the store should point at
+     * @return the object store for that container
+     */
+    PersistenceObjectStore newPersistenceObjectStore(String container);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java
new file mode 100644
index 0000000..1e924ec
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/MementoSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import org.apache.brooklyn.api.mgmt.ha.ManagementNodeSyncRecord;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext;
+
+/**
+ * Converts objects to and from strings. It is often used with {@link BrooklynMemento} for persisting
+ * and restoring, though it can be used for any object (and is also used for the
+ * {@link ManagementNodeSyncRecord} instances).
+ *
+ * @param <T> the type of object handled by this serializer
+ */
+public interface MementoSerializer<T> {
+
+    /** Produces the string form of the given object. */
+    String toString(T memento);
+
+    /** Produces an object from the given string form. */
+    T fromString(String string);
+
+    /** Supplies the lookup context for this serializer to use. */
+    void setLookupContext(LookupContext lookupContext);
+
+    /** Clears any lookup context previously supplied. */
+    void unsetLookupContext();
+
+    /**
+     * Pass-through serializer for strings: both conversions return their argument unchanged,
+     * and the lookup-context methods do nothing.
+     */
+    MementoSerializer<String> NOOP = new MementoSerializer<String>() {
+        @Override
+        public String toString(String memento) {
+            return memento;
+        }
+
+        @Override
+        public String fromString(String string) {
+            return string;
+        }
+
+        @Override
+        public void setLookupContext(LookupContext lookupContext) {
+            // nothing to record
+        }
+
+        @Override
+        public void unsetLookupContext() {
+            // nothing to clear
+        }
+    };
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java
new file mode 100644
index 0000000..fac764b
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+public enum PersistMode {
+ DISABLED,
+ AUTO,
+ REBIND,
+ CLEAN;
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceActivityMetrics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceActivityMetrics.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceActivityMetrics.java
new file mode 100644
index 0000000..14943bb
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceActivityMetrics.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+
+/**
+ * Accumulates counters, timestamps and recent error messages for persistence activity,
+ * and exposes them as a map via {@link #asMap()}.
+ * <p>
+ * All methods that read or modify the metrics synchronize on this instance, so a map
+ * returned by {@link #asMap()} reflects a consistent snapshot of updates made through them.
+ */
+public class PersistenceActivityMetrics {
+
+    /** Maximum number of error entries retained; the oldest are discarded beyond this. */
+    final static int MAX_ERRORS = 200;
+
+    long count=0, failureCount=0;
+    Long lastSuccessTime, lastDuration, lastFailureTime;
+    List<Map<String,Object>> errorMessages = MutableList.of();
+
+    /**
+     * Records a successful activity.
+     *
+     * @param duration how long the activity took; if null, -1 is recorded as the last duration
+     */
+    public synchronized void noteSuccess(Duration duration) {
+        count++;
+        lastSuccessTime = System.currentTimeMillis();
+        lastDuration = millisOrUnknown(duration);
+    }
+
+    /**
+     * Records a failed activity.
+     *
+     * @param duration how long the activity took; if null, -1 is recorded as the last duration
+     */
+    public synchronized void noteFailure(Duration duration) {
+        count++;
+        failureCount++;
+        lastFailureTime = System.currentTimeMillis();
+        lastDuration = millisOrUnknown(duration);
+    }
+
+    /** Records an error message, timestamped with the current time. */
+    public void noteError(String error) {
+        noteErrorObject(error);
+    }
+
+    /** Records a list describing an error, timestamped with the current time. */
+    public void noteError(List<?> error) {
+        noteErrorObject(error);
+    }
+
+    /** error should be json-serializable; exceptions can be problematic */
+    protected synchronized void noteErrorObject(Object error) {
+        // newest entry goes first; trim from the tail so at most MAX_ERRORS are kept
+        errorMessages.add(0, MutableMap.<String,Object>of("error", error, "timestamp", System.currentTimeMillis()));
+        while (errorMessages.size() > MAX_ERRORS) {
+            errorMessages.remove(errorMessages.size()-1);
+        }
+    }
+
+    /**
+     * Returns a snapshot of the metrics as a new map. Timestamps and durations are in
+     * milliseconds; entries whose value has not yet been recorded are null. The error
+     * list is copied, so later errors do not alter the returned map.
+     */
+    public synchronized Map<String,Object> asMap() {
+        Map<String,Object> result = MutableMap.of();
+        result.put("count", count);
+        result.put("lastSuccessTimeUtc", lastSuccessTime);
+        result.put("lastSuccessTimeMillisSince", since(lastSuccessTime));
+        result.put("lastDuration", lastDuration);
+        result.put("failureCount", failureCount);
+        result.put("lastFailureTimeUtc", lastFailureTime);
+        result.put("lastFailureTimeMillisSince", since(lastFailureTime));
+        result.put("errorMessages", MutableList.copyOf(errorMessages));
+        return result;
+    }
+
+    /** Milliseconds elapsed since the given time, or null if no time was given. */
+    private Long since(Long time) {
+        if (time==null) return null;
+        return System.currentTimeMillis() - time;
+    }
+
+    /** The duration in milliseconds, or -1 when no duration was supplied. */
+    private static long millisOrUnknown(Duration duration) {
+        return duration!=null ? duration.toMilliseconds() : -1;
+    }
+
+}
\ No newline at end of file