You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/19 13:10:23 UTC
[65/72] [abbrv] incubator-brooklyn git commit: BROOKLYN-162 - jclouds
last few package prefixes needed,
and tidy in core and elsewhere related (or observed in the process)
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
new file mode 100644
index 0000000..aa83e14
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/PersistenceObjectStore.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.ha.HighAvailabilityMode;
+import org.apache.brooklyn.util.time.Duration;
+
+import com.google.common.annotations.Beta;
+
+/**
+ * Interface for working with persistence targets, including file system and jclouds object stores.
+ * @author Andrea Turli
+ */
+public interface PersistenceObjectStore {
+
+    /** Accessor to a single object/item in a {@link PersistenceObjectStore}. */
+    public interface StoreObjectAccessor {
+        /** Returns the object's contents, or null if the object is not found. */
+        String get();
+        /** Returns the object's contents as bytes (NOTE(review): the result for a missing object is not specified here). */
+        byte[] getBytes();
+        /** Reports whether the object exists. */
+        boolean exists();
+        /** Writes the given contents; per the parameter name this replaces the object or creates it. */
+        void put(String contentsToReplaceOrCreate);
+        /** Appends the given contents; per the parameter name this creates the object if needed. */
+        void append(String contentsToAppendOrCreate);
+        /** Deletes the object. */
+        void delete();
+        // NB: creation date is available for many blobstores but
+        // not on java.io.File and filesystems, so it is not included here
+        /** Returns the last modified date, or null if not supported or the object does not exist. */
+        Date getLastModifiedDate();
+    }
+
+    /** A {@link StoreObjectAccessor} which also exposes its lock and a way to wait for scheduled writes. */
+    public interface StoreObjectAccessorWithLock extends StoreObjectAccessor {
+        /**
+         * Waits for all currently scheduled write lock operations (puts, appends, and deletes) to complete,
+         * but does not wait on or prevent subsequent modifications.
+         * This is suitable for a model where the caller is managing synchronization.
+         * <p>
+         * For more complex uses, readers should <code>getLockObject().readLock().lockInterruptibly()</code>
+         * and ensure they subsequently <code>unlock()</code> it. See {@link #getLockObject()}.
+         */
+        void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException;
+
+        /** Returns the underlying lock in case callers need more complex synchronization control. */
+        ReadWriteLock getLockObject();
+    }
+
+    /** Human-readable name of this object store. */
+    String getSummaryName();
+
+    /**
+     * Allows an object store to be created ahead of time and a mgmt context injected afterwards.
+     * Currently subsequent changes are not permitted.
+     * <p>
+     * A {@link ManagementContext} must be supplied via constructor or this method before invoking other methods.
+     */
+    @Beta
+    void injectManagementContext(ManagementContext managementContext);
+
+    /**
+     * Prepares the persistence store for read use and non-contentious write use,
+     * in particular detecting whether we should clean or register a need for backup etc.
+     * Typically called early in the setup lifecycle, after {@link #injectManagementContext(ManagementContext)},
+     * but before {@link #prepareForMasterUse()}.
+     * <p>
+     * See {@link #prepareForMasterUse()} for discussion of "contentious writes".
+     */
+    @Beta
+    void prepareForSharedUse(PersistMode persistMode, HighAvailabilityMode highAvailabilityMode);
+
+    /**
+     * Prepares the persistence store for "contentious writes".
+     * These are defined as those writes which might overwrite important information.
+     * Implementations usually perform backup/versioning of the store if required.
+     * <p>
+     * Caller must call {@link #prepareForSharedUse(PersistMode, HighAvailabilityMode)} first
+     * (and {@link #injectManagementContext(ManagementContext)} before that).
+     * <p>
+     * This is typically invoked "at the last moment", e.g. just before any such write,
+     * mainly in order to prevent backups being made unnecessarily (e.g. if a node is standby,
+     * or if it tries to become master but is not capable),
+     * but also to prevent simultaneous backups which can cause problems with some stores
+     * (only a mgmt node which knows it is the master should invoke this).
+     */
+    @Beta
+    void prepareForMasterUse();
+
+    /**
+     * For reading/writing data to the item at the given path.
+     * Note that the accessor is not generally thread safe, usually does not support blocking,
+     * and multiple instances may conflict with each other.
+     * <p>
+     * Clients should wrap in a dedicated {@link StoreObjectAccessorLocking} and share
+     * if multiple threads may be accessing the store.
+     * This method may be changed in future to allow access to a shared locking accessor.
+     */
+    @Beta
+    // TODO requiring clients to wrap and cache accessors is not very nice API,
+    // better would be to do caching here probably,
+    // but we've already been doing it this way above for now (Jun 2014)
+    StoreObjectAccessor newAccessor(String path);
+
+    /** Creates the directory at the given subPath relative to the base of this store. */
+    void createSubPath(String subPath);
+
+    /**
+     * Lists the paths of objects contained at the given path, including the subpath.
+     * For example, if a file-based ObjectStore is configured to write to file://path/to/root/
+     * then subPath=entities would return the contents of /path/to/root/entities/, such as
+     * [entities/e1, entities/e2, entities/e3].
+     * The returned path values are usable in calls to {@link #newAccessor(String)}.
+     */
+    List<String> listContentsWithSubPath(String subPath);
+
+    /**
+     * Entirely deletes the contents of this persistence location.
+     * Use with care, primarily in tests. This will recursively wipe the indicated location.
+     */
+    void deleteCompletely();
+
+    /**
+     * Closes all resources used by this ObjectStore. No subsequent calls should be made to the ObjectStore;
+     * behaviour of such calls is undefined but likely to throw exceptions.
+     */
+    void close();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java
new file mode 100644
index 0000000..480a2ec
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/RetryingMementoSerializer.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RetryingMementoSerializer<T> implements MementoSerializer<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RetryingMementoSerializer.class);
+
+ private final MementoSerializer<T> delegate;
+ private final int maxAttempts;
+
+ public RetryingMementoSerializer(MementoSerializer<T> delegate, int maxAttempts) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.maxAttempts = maxAttempts;
+ if (maxAttempts < 1) throw new IllegalArgumentException("Max attempts must be at least 1, but was "+maxAttempts);
+ }
+
+ @Override
+ public String toString(T memento) {
+ RuntimeException lastException = null;
+ int attempt = 0;
+ do {
+ attempt++;
+ try {
+ String result = delegate.toString(memento);
+ if (attempt>1)
+ LOG.info("Success following previous serialization error");
+ return result;
+ } catch (RuntimeException e) {
+ LOG.warn("Error serializing memento (attempt "+attempt+" of "+maxAttempts+") for "+memento+
+ "; expected sometimes if attribute value modified", e);
+ lastException = e;
+ }
+ } while (attempt < maxAttempts);
+
+ throw lastException;
+ }
+
+ @Override
+ public T fromString(String string) {
+ if (string==null)
+ return null;
+
+ RuntimeException lastException = null;
+ int attempt = 0;
+ do {
+ attempt++;
+ try {
+ T result = delegate.fromString(string);
+ if (attempt>1)
+ LOG.info("Success following previous deserialization error, got: "+result);
+ return result;
+ } catch (RuntimeException e) {
+ // trying multiple times only makes sense for a few errors (namely ConcModExceptions); perhaps deprecate that strategy?
+ LOG.warn("Error deserializing memento (attempt "+attempt+" of "+maxAttempts+"): "+e, e);
+ if (attempt==1) LOG.debug("Memento which was not deserialized is:\n"+string);
+ lastException = e;
+ }
+ } while (attempt < maxAttempts);
+
+ throw lastException;
+ }
+
+ @Override
+ public void setLookupContext(LookupContext lookupContext) {
+ delegate.setLookupContext(lookupContext);
+ }
+
+ @Override
+ public void unsetLookupContext() {
+ delegate.unsetLookupContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
new file mode 100644
index 0000000..302121f
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/StoreObjectAccessorLocking.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import java.util.Comparator;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.brooklyn.core.mgmt.persist.PersistenceObjectStore.StoreObjectAccessor;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.javalang.JavaClassNames;
+import org.apache.brooklyn.util.time.Duration;
+
+/** Wraps access to an object (the delegate {@link StoreObjectAccessor}
+ * in a guarded read-write context such that callers will be blocked if another thread
+ * is accessing the object in an incompatible way (e.g. trying to read when someone is writing).
+ * See {@link ReadWriteLock}.
+ * <p>
+ * This has no visibility or control over other access to the delegate or underlying object, of course.
+ * It can only affect callers coming through this wrapper instance. Thus callers must share instances
+ * of this class for a given item.
+ * <p>
+ * No locking is done with respect to {@link #getLastModifiedDate()}.
+ **/
+public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreObjectAccessorWithLock {
+
+ protected static class ThreadComparator implements Comparator<Thread> {
+ @Override
+ public int compare(Thread o1, Thread o2) {
+ if (o1.getId()<o2.getId()) return -1;
+ if (o1.getId()>o2.getId()) return 1;
+ return 0;
+ }
+ }
+
+ ReadWriteLock lock = new ReentrantReadWriteLock(true);
+ Set<Thread> queuedReaders = new ConcurrentSkipListSet<Thread>(new ThreadComparator());
+ Set<Thread> queuedWriters = new ConcurrentSkipListSet<Thread>(new ThreadComparator());
+
+ final PersistenceObjectStore.StoreObjectAccessor delegate;
+
+ public StoreObjectAccessorLocking(PersistenceObjectStore.StoreObjectAccessor delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public String get() {
+ try {
+ queuedReaders.add(Thread.currentThread());
+ lock.readLock().lockInterruptibly();
+ try {
+ return delegate.get();
+
+ } finally {
+ lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ queuedReaders.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ public byte[] getBytes() {
+ try {
+ queuedReaders.add(Thread.currentThread());
+ lock.readLock().lockInterruptibly();
+ try {
+ return delegate.getBytes();
+
+ } finally {
+ lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ queuedReaders.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ queuedReaders.add(Thread.currentThread());
+ lock.readLock().lockInterruptibly();
+ try {
+ return delegate.exists();
+
+ } finally {
+ lock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ queuedReaders.remove(Thread.currentThread());
+ }
+ }
+
+ protected boolean hasScheduledPutOrDeleteWithNoRead() {
+ // skip write if there is another write queued and no reader waiting
+ return (!queuedWriters.isEmpty() && queuedReaders.isEmpty());
+ }
+
+ @Override
+ public void put(String val) {
+ try {
+ queuedWriters.add(Thread.currentThread());
+ lock.writeLock().lockInterruptibly();
+ try {
+ queuedWriters.remove(Thread.currentThread());
+ if (hasScheduledPutOrDeleteWithNoRead())
+ // don't bother writing if someone will write after us and no one is reading
+ return;
+ delegate.put(val);
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ queuedWriters.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ public void append(String val) {
+ try {
+ lock.writeLock().lockInterruptibly();
+ try {
+ if (hasScheduledPutOrDeleteWithNoRead())
+ // don't bother appending if someone will write after us and no one is reading
+ return;
+
+ delegate.append(val);
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public void delete() {
+ try {
+ queuedWriters.add(Thread.currentThread());
+ lock.writeLock().lockInterruptibly();
+ try {
+ queuedWriters.remove(Thread.currentThread());
+ if (hasScheduledPutOrDeleteWithNoRead()) {
+ // don't bother deleting if someone will write after us and no one is reading
+ return;
+ }
+ delegate.delete();
+
+ } finally {
+ lock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ } finally {
+ queuedWriters.remove(Thread.currentThread());
+ }
+ }
+
+ @Override
+ public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
+ try {
+ boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
+ if (locked) {
+ lock.readLock().unlock();
+ } else {
+ throw new TimeoutException("Timeout waiting for writes of "+delegate+" after "+timeout);
+ }
+ } catch (InterruptedException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ @Override
+ public Date getLastModifiedDate() {
+ return delegate.getLastModifiedDate();
+ }
+
+ @Override
+ public ReadWriteLock getLockObject() {
+ return lock;
+ }
+
+ @Override
+ public String toString() {
+ return JavaClassNames.simpleClassName(this)+":"+delegate.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java
new file mode 100644
index 0000000..799ad91
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/persist/XmlMementoSerializer.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.core.mgmt.persist;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.util.NoSuchElementException;
+import java.util.Stack;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.CatalogItem;
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.internal.AbstractBrooklynObjectSpec;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
+import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister.LookupContext;
+import org.apache.brooklyn.api.objs.Identifiable;
+import org.apache.brooklyn.api.policy.Policy;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.api.sensor.Feed;
+import org.apache.brooklyn.core.catalog.internal.CatalogBundleDto;
+import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
+import org.apache.brooklyn.core.config.BasicConfigKey;
+import org.apache.brooklyn.core.mgmt.classloading.BrooklynClassLoadingContextSequential;
+import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext;
+import org.apache.brooklyn.core.mgmt.classloading.JavaBrooklynClassLoadingContext;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicCatalogItemMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicEnricherMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicEntityMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicFeedMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicLocationMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.BasicPolicyMemento;
+import org.apache.brooklyn.core.mgmt.rebind.dto.MutableBrooklynMemento;
+import org.apache.brooklyn.effector.core.BasicParameterType;
+import org.apache.brooklyn.effector.core.EffectorAndBody;
+import org.apache.brooklyn.effector.core.EffectorTasks.EffectorBodyTaskFactory;
+import org.apache.brooklyn.effector.core.EffectorTasks.EffectorTaskFactory;
+import org.apache.brooklyn.sensor.core.BasicAttributeSensor;
+import org.apache.brooklyn.util.core.xstream.XmlSerializer;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.text.Strings;
+
+import com.thoughtworks.xstream.converters.Converter;
+import com.thoughtworks.xstream.converters.MarshallingContext;
+import com.thoughtworks.xstream.converters.SingleValueConverter;
+import com.thoughtworks.xstream.converters.UnmarshallingContext;
+import com.thoughtworks.xstream.converters.reflection.ReflectionConverter;
+import com.thoughtworks.xstream.core.ReferencingMarshallingContext;
+import com.thoughtworks.xstream.core.util.HierarchicalStreams;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.path.PathTrackingReader;
+import com.thoughtworks.xstream.mapper.Mapper;
+import com.thoughtworks.xstream.mapper.MapperWrapper;
+
+/* uses xml, cleaned up a bit
+ *
+ * there is an early attempt at doing this with JSON in pull request #344 but
+ * it is not nicely deserializable, see comments at http://xstream.codehaus.org/json-tutorial.html */
+public class XmlMementoSerializer<T> extends XmlSerializer<T> implements MementoSerializer<T> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(XmlMementoSerializer.class);
+
+ private final ClassLoader classLoader;
+ private LookupContext lookupContext;
+
+ /**
+ * Creates a serializer whose xstream instance loads classes through the given class loader,
+ * and registers the element aliases, default implementations and converters used for mementos.
+ *
+ * @param classLoader class loader handed to xstream; must not be null
+ */
+ public XmlMementoSerializer(ClassLoader classLoader) {
+ this.classLoader = checkNotNull(classLoader, "classLoader");
+ xstream.setClassLoader(this.classLoader);
+
+ // old (deprecated in 070? or earlier) single-file persistence uses this keyword; TODO remove soon in 080 ?
+ xstream.alias("brooklyn", MutableBrooklynMemento.class);
+
+ // short element names for the memento and related DTO classes
+ xstream.alias("entity", BasicEntityMemento.class);
+ xstream.alias("location", BasicLocationMemento.class);
+ xstream.alias("policy", BasicPolicyMemento.class);
+ xstream.alias("feed", BasicFeedMemento.class);
+ xstream.alias("enricher", BasicEnricherMemento.class);
+ xstream.alias("configKey", BasicConfigKey.class);
+ xstream.alias("catalogItem", BasicCatalogItemMemento.class);
+ xstream.alias("bundle", CatalogBundleDto.class);
+ xstream.alias("attributeSensor", BasicAttributeSensor.class);
+
+ // effectors: alias the interface and name the concrete class to use for it
+ xstream.alias("effector", Effector.class);
+ xstream.addDefaultImplementation(EffectorAndBody.class, Effector.class);
+ xstream.alias("parameter", BasicParameterType.class);
+ xstream.addDefaultImplementation(EffectorBodyTaskFactory.class, EffectorTaskFactory.class);
+
+ // element names for references to live objects (handled by the converters registered below)
+ xstream.alias("entityRef", Entity.class);
+ xstream.alias("locationRef", Location.class);
+ xstream.alias("policyRef", Policy.class);
+ xstream.alias("enricherRef", Enricher.class);
+
+ // NOTE(review): registration order may affect which converter xstream picks -- do not reorder without checking
+ xstream.registerConverter(new LocationConverter());
+ xstream.registerConverter(new PolicyConverter());
+ xstream.registerConverter(new EnricherConverter());
+ xstream.registerConverter(new EntityConverter());
+ xstream.registerConverter(new FeedConverter());
+ xstream.registerConverter(new CatalogItemConverter());
+ xstream.registerConverter(new SpecConverter());
+
+ xstream.registerConverter(new ManagementContextConverter());
+ xstream.registerConverter(new TaskConverter(xstream.getMapper()));
+
+ //For compatibility with existing persistence stores content.
+ xstream.aliasField("registeredTypeName", BasicCatalogItemMemento.class, "symbolicName");
+ xstream.registerLocalConverter(BasicCatalogItemMemento.class, "libraries", new CatalogItemLibrariesConverter());
+ }
+
+ // Warning: this is called in the super-class constructor, so before this constructor!
+ // (so fields assigned in this class's constructor should not be relied on here)
+ @Override
+ protected MapperWrapper wrapMapper(MapperWrapper next) {
+     // layer the custom tag names on top of the mapper chain built by the super-class
+     MapperWrapper withEntityTag = new CustomMapper(super.wrapMapper(next), Entity.class, "entityProxy");
+     return new CustomMapper(withEntityTag, Location.class, "locationProxy");
+ }
+
+ /**
+ * Serializes via the super-class and then appends a newline character to the writer.
+ * An {@link IOException} raised by the append is handed to Exceptions.propagate.
+ */
+ @Override
+ public void serialize(Object object, Writer writer) {
+ super.serialize(object, writer);
+ try {
+ writer.append("\n");
+ } catch (IOException e) {
+ throw Exceptions.propagate(e);
+ }
+ }
+
+ /**
+  * Sets the context which the converters in this class use to resolve ids on unmarshal.
+  *
+  * @param lookupContext the context to use; must not be null
+  */
+ @Override
+ public void setLookupContext(LookupContext lookupContext) {
+     checkNotNull(lookupContext, "lookupContext");
+     this.lookupContext = lookupContext;
+ }
+
+ /** Clears the lookup context; it is null again until {@link #setLookupContext(LookupContext)} is called. */
+ @Override
+ public void unsetLookupContext() {
+ this.lookupContext = null;
+ }
+
+ /**
+ * For changing the tag used for anything that implements/extends the given type.
+ * Necessary for using EntityRef rather than the default "dynamic-proxy" tag.
+ *
+ * @author aled
+ */
+ public class CustomMapper extends MapperWrapper {
+ private final Class<?> clazz;
+ private final String alias;
+
+ public CustomMapper(Mapper wrapped, Class<?> clazz, String alias) {
+ super(wrapped);
+ this.clazz = checkNotNull(clazz, "clazz");
+ this.alias = checkNotNull(alias, "alias");
+ }
+
+ public String getAlias() {
+ return alias;
+ }
+
+ @Override
+ public String serializedClass(@SuppressWarnings("rawtypes") Class type) {
+ if (type != null && clazz.isAssignableFrom(type)) {
+ return alias;
+ } else {
+ return super.serializedClass(type);
+ }
+ }
+
+ @Override
+ public Class<?> realClass(String elementName) {
+ if (elementName.equals(alias)) {
+ return clazz;
+ } else {
+ return super.realClass(elementName);
+ }
+ }
+ }
+
+ /**
+  * Base single-value converter which writes an {@link Identifiable} as its id and reads it back
+  * by passing the id to {@link #lookup(String)}, provided a lookup context is currently set.
+  */
+ public abstract class IdentifiableConverter<IT extends Identifiable> implements SingleValueConverter {
+     private final Class<IT> clazz;
+
+     IdentifiableConverter(Class<IT> clazz) {
+         this.clazz = clazz;
+     }
+
+     /** Accepts the configured type and anything assignable to it. */
+     @Override
+     public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
+         return clazz.isAssignableFrom(type);
+     }
+
+     /** Returns the object's id, or null for a null object. */
+     @Override
+     public String toString(Object obj) {
+         if (obj == null) {
+             return null;
+         }
+         return ((Identifiable)obj).getId();
+     }
+
+     /** Resolves the id via {@link #lookup(String)}; with no lookup context set, logs a warning and returns null. */
+     @Override
+     public Object fromString(String str) {
+         if (lookupContext != null) {
+             return lookup(str);
+         }
+         LOG.warn("Cannot unmarshal from persisted xml {} {}; no lookup context supplied!", clazz.getSimpleName(), str);
+         return null;
+     }
+
+     /** Resolves the given id; only invoked from {@link #fromString(String)} after it has seen a non-null lookup context. */
+     protected abstract IT lookup(String id);
+ }
+
+ /** Converts a {@link Location} to/from its id, resolving ids with the lookup context's lookupLocation. */
+ public class LocationConverter extends IdentifiableConverter<Location> {
+ LocationConverter() {
+ super(Location.class);
+ }
+ @Override
+ protected Location lookup(String id) {
+ return lookupContext.lookupLocation(id);
+ }
+ }
+
+ /** Converts a {@link Policy} to/from its id, resolving ids with the lookup context's lookupPolicy. */
+ public class PolicyConverter extends IdentifiableConverter<Policy> {
+ PolicyConverter() {
+ super(Policy.class);
+ }
+ @Override
+ protected Policy lookup(String id) {
+ return lookupContext.lookupPolicy(id);
+ }
+ }
+
+ /** Converts an {@link Enricher} to/from its id, resolving ids with the lookup context's lookupEnricher. */
+ public class EnricherConverter extends IdentifiableConverter<Enricher> {
+ EnricherConverter() {
+ super(Enricher.class);
+ }
+ @Override
+ protected Enricher lookup(String id) {
+ return lookupContext.lookupEnricher(id);
+ }
+ }
+
+ /** Converts a {@link Feed} to/from its id, resolving ids with the lookup context's lookupFeed. */
+ public class FeedConverter extends IdentifiableConverter<Feed> {
+ FeedConverter() {
+ super(Feed.class);
+ }
+ @Override
+ protected Feed lookup(String id) {
+ return lookupContext.lookupFeed(id);
+ }
+ }
+
+ /** Converts an {@link Entity} to/from its id, resolving ids with the lookup context's lookupEntity. */
+ public class EntityConverter extends IdentifiableConverter<Entity> {
+ EntityConverter() {
+ super(Entity.class);
+ }
+ @Override
+ protected Entity lookup(String id) {
+ return lookupContext.lookupEntity(id);
+ }
+ }
+
+ /** Converts a {@link CatalogItem} to/from its id, resolving ids with the lookup context's lookupCatalogItem. */
+ @SuppressWarnings("rawtypes")
+ public class CatalogItemConverter extends IdentifiableConverter<CatalogItem> {
+ CatalogItemConverter() {
+ super(CatalogItem.class);
+ }
+ @Override
+ protected CatalogItem<?,?> lookup(String id) {
+ return lookupContext.lookupCatalogItem(id);
+ }
+ }
+
+
+ // set once the "skipping Task" warning has been logged; a plain static, so shared by all instances
+ static boolean loggedTaskWarning = false;
+ /**
+  * Converter for {@link Task} values. A task which is done and not in error is written as its result;
+  * any other task is skipped (with a warning logged the first time only).
+  */
+ public class TaskConverter implements Converter {
+     private final Mapper mapper;
+
+     /** @param mapper used on unmarshal to read the class type of the stored result */
+     TaskConverter(Mapper mapper) {
+         this.mapper = mapper;
+     }
+
+     @Override
+     public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
+         return Task.class.isAssignableFrom(type);
+     }
+
+     @SuppressWarnings("deprecation")
+     @Override
+     public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) {
+         if (source == null) return;
+
+         Task<?> task = (Task<?>) source;
+         if (task.isDone() && !task.isError()) {
+             try {
+                 // a finished task is persisted as the value it produced
+                 context.convertAnother(task.get());
+             } catch (InterruptedException e) {
+                 throw Exceptions.propagate(e);
+             } catch (ExecutionException e) {
+                 LOG.warn("Unexpected exception getting done (and non-error) task result for "+source+"; continuing: "+e, e);
+             }
+             return;
+         }
+
+         // TODO How to log sensibly, without it logging this every second?!
+         // jun 2014, have added a "log once" which is not ideal but better than the log never behaviour
+         if (!loggedTaskWarning) {
+             String where = (context instanceof ReferencingMarshallingContext)
+                     ? " at "+((ReferencingMarshallingContext)context).currentPath()
+                     : "";
+             LOG.warn("Intercepting and skipping request to serialize a Task"+where+
+                     " (only logging this once): "+source);
+             loggedTaskWarning = true;
+         }
+     }
+
+     /** Reads back the stored result if there is a child element, otherwise returns null. */
+     @Override
+     public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) {
+         if (!reader.hasMoreChildren()) {
+             return null;
+         }
+         Class<?> type = HierarchicalStreams.readClassType(reader, mapper);
+         reader.moveDown();
+         Object result = context.convertAnother(null, type);
+         reader.moveUp();
+         return result;
+     }
+ }
+
+ public class ManagementContextConverter implements Converter {
+ @Override
+ public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
+ return ManagementContext.class.isAssignableFrom(type);
+ }
+ @Override
+ public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) {
+ // write nothing, and always insert the current mgmt context
+ }
+ @Override
+ public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) {
+ return lookupContext.lookupManagementContext();
+ }
+ }
+
+ /**
+  * Converter for {@link AbstractBrooklynObjectSpec} instances.
+  * When writing a spec with a non-blank catalog item id, the id is emitted as the first child node;
+  * when reading, that leading node is peeked and, if present, used to install a class-loading context
+  * for the catalog item while the rest of the spec is unmarshalled.
+  */
+ public class SpecConverter extends ReflectionConverter {
+     SpecConverter() {
+         super(xstream.getMapper(), xstream.getReflectionProvider());
+     }
+     @Override
+     public boolean canConvert(@SuppressWarnings("rawtypes") Class type) {
+         return AbstractBrooklynObjectSpec.class.isAssignableFrom(type);
+     }
+     @Override
+     public void marshal(Object source, HierarchicalStreamWriter writer, MarshallingContext context) {
+         if (source == null) {
+             return;
+         }
+         AbstractBrooklynObjectSpec<?, ?> spec = (AbstractBrooklynObjectSpec<?, ?>) source;
+         String catalogItemId = spec.getCatalogItemId();
+         if (Strings.isNonBlank(catalogItemId)) {
+             // write this field first, so we can peek at it when we read
+             writer.startNode("catalogItemId");
+             writer.setValue(catalogItemId);
+             writer.endNode();
+             // the reflective marshal below writes the catalogItemId field a second time :( but that's okay.
+             // better solution would be to have mark/reset on reader so we can peek for such a field;
+             // see comment in unmarshal
+         }
+         super.marshal(source, writer, context);
+     }
+     /**
+      * @throws NullPointerException if a catalog item id is present but no lookup context has been set
+      * @throws NoSuchElementException if the catalog item id cannot be resolved
+      */
+     @Override
+     public Object unmarshal(HierarchicalStreamReader reader, UnmarshallingContext context) {
+         boolean customLoaderSet = false;
+         try {
+             // must happen before any moveDown (see instantiateNewInstance); done inside the try
+             // so the cached instance is always cleared, even if reading the leading node fails
+             instantiateNewInstanceSettingCache(reader, context);
+
+             String catalogItemId = null;
+             if (reader instanceof PathTrackingReader) {
+                 // have to assume this is first; there is no mark/reset support on these readers
+                 // (if there were then it would be easier, we could just look for that child anywhere,
+                 // and not need a custom writer!)
+                 if ("catalogItemId".equals( ((PathTrackingReader)reader).peekNextChild() )) {
+                     reader.moveDown();
+                     catalogItemId = reader.getValue();
+                     reader.moveUp();
+                 }
+             }
+
+             if (Strings.isNonBlank(catalogItemId)) {
+                 if (lookupContext==null) throw new NullPointerException("lookupContext required to load catalog item "+catalogItemId);
+                 ManagementContext mgmt = lookupContext.lookupManagementContext();
+                 CatalogItem<?, ?> cat = CatalogUtils.getCatalogItemOptionalVersion(mgmt, catalogItemId);
+                 if (cat==null) throw new NoSuchElementException("catalog item: "+catalogItemId);
+                 BrooklynClassLoadingContext clcNew = CatalogUtils.newClassLoadingContext(mgmt, cat);
+                 pushXstreamCustomClassLoader(clcNew);
+                 // only pop in the finally block if the push actually succeeded
+                 customLoaderSet = true;
+             }
+
+             AbstractBrooklynObjectSpec<?, ?> result = (AbstractBrooklynObjectSpec<?, ?>) super.unmarshal(reader, context);
+             // we wrote it twice so this shouldn't be necessary; but if we fix it so we only write once, we'd need this
+             result.catalogItemId(catalogItemId);
+             return result;
+         } finally {
+             instance = null;
+             if (customLoaderSet) {
+                 popXstreamCustomClassLoader();
+             }
+         }
+     }
+
+     /**
+      * Instance created at the start of {@link #unmarshal} and handed to the superclass when it asks for a new instance.
+      * NOTE(review): plain unsynchronized field -- confirm this converter is never used by two threads at once.
+      */
+     Object instance;
+
+     @Override
+     protected Object instantiateNewInstance(HierarchicalStreamReader reader, UnmarshallingContext context) {
+         // the super calls getAttribute which requires that we have not yet done moveDown,
+         // so we do this earlier and cache it for when we call super.unmarshal
+         if (instance==null)
+             throw new IllegalStateException("Instance should be created and cached");
+         return instance;
+     }
+     /** Creates the new instance via the superclass and stores it in {@link #instance}. */
+     protected void instantiateNewInstanceSettingCache(HierarchicalStreamReader reader, UnmarshallingContext context) {
+         instance = super.instantiateNewInstance(reader, context);
+     }
+ }
+
+ Stack<BrooklynClassLoadingContext> contexts = new Stack<BrooklynClassLoadingContext>();
+ Stack<ClassLoader> cls = new Stack<ClassLoader>();
+ AtomicReference<Thread> xstreamLockOwner = new AtomicReference<Thread>();
+ int lockCount;
+
+ /**
+  * Acquires the xstream lock and installs on xstream a class loader that consults the current
+  * loading context followed by {@code clcNew}.
+  * Must be accompanied by a corresponding {@link #popXstreamCustomClassLoader()} when finished.
+  * <p>
+  * If installation fails, the lock taken here is released again before the exception propagates
+  * and neither the stacks nor the xstream class loader are modified, so the caller must not pop.
+  */
+ @SuppressWarnings("deprecation")
+ protected void pushXstreamCustomClassLoader(BrooklynClassLoadingContext clcNew) {
+     acquireXstreamLock();
+     boolean installed = false;
+     try {
+         ClassLoader previousCL = xstream.getClassLoader();
+         BrooklynClassLoadingContext oldClc;
+         if (!contexts.isEmpty()) {
+             oldClc = contexts.peek();
+         } else {
+             // TODO XmlMementoSerializer should take a BCLC instead of a CL
+             oldClc = JavaBrooklynClassLoadingContext.create(lookupContext.lookupManagementContext(), previousCL);
+         }
+         BrooklynClassLoadingContextSequential clcMerged = new BrooklynClassLoadingContextSequential(lookupContext.lookupManagementContext(),
+             oldClc, clcNew);
+         // build the new loader before touching any state, so a failure leaves everything as it was
+         ClassLoader newCL = ClassLoaderFromBrooklynClassLoadingContext.of(clcMerged);
+         xstream.setClassLoader(newCL);
+         contexts.push(clcMerged);
+         cls.push(previousCL);
+         installed = true;
+     } finally {
+         if (!installed) {
+             // the caller only pops after a successful push, so undo the acquisition here
+             releaseXstreamLock();
+         }
+     }
+ }
+
+ /**
+  * Undoes one {@code pushXstreamCustomClassLoader}: gives up one hold on the xstream lock,
+  * puts back the class loader saved by that push, and discards the matching loading context.
+  */
+ protected void popXstreamCustomClassLoader() {
+     synchronized (xstreamLockOwner) {
+         // release first: this throws if the lock is not held, before any state is changed
+         releaseXstreamLock();
+         ClassLoader previousCL = cls.pop();
+         xstream.setClassLoader(previousCL);
+         contexts.pop();
+     }
+ }
+
+ /**
+  * Blocks until the calling thread owns the xstream lock, then increments the hold count.
+  * A thread that already owns the lock passes straight through.
+  */
+ protected void acquireXstreamLock() {
+     synchronized (xstreamLockOwner) {
+         Thread me = Thread.currentThread();
+         // loop until we either claim the free lock or find that we already own it
+         while (!xstreamLockOwner.compareAndSet(null, me) && !me.equals( xstreamLockOwner.get() )) {
+             try {
+                 // woken by notifyAll on release, or re-checks after the timeout
+                 xstreamLockOwner.wait(1000);
+             } catch (InterruptedException e) {
+                 throw Exceptions.propagate(e);
+             }
+         }
+         lockCount++;
+     }
+ }
+
+ /**
+  * Gives up one hold on the xstream lock; when the hold count reaches zero the owner is cleared
+  * and waiting threads are notified.
+  *
+  * @throws IllegalStateException if the lock is not held, or if the final release is attempted
+  *         by a thread other than the recorded owner (the owner is cleared in that case too)
+  */
+ protected void releaseXstreamLock() {
+     synchronized (xstreamLockOwner) {
+         if (lockCount<=0) {
+             throw new IllegalStateException("xstream not locked");
+         }
+         lockCount--;
+         if (lockCount != 0) {
+             // other holds remain outstanding; keep ownership
+             return;
+         }
+         Thread me = Thread.currentThread();
+         if (xstreamLockOwner.compareAndSet(me, null)) {
+             xstreamLockOwner.notifyAll();
+             return;
+         }
+         Thread oldOwner = xstreamLockOwner.getAndSet(null);
+         throw new IllegalStateException("xstream was locked by "+oldOwner+" but unlock attempt by "+me);
+     }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java
index 6b586a8..d6c3160 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ActivePartialRebindIteration.java
@@ -36,8 +36,8 @@ import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.objs.BrooklynObjectType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynMementoPersisterToObjectStore;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.entity.core.EntityInternal;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java
index 74a0497..c3b657d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/BasicLocationRebindSupport.java
@@ -30,7 +30,7 @@ import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.mgmt.rebind.dto.MementosGenerators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.location.basic.AbstractLocation;
+import org.apache.brooklyn.location.core.AbstractLocation;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.FlagUtils;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java
index c69dca4..81785bc 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/ImmediateDeltaChangeListener.java
@@ -37,7 +37,7 @@ import org.apache.brooklyn.api.mgmt.rebind.mementos.PolicyMemento;
import org.apache.brooklyn.api.objs.BrooklynObject;
import org.apache.brooklyn.api.policy.Policy;
import org.apache.brooklyn.api.sensor.Enricher;
-import org.apache.brooklyn.location.basic.LocationInternal;
+import org.apache.brooklyn.location.core.internal.LocationInternal;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java
index 8a36a0d..8b4cc29 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/InitialFullRebindIteration.java
@@ -31,7 +31,7 @@ import org.apache.brooklyn.core.mgmt.internal.BrooklynObjectManagementMode;
import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal;
import org.apache.brooklyn.core.mgmt.internal.LocationManagerInternal;
import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
import org.apache.brooklyn.entity.core.EntityInternal;
import org.apache.brooklyn.util.text.Strings;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
index 7d68438..663061d 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/PeriodicDeltaChangeListener.java
@@ -44,8 +44,8 @@ import org.apache.brooklyn.api.sensor.Enricher;
import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.entity.core.EntityInternal;
import org.apache.brooklyn.util.collections.MutableMap;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
index fc50d30..83cc155 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindIteration.java
@@ -71,8 +71,8 @@ import org.apache.brooklyn.core.mgmt.internal.EntityManagerInternal;
import org.apache.brooklyn.core.mgmt.internal.LocationManagerInternal;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
import org.apache.brooklyn.core.mgmt.internal.ManagementTransitionMode;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
import org.apache.brooklyn.core.mgmt.rebind.RebindManagerImpl.RebindTracker;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics;
import org.apache.brooklyn.core.objs.AbstractBrooklynObject;
import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
import org.apache.brooklyn.core.objs.proxy.InternalEntityFactory;
@@ -82,8 +82,8 @@ import org.apache.brooklyn.core.objs.proxy.InternalPolicyFactory;
import org.apache.brooklyn.entity.core.AbstractApplication;
import org.apache.brooklyn.entity.core.AbstractEntity;
import org.apache.brooklyn.entity.core.EntityInternal;
-import org.apache.brooklyn.location.basic.AbstractLocation;
-import org.apache.brooklyn.location.basic.LocationInternal;
+import org.apache.brooklyn.location.core.AbstractLocation;
+import org.apache.brooklyn.location.core.internal.LocationInternal;
import org.apache.brooklyn.policy.core.AbstractPolicy;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.sensor.feed.AbstractFeed;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
index 3372fef..4c82dbe 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/RebindManagerImpl.java
@@ -48,10 +48,10 @@ import org.apache.brooklyn.core.config.ConfigKeys;
import org.apache.brooklyn.core.internal.BrooklynFeatureEnablement;
import org.apache.brooklyn.core.mgmt.ha.HighAvailabilityManagerImpl;
import org.apache.brooklyn.core.mgmt.internal.ManagementContextInternal;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynMementoPersisterToObjectStore;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceActivityMetrics;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils.CreateBackupMode;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynMementoPersisterToObjectStore;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
+import org.apache.brooklyn.core.mgmt.persist.PersistenceActivityMetrics;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils.CreateBackupMode;
import org.apache.brooklyn.core.mgmt.rebind.transformer.CompoundTransformer;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.entity.core.Entities;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
index c62b064..505c844 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/dto/MementosGenerators.java
@@ -49,13 +49,13 @@ import org.apache.brooklyn.api.sensor.Feed;
import org.apache.brooklyn.api.sensor.AttributeSensor.SensorPersistenceMode;
import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.catalog.internal.CatalogItemDo;
+import org.apache.brooklyn.core.mgmt.persist.BrooklynPersistenceUtils;
import org.apache.brooklyn.core.mgmt.rebind.AbstractBrooklynObjectRebindSupport;
import org.apache.brooklyn.core.mgmt.rebind.TreeUtils;
-import org.apache.brooklyn.core.mgmt.rebind.persister.BrooklynPersistenceUtils;
import org.apache.brooklyn.core.objs.BrooklynTypes;
import org.apache.brooklyn.entity.core.EntityDynamicType;
import org.apache.brooklyn.entity.core.EntityInternal;
-import org.apache.brooklyn.location.basic.LocationInternal;
+import org.apache.brooklyn.location.core.internal.LocationInternal;
import org.apache.brooklyn.policy.core.AbstractPolicy;
import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
import org.apache.brooklyn.sensor.feed.AbstractFeed;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a1ad34d7/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java
deleted file mode 100644
index eb22d2e..0000000
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ /dev/null
@@ -1,697 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.brooklyn.core.mgmt.rebind.persister;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.IOException;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.CatalogItem;
-import org.apache.brooklyn.api.mgmt.rebind.PersistenceExceptionHandler;
-import org.apache.brooklyn.api.mgmt.rebind.RebindExceptionHandler;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMemento;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoManifest;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoPersister;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.BrooklynMementoRawData;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.CatalogItemMemento;
-import org.apache.brooklyn.api.mgmt.rebind.mementos.Memento;
-import org.apache.brooklyn.api.objs.BrooklynObject;
-import org.apache.brooklyn.api.objs.BrooklynObjectType;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.config.StringConfigMap;
-import org.apache.brooklyn.core.catalog.internal.CatalogUtils;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.core.mgmt.classloading.ClassLoaderFromBrooklynClassLoadingContext;
-import org.apache.brooklyn.core.mgmt.rebind.PeriodicDeltaChangeListener;
-import org.apache.brooklyn.core.mgmt.rebind.PersisterDeltaImpl;
-import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoImpl;
-import org.apache.brooklyn.core.mgmt.rebind.dto.BrooklynMementoManifestImpl;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceObjectStore.StoreObjectAccessor;
-import org.apache.brooklyn.core.mgmt.rebind.persister.PersistenceObjectStore.StoreObjectAccessorWithLock;
-import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.core.xstream.XmlUtil;
-import org.apache.brooklyn.util.exceptions.CompoundRuntimeException;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.text.Strings;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-
-import com.google.common.annotations.Beta;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-/** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
- * {@link PersistenceObjectStore} such as a file system or a jclouds object store */
-public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister {
-
- // TODO Crazy amount of duplication between handling entity, location, policy, enricher + feed;
- // Need to remove that duplication.
-
- // TODO Should stop() take a timeout, and shutdown the executor gracefully?
-
- private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class);
-
- public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey(
- "persister.threadpool.maxSize",
- "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)",
- 10);
-
- public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
- "persister.maxSerializationAttempts",
- "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)",
- 5);
-
- private final PersistenceObjectStore objectStore;
- private final MementoSerializer<Object> serializerWithStandardClassLoader;
-
- private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
-
- private final ListeningExecutorService executor;
-
- private volatile boolean writesAllowed = false;
- private volatile boolean writesShuttingDown = false;
- private StringConfigMap brooklynProperties;
-
- private List<Delta> queuedDeltas = new CopyOnWriteArrayList<BrooklynMementoPersister.Delta>();
-
- /**
- * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
- * for any concurrent call to complete.
- */
- private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-
- public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, StringConfigMap brooklynProperties, ClassLoader classLoader) {
- this.objectStore = checkNotNull(objectStore, "objectStore");
- this.brooklynProperties = brooklynProperties;
-
- int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
- MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
- this.serializerWithStandardClassLoader = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
-
- int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
-
- objectStore.createSubPath("entities");
- objectStore.createSubPath("locations");
- objectStore.createSubPath("policies");
- objectStore.createSubPath("enrichers");
- objectStore.createSubPath("feeds");
- objectStore.createSubPath("catalog");
-
- // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ?
- objectStore.createSubPath("plane");
-
- executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() {
- @Override public Thread newThread(Runnable r) {
- // Note: Thread name referenced in logback-includes' ThreadNameDiscriminator
- return new Thread(r, "brooklyn-persister");
- }}));
- }
-
- public MementoSerializer<Object> getMementoSerializer() {
- return getSerializerWithStandardClassLoader();
- }
-
- protected MementoSerializer<Object> getSerializerWithStandardClassLoader() {
- return serializerWithStandardClassLoader;
- }
-
- protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, BrooklynObjectType type, String objectId) {
- ClassLoader cl = getCustomClassLoaderForBrooklynObject(lookupContext, type, objectId);
- if (cl==null) return serializerWithStandardClassLoader;
- return getSerializerWithCustomClassLoader(lookupContext, cl);
- }
-
- protected MementoSerializer<Object> getSerializerWithCustomClassLoader(LookupContext lookupContext, ClassLoader classLoader) {
- int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
- MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
- MementoSerializer<Object> result = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
- result.setLookupContext(lookupContext);
- return result;
- }
-
- @Nullable protected ClassLoader getCustomClassLoaderForBrooklynObject(LookupContext lookupContext, BrooklynObjectType type, String objectId) {
- BrooklynObject item = lookupContext.peek(type, objectId);
- String catalogItemId = (item == null) ? null : item.getCatalogItemId();
- // TODO enrichers etc aren't yet known -- would need to backtrack to the entity to get them from bundles
- if (catalogItemId == null) {
- return null;
- }
- // See RebindIteration.BrooklynObjectInstantiator.load(), for handling where catalog item is missing;
- // similar logic here.
- CatalogItem<?, ?> catalogItem = CatalogUtils.getCatalogItemOptionalVersion(lookupContext.lookupManagementContext(), catalogItemId);
- if (catalogItem == null) {
- // TODO do we need to only log once, rather than risk log.warn too often? I think this only happens on rebind, so ok.
- LOG.warn("Unable to load catalog item "+catalogItemId+" for custom class loader of "+type+" "+objectId+"; will use default class loader");
- return null;
- } else {
- return ClassLoaderFromBrooklynClassLoadingContext.of(CatalogUtils.newClassLoadingContext(lookupContext.lookupManagementContext(), catalogItem));
- }
- }
-
- @Override public void enableWriteAccess() {
- writesAllowed = true;
- }
-
- @Override
- public void disableWriteAccess(boolean graceful) {
- writesShuttingDown = true;
- try {
- writesAllowed = false;
- // a very long timeout to ensure we don't lose state.
- // If persisting thousands of entities over slow network to Object Store, could take minutes.
- waitForWritesCompleted(Duration.ONE_HOUR);
-
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- } finally {
- writesShuttingDown = false;
- }
- }
-
- @Override
- public void stop(boolean graceful) {
- disableWriteAccess(graceful);
-
- if (executor != null) {
- if (graceful) {
- executor.shutdown();
- try {
- // should be quick because we've just turned off writes, waiting for their completion
- executor.awaitTermination(1, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- throw Exceptions.propagate(e);
- }
- } else {
- executor.shutdownNow();
- }
- }
- }
-
- public PersistenceObjectStore getObjectStore() {
- return objectStore;
- }
-
- protected StoreObjectAccessorWithLock getWriter(String path) {
- String id = path.substring(path.lastIndexOf('/')+1);
- synchronized (writers) {
- StoreObjectAccessorWithLock writer = writers.get(id);
- if (writer == null) {
- writer = new StoreObjectAccessorLocking( objectStore.newAccessor(path) );
- writers.put(id, writer);
- }
- return writer;
- }
- }
-
- private Map<String,String> makeIdSubPathMap(Iterable<String> subPathLists) {
- Map<String,String> result = MutableMap.of();
- for (String subpath: subPathLists) {
- String id = subpath;
- id = id.substring(id.lastIndexOf('/')+1);
- id = id.substring(id.lastIndexOf('\\')+1);
- // assumes id is the filename; should work even if not, as id is later read from xpath
- // but you'll get warnings (and possibility of loss if there is a collision)
- result.put(id, subpath);
- }
- return result;
- }
-
- protected BrooklynMementoRawData listMementoSubPathsAsData(final RebindExceptionHandler exceptionHandler) {
- final BrooklynMementoRawData.Builder subPathDataBuilder = BrooklynMementoRawData.builder();
-
- Stopwatch stopwatch = Stopwatch.createStarted();
- try {
- for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER)
- subPathDataBuilder.putAll(type, makeIdSubPathMap(objectStore.listContentsWithSubPath(type.getSubPathName())));
-
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadMementoFailed(BrooklynObjectType.UNKNOWN, "Failed to list files", e);
- throw new IllegalStateException("Failed to list memento files in "+objectStore, e);
- }
-
- BrooklynMementoRawData subPathData = subPathDataBuilder.build();
- LOG.debug("Loaded rebind lists; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", new Object[]{
- Time.makeTimeStringRounded(stopwatch),
- subPathData.getEntities().size(), subPathData.getLocations().size(), subPathData.getPolicies().size(), subPathData.getEnrichers().size(),
- subPathData.getFeeds().size(), subPathData.getCatalogItems().size(),
- objectStore.getSummaryName() });
-
- return subPathData;
- }
-
- public BrooklynMementoRawData loadMementoRawData(final RebindExceptionHandler exceptionHandler) {
- BrooklynMementoRawData subPathData = listMementoSubPathsAsData(exceptionHandler);
-
- final BrooklynMementoRawData.Builder builder = BrooklynMementoRawData.builder();
-
- Visitor loaderVisitor = new Visitor() {
- @Override
- public void visit(BrooklynObjectType type, String id, String contentsSubpath) throws Exception {
- String contents = null;
- try {
- contents = read(contentsSubpath);
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- exceptionHandler.onLoadMementoFailed(type, "memento "+id+" read error", e);
- }
-
- String xmlId = (String) XmlUtil.xpath(contents, "/"+type.toCamelCase()+"/id");
- String safeXmlId = Strings.makeValidFilename(xmlId);
- if (!Objects.equal(id, safeXmlId))
- LOG.warn("ID mismatch on "+type.toCamelCase()+", "+id+" from path, "+safeXmlId+" from xml");
-
- builder.put(type, xmlId, contents);
- }
- };
-
- Stopwatch stopwatch = Stopwatch.createStarted();
-
- visitMemento("loading raw", subPathData, loaderVisitor, exceptionHandler);
-
- BrooklynMementoRawData result = builder.build();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loaded rebind raw data; took {}; {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", new Object[]{
- Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntities().size(),
- result.getLocations().size(), result.getPolicies().size(), result.getEnrichers().size(),
- result.getFeeds().size(), result.getCatalogItems().size(),
- objectStore.getSummaryName() });
- }
-
- return result;
- }
-
- /**
-  * Builds the rebind manifest from raw memento data.
-  * <p>
-  * Entities are recorded with their id, type, parent and catalog item id; locations, policies,
-  * enrichers and feeds are recorded with their id and type only. Those values are read with
-  * xpath expressions rooted at the element named after the object type. Catalog items are
-  * instead deserialized in full with the standard-class-loader serializer.
-  *
-  * @param mementoData raw data to index; when null it is first loaded with {@code loadMementoRawData}
-  * @param exceptionHandler receives load failures reported while visiting the data
-  * @return the manifest built from every visited object
-  */
- @Override
- public BrooklynMementoManifest loadMementoManifest(BrooklynMementoRawData mementoData, final RebindExceptionHandler exceptionHandler) throws IOException {
-     if (mementoData == null) {
-         mementoData = loadMementoRawData(exceptionHandler);
-     }
-
-     final BrooklynMementoManifestImpl.Builder manifest = BrooklynMementoManifestImpl.builder();
-
-     Visitor manifestVisitor = new Visitor() {
-         /** Evaluates {@code root + name} as an xpath against the given xml and returns the result as a string. */
-         private String field(String xml, String root, String name) {
-             return (String) XmlUtil.xpath(xml, root+name);
-         }
-
-         @Override
-         public void visit(BrooklynObjectType type, String objectId, final String contents) throws Exception {
-             final String root = "/"+type.toCamelCase()+"/";
-
-             switch (type) {
-                 case ENTITY: {
-                     String id = field(contents, root, "id");
-                     String javaType = field(contents, root, "type");
-                     String parent = Strings.emptyToNull(field(contents, root, "parent"));
-                     String catalogItemId = Strings.emptyToNull(field(contents, root, "catalogItemId"));
-                     manifest.entity(id, javaType, parent, catalogItemId);
-                     break;
-                 }
-                 case LOCATION:
-                 case POLICY:
-                 case ENRICHER:
-                 case FEED: {
-                     String id = field(contents, root, "id");
-                     String javaType = field(contents, root, "type");
-                     manifest.putType(type, id, javaType);
-                     break;
-                 }
-                 case CATALOG_ITEM:
-                     // catalog items are needed early, so they are deserialized in full at this stage
-                     try {
-                         CatalogItemMemento catalogMemento = (CatalogItemMemento) getSerializerWithStandardClassLoader().fromString(contents);
-                         if (catalogMemento != null) {
-                             manifest.catalogItem(catalogMemento);
-                         } else {
-                             LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing");
-                         }
-                     } catch (Exception e) {
-                         exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" early catalog deserialization error", e);
-                     }
-                     break;
-                 default:
-                     throw new IllegalStateException("Unexpected brooklyn type: "+type);
-             }
-         }
-     };
-
-     Stopwatch timer = Stopwatch.createStarted();
-     visitMemento("manifests", mementoData, manifestVisitor, exceptionHandler);
-     BrooklynMementoManifest loaded = manifest.build();
-
-     if (LOG.isDebugEnabled()) {
-         Object[] logArgs = new Object[] {
-                 Time.makeTimeStringRounded(timer),
-                 loaded.getEntityIdToManifest().size(),
-                 loaded.getLocationIdToType().size(),
-                 loaded.getPolicyIdToType().size(),
-                 loaded.getEnricherIdToType().size(),
-                 loaded.getFeedIdToType().size(),
-                 loaded.getCatalogItemMementos().size(),
-                 objectStore.getSummaryName() };
-         LOG.debug("Loaded rebind manifests; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items; from {}", logArgs);
-     }
-
-     return loaded;
- }
-
- /**
-  * Deserializes every object in the raw memento data and gathers the results into a {@link BrooklynMemento}.
-  * <p>
-  * Each object is deserialized with a serializer obtained from
-  * {@code getSerializerWithCustomClassLoader(lookupContext, type, objectId)}. Any exception raised while
-  * deserializing or recording a memento is handed to the exception handler rather than thrown from the visitor.
-  *
-  * @param mementoData raw data to deserialize; when null it is first loaded with {@code loadMementoRawData}
-  * @param lookupContext set on the standard serializer for the duration of the visit, and used to pick per-object serializers
-  * @param exceptionHandler receives deserialization failures
-  * @return the memento built from all successfully deserialized objects
-  */
- @Override
- public BrooklynMemento loadMemento(BrooklynMementoRawData mementoData, final LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException {
-     if (mementoData == null) {
-         mementoData = loadMementoRawData(exceptionHandler);
-     }
-
-     Stopwatch timer = Stopwatch.createStarted();
-     final BrooklynMementoImpl.Builder mementos = BrooklynMementoImpl.builder();
-
-     Visitor deserializingVisitor = new Visitor() {
-         @Override
-         public void visit(BrooklynObjectType type, String objectId, String contents) throws Exception {
-             try {
-                 Memento deserialized = (Memento) getSerializerWithCustomClassLoader(lookupContext, type, objectId).fromString(contents);
-                 if (deserialized != null) {
-                     mementos.memento(deserialized);
-                 } else {
-                     LOG.warn("No "+type.toCamelCase()+"-memento deserialized from " + objectId + "; ignoring and continuing");
-                 }
-             } catch (Exception e) {
-                 exceptionHandler.onLoadMementoFailed(type, "memento "+objectId+" deserialization error", e);
-             }
-         }
-     };
-
-     // TODO not convinced this is single threaded on reads; maybe should get a new one each time?
-     getSerializerWithStandardClassLoader().setLookupContext(lookupContext);
-     try {
-         visitMemento("deserialization", mementoData, deserializingVisitor, exceptionHandler);
-     } finally {
-         // always clear the context again, even when the visit fails
-         getSerializerWithStandardClassLoader().unsetLookupContext();
-     }
-
-     BrooklynMemento loaded = mementos.build();
-
-     if (LOG.isDebugEnabled()) {
-         Object[] logArgs = new Object[] {
-                 Time.makeTimeStringRounded(timer.elapsed(TimeUnit.MILLISECONDS)),
-                 loaded.getEntityIds().size(),
-                 loaded.getLocationIds().size(),
-                 loaded.getPolicyIds().size(),
-                 loaded.getEnricherIds().size(),
-                 loaded.getFeedIds().size(),
-                 loaded.getCatalogItemIds().size(),
-                 objectStore.getSummaryName() };
-         LOG.debug("Loaded rebind mementos; took {}: {} entities, {} locations, {} policies, {} enrichers, {} feeds, {} catalog items, from {}", logArgs);
-     }
-
-     return loaded;
- }
-
- /**
-  * Callback applied to each persisted object by {@code visitMemento}.
-  * Implementations may throw; {@code visitMemento} reports such failures to its exception handler.
-  */
- protected interface Visitor {
-     /**
-      * @param type the kind of brooklyn object being visited
-      * @param id the identifier under which the object is held in the raw data
-      * @param contents the string associated with that identifier
-      */
-     void visit(BrooklynObjectType type, String id, String contents) throws Exception;
- }
-
- /**
-  * Applies the visitor to every object in the raw data, submitting one task per object to the executor,
-  * then blocks until all tasks have completed or one has failed.
-  * <p>
-  * Objects are submitted type by type in {@code STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER}.
-  * A non-fatal exception thrown by the visitor is passed to the exception handler; a task only fails
-  * if the exception is fatal or the handler itself throws.
-  * <p>
-  * If any task fails, the failures of all tasks which have already finished are collected and logged,
-  * tasks which have not yet finished are cancelled (with interruption), and the failure is rethrown.
-  *
-  * @param phase short label for the current activity, used in error and log messages
-  * @param rawData the persisted objects to visit
-  * @param visitor callback invoked with the type, id and contents of each object
-  * @param exceptionHandler receives non-fatal failures raised by the visitor
-  * @throws CompoundRuntimeException if one or more tasks failed
-  */
- protected void visitMemento(final String phase, final BrooklynMementoRawData rawData, final Visitor visitor, final RebindExceptionHandler exceptionHandler) {
-     List<ListenableFuture<?>> futures = Lists.newArrayList();
-
-     class VisitorWrapper implements Runnable {
-         private final BrooklynObjectType type;
-         private final Map.Entry<String,String> objectIdAndData;
-         public VisitorWrapper(BrooklynObjectType type, Map.Entry<String,String> objectIdAndData) {
-             this.type = type;
-             this.objectIdAndData = objectIdAndData;
-         }
-         @Override
-         public void run() {
-             try {
-                 visitor.visit(type, objectIdAndData.getKey(), objectIdAndData.getValue());
-             } catch (Exception e) {
-                 Exceptions.propagateIfFatal(e);
-                 exceptionHandler.onLoadMementoFailed(type, "memento "+objectIdAndData.getKey()+" "+phase+" error", e);
-             }
-         }
-     }
-
-     for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
-         for (final Map.Entry<String,String> entry : rawData.getObjectsOfType(type).entrySet()) {
-             futures.add(executor.submit(new VisitorWrapper(type, entry)));
-         }
-     }
-
-     try {
-         // Wait for all, failing fast if any exceptions.
-         Futures.allAsList(futures).get();
-     } catch (Exception e) {
-         Exceptions.propagateIfFatal(e);
-
-         List<Exception> exceptions = Lists.newArrayList();
-
-         for (ListenableFuture<?> future : futures) {
-             if (!future.isDone()) {
-                 // The load has already failed, so abandon work which is still queued or running.
-                 // (Cancelling a future which is already done would have no effect.)
-                 future.cancel(true);
-                 continue;
-             }
-             try {
-                 future.get();
-             } catch (InterruptedException e2) {
-                 throw Exceptions.propagate(e2);
-             } catch (ExecutionException e2) {
-                 LOG.warn("Problem loading memento ("+phase+"): "+e2, e2);
-                 exceptions.add(e2);
-             }
-         }
-         if (exceptions.isEmpty()) {
-             throw Exceptions.propagate(e);
-         } else {
-             // Normally there should be at least one failure; otherwise allAsList(...).get() would not have failed.
-             throw new CompoundRuntimeException("Problem loading mementos ("+phase+")", exceptions);
-         }
-     }
- }
-
- /**
-  * Guards write operations: returns normally when either {@code writesAllowed} or
-  * {@code writesShuttingDown} is set.
-  *
-  * @throws IllegalStateException if neither flag is set
-  */
- protected void checkWritesAllowed() {
-     if (writesAllowed || writesShuttingDown) {
-         return;
-     }
-     throw new IllegalStateException("Writes not allowed in "+this);
- }
-
- /**
-  * Writes every object in the given raw memento to the object store, holding the write lock throughout.
-  * One asynchronous persist task is submitted per object; this call returns only once all have finished.
-  * <p>
-  * See {@link BrooklynPersistenceUtils} for conveniences for using this method.
-  *
-  * @param newMemento the raw data to persist
-  * @param exceptionHandler receives failures of individual writes
-  * @throws IllegalStateException if writes are not currently allowed
-  */
- @Override
- @Beta
- public void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler) {
-     checkWritesAllowed();
-     try {
-         lock.writeLock().lockInterruptibly();
-     } catch (InterruptedException e) {
-         throw Exceptions.propagate(e);
-     }
-
-     try {
-         objectStore.prepareForMasterUse();
-
-         Stopwatch timer = Stopwatch.createStarted();
-         List<ListenableFuture<?>> pendingWrites = Lists.newArrayList();
-
-         for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
-             String subPath = type.getSubPathName();
-             for (Map.Entry<String, String> idAndContent : newMemento.getObjectsOfType(type).entrySet()) {
-                 pendingWrites.add(asyncPersist(subPath, type, idAndContent.getKey(), idAndContent.getValue(), exceptionHandler));
-             }
-         }
-
-         try {
-             // First wait until every task has completed or failed (no abort on the first failure) ...
-             Futures.successfulAsList(pendingWrites).get();
-             // ... then surface a failure, if there was one.
-             Futures.allAsList(pendingWrites).get();
-         } catch (Exception e) {
-             throw Exceptions.propagate(e);
-         }
-         if (LOG.isDebugEnabled()) {
-             LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(timer));
-         }
-     } finally {
-         lock.writeLock().unlock();
-     }
- }
-
- /**
-  * Persists the given delta, after first persisting any deltas previously added with {@code queueDelta}.
-  * Queued deltas are taken from the front of the queue one at a time.
-  *
-  * @param delta the changes to persist
-  * @param exceptionHandler receives failures of individual writes and deletes
-  * @throws IllegalStateException if writes are not currently allowed
-  */
- @Override
- public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) {
-     checkWritesAllowed();
-
-     // drain the backlog first so that older changes are written before the new ones
-     while (!queuedDeltas.isEmpty()) {
-         doDelta(queuedDeltas.remove(0), exceptionHandler, true);
-     }
-
-     doDelta(delta, exceptionHandler, false);
- }
-
- /**
-  * Persists one delta via {@code deltaImpl} and, when debug logging is enabled, logs how long it took
-  * together with the number of updated and removed entities, locations, policies, enrichers and catalog items.
-  *
-  * @param delta the changes to persist
-  * @param exceptionHandler receives failures of individual writes and deletes
-  * @param previouslyQueued whether the delta came from the queue; only affects the log message
-  */
- protected void doDelta(Delta delta, PersistenceExceptionHandler exceptionHandler, boolean previouslyQueued) {
-     Stopwatch timer = deltaImpl(delta, exceptionHandler);
-
-     if (!LOG.isDebugEnabled()) {
-         return;
-     }
-
-     String queuedQualifier = previouslyQueued ? "previously queued " : "";
-     String format = "Checkpointed "+queuedQualifier+"delta of memento in {}: "
-             + "updated {} entities, {} locations, {} policies, {} enrichers, {} catalog items; "
-             + "removed {} entities, {} locations, {} policies, {} enrichers, {} catalog items";
-     Object[] logArgs = new Object[] {
-             Time.makeTimeStringRounded(timer),
-             delta.entities().size(),
-             delta.locations().size(),
-             delta.policies().size(),
-             delta.enrichers().size(),
-             delta.catalogItems().size(),
-             delta.removedEntityIds().size(),
-             delta.removedLocationIds().size(),
-             delta.removedPolicyIds().size(),
-             delta.removedEnricherIds().size(),
-             delta.removedCatalogItemIds().size() };
-     LOG.debug(format, logArgs);
- }
-
- /**
-  * Adds a delta to the queue without writing it; queued deltas are persisted by the next call to {@code delta}.
-  *
-  * @param delta the changes to hold back for later persistence
-  */
- @Override
- public void queueDelta(Delta delta) {
-     this.queuedDeltas.add(delta);
- }
-
- /**
-  * Writes the updated objects of a delta and deletes its removed objects, holding the write lock throughout.
-  * All persist tasks are submitted before any delete task, and this call returns only once all have finished.
-  * <p>
-  * Concurrent calls will queue-up (the lock is "fair", which means an "approximately arrival-order policy").
-  * Current usage is with the {@link PeriodicDeltaChangeListener} so we expect only one call at a time.
-  * <p>
-  * TODO Longer term, if we care more about concurrent calls we could merge the queued deltas so that we
-  * don't do unnecessary repeated writes of an entity.
-  *
-  * @param delta the changes to persist
-  * @param exceptionHandler receives failures of individual writes and deletes
-  * @return a stopwatch started after the lock was acquired and the store prepared
-  */
- private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
-     try {
-         lock.writeLock().lockInterruptibly();
-     } catch (InterruptedException e) {
-         throw Exceptions.propagate(e);
-     }
-     try {
-         objectStore.prepareForMasterUse();
-
-         Stopwatch timer = Stopwatch.createStarted();
-         List<ListenableFuture<?>> pendingTasks = Lists.newArrayList();
-
-         // updates for every type are submitted first ...
-         for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
-             for (Memento updated : delta.getObjectsOfType(type)) {
-                 pendingTasks.add(asyncPersist(type.getSubPathName(), updated, exceptionHandler));
-             }
-         }
-         // ... followed by removals for every type
-         for (BrooklynObjectType type: BrooklynPersistenceUtils.STANDARD_BROOKLYN_OBJECT_TYPE_PERSISTENCE_ORDER) {
-             for (String removedId : delta.getRemovedIdsOfType(type)) {
-                 pendingTasks.add(asyncDelete(type.getSubPathName(), removedId, exceptionHandler));
-             }
-         }
-
-         try {
-             // First wait until every task has completed or failed (no abort on the first failure) ...
-             Futures.successfulAsList(pendingTasks).get();
-             // ... then surface a failure, if there was one.
-             Futures.allAsList(pendingTasks).get();
-         } catch (Exception e) {
-             throw Exceptions.propagate(e);
-         }
-
-         return timer;
-     } finally {
-         lock.writeLock().unlock();
-     }
- }
-
- /**
-  * Blocks until in-flight writes have finished.
-  * <p>
-  * The read lock is acquired (waiting up to the timeout) to take a snapshot of the current writers,
-  * and is released again before each of those writers is asked to wait for its current writes.
-  * Note that each writer is given the full timeout again, so the total wait may exceed {@code timeout}.
-  *
-  * @param timeout maximum time to wait for the lock, and for each individual writer
-  * @throws InterruptedException if interrupted while waiting for the lock
-  * @throws TimeoutException if the lock cannot be acquired within the timeout
-  */
- @Override
- public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-     boolean locked = lock.readLock().tryLock(timeout.toMillisecondsRoundingUp(), TimeUnit.MILLISECONDS);
-     if (!locked) {
-         throw new TimeoutException("Timeout waiting for writes to "+objectStore);
-     }
-
-     ImmutableSet<StoreObjectAccessorWithLock> wc;
-     try {
-         synchronized (writers) {
-             wc = ImmutableSet.copyOf(writers.values());
-         }
-     } finally {
-         // release in a finally block so the read lock cannot leak if taking the snapshot fails
-         lock.readLock().unlock();
-     }
-
-     // Belt-and-braces: the lock above should be enough to ensure no outstanding writes, because
-     // each writer is now synchronous.
-     for (StoreObjectAccessorWithLock writer : wc) {
-         writer.waitForCurrentWrites(timeout);
-     }
- }
-
- /**
-  * Returns whatever a new accessor for the given sub-path of the object store yields from {@code get()}.
-  *
-  * @param subPath location of the object, relative to the object store
-  */
- private String read(String subPath) {
-     return objectStore.newAccessor(subPath).get();
- }
-
- /**
-  * Serializes a memento with the standard-class-loader serializer and writes it under
-  * {@code subPath}, at the path derived from the memento's id.
-  * Any exception is reported to the handler instead of being thrown.
-  *
-  * @param subPath directory-like prefix for this kind of object
-  * @param memento the memento to write
-  * @param exceptionHandler receives the failure if serializing or writing fails
-  */
- private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) {
-     try {
-         String path = getPath(subPath, memento.getId());
-         StoreObjectAccessorWithLock writer = getWriter(path);
-         String serialized = getSerializerWithStandardClassLoader().toString(memento);
-         writer.put(serialized);
-     } catch (Exception e) {
-         exceptionHandler.onPersistMementoFailed(memento, e);
-     }
- }
-
- /**
-  * Writes already-serialized content under {@code subPath}, at the path derived from the id.
-  * Null content is logged as a warning but is still passed to the writer.
-  * Any exception is reported to the handler instead of being thrown.
-  *
-  * @param subPath directory-like prefix for this kind of object
-  * @param type kind of object, used for logging and error reporting
-  * @param id identifier of the object
-  * @param content serialized form to write
-  * @param exceptionHandler receives the failure if writing fails
-  */
- private void persist(String subPath, BrooklynObjectType type, String id, String content, PersistenceExceptionHandler exceptionHandler) {
-     try {
-         if (null == content) {
-             LOG.warn("Null content for "+type+" "+id);
-         }
-         StoreObjectAccessorWithLock writer = getWriter(getPath(subPath, id));
-         writer.put(content);
-     } catch (Exception e) {
-         exceptionHandler.onPersistRawMementoFailed(type, id, e);
-     }
- }
-
- /**
-  * Deletes the stored object for the given id and then removes the {@code writers} entry keyed by that id.
-  * Any exception is reported to the handler instead of being thrown.
-  *
-  * @param subPath directory-like prefix for this kind of object
-  * @param id identifier of the object to delete
-  * @param exceptionHandler receives the failure if deleting fails
-  */
- private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) {
-     try {
-         String path = getPath(subPath, id);
-         getWriter(path).delete();
-         // NOTE(review): the writer is looked up by path (built from the sanitised id) but removed by raw id;
-         // confirm against getWriter that both use the same key when the id is not already a valid filename.
-         synchronized (writers) {
-             writers.remove(id);
-         }
-     } catch (Exception e) {
-         exceptionHandler.onDeleteMementoFailed(id, e);
-     }
- }
-
- /**
-  * Submits a task to the executor which persists the given memento.
-  *
-  * @return a future which completes when the persist task has run
-  */
- private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
-     Runnable persistTask = new Runnable() {
-         @Override
-         public void run() {
-             persist(subPath, memento, exceptionHandler);
-         }
-     };
-     return executor.submit(persistTask);
- }
-
- /**
-  * Submits a task to the executor which persists already-serialized content for the given type and id.
-  *
-  * @return a future which completes when the persist task has run
-  */
- private ListenableFuture<?> asyncPersist(final String subPath, final BrooklynObjectType type, final String id, final String content, final PersistenceExceptionHandler exceptionHandler) {
-     Runnable persistTask = new Runnable() {
-         @Override
-         public void run() {
-             persist(subPath, type, id, content, exceptionHandler);
-         }
-     };
-     return executor.submit(persistTask);
- }
-
- /**
-  * Submits a task to the executor which deletes the stored object for the given id.
-  *
-  * @return a future which completes when the delete task has run
-  */
- private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) {
-     Runnable deleteTask = new Runnable() {
-         @Override
-         public void run() {
-             delete(subPath, id, exceptionHandler);
-         }
-     };
-     return executor.submit(deleteTask);
- }
-
- /**
-  * Builds the store path for an object: the sub-path, a slash, and the id after it has been
-  * passed through {@code Strings.makeValidFilename}.
-  */
- private String getPath(String subPath, String id) {
-     String fileName = Strings.makeValidFilename(id);
-     return subPath + "/" + fileName;
- }
-
- /** Returns the summary name reported by the underlying object store. */
- @Override
- public String getBackingStoreDescription() {
- return getObjectStore().getSummaryName();
- }
-}