You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/04/03 21:59:01 UTC
svn commit: r1584365 [8/15] - in
/qpid/branches/java-broker-config-store-changes/qpid/java: ./
bdbstore/jmx/src/main/java/org/apache/qpid/server/store/berkeleydb/jmx/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/jav...
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java Thu Apr 3 19:58:53 2014
@@ -20,18 +20,18 @@
*/
package org.apache.qpid.server.security.auth.manager;
-import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
import org.apache.qpid.server.configuration.RecovererProvider;
import org.apache.qpid.server.configuration.updater.ChangeAttributesTask;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.*;
-import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.scram.ScramSHA1SaslServer;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
@@ -40,7 +40,6 @@ import javax.security.sasl.SaslException
import javax.security.sasl.SaslServer;
import javax.xml.bind.DatatypeConverter;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.AccessControlException;
import java.security.InvalidKeyException;
@@ -56,6 +55,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+@ManagedObject( category = false, type = "SCRAM-SHA1" )
public class ScramSHA1AuthenticationManager
extends AbstractAuthenticationManager<ScramSHA1AuthenticationManager>
implements PasswordCredentialManagingAuthenticationProvider<ScramSHA1AuthenticationManager>,
@@ -71,8 +71,7 @@ public class ScramSHA1AuthenticationMana
protected ScramSHA1AuthenticationManager(final Broker broker,
final Map<String, Object> defaults,
- final Map<String, Object> attributes,
- final boolean recovering)
+ final Map<String, Object> attributes)
{
super(broker, defaults, attributes);
}
@@ -265,7 +264,7 @@ public class ScramSHA1AuthenticationMana
userAttrs.put(User.NAME, username);
userAttrs.put(User.PASSWORD, createStoredPassword(password));
userAttrs.put(User.TYPE, SCRAM_USER_TYPE);
- ScramAuthUser user = new ScramAuthUser(userAttrs);
+ ScramAuthUser user = new ScramAuthUser(userAttrs, this);
_users.put(username, user);
return true;
@@ -425,30 +424,25 @@ public class ScramSHA1AuthenticationMana
@Override
public ConfiguredObjectRecoverer<? extends ConfiguredObject> getRecoverer(final String type)
{
- if("User".equals(type))
- {
- return new UserRecoverer();
- }
- else
- {
- return null;
- }
+ return null;
}
- private class ScramAuthUser extends AbstractConfiguredObject<ScramAuthUser> implements User<ScramAuthUser>
+ @ManagedObject( category = false, type = "scram")
+ static class ScramAuthUser extends AbstractConfiguredObject<ScramAuthUser> implements User<ScramAuthUser>
{
-
- protected ScramAuthUser(final Map<String, Object> attributes)
+ private ScramSHA1AuthenticationManager _authenticationManager;
+ protected ScramAuthUser(final Map<String, Object> attributes, ScramSHA1AuthenticationManager parent)
{
- super(parentsMap(ScramSHA1AuthenticationManager.this),
+ super(parentsMap(parent),
Collections.<String,Object>emptyMap(),
- attributes, ScramSHA1AuthenticationManager.this.getTaskExecutor());
-
+ attributes, parent.getTaskExecutor());
+ _authenticationManager = parent;
if(!ASCII.newEncoder().canEncode(getName()))
{
throw new IllegalArgumentException("Scram SHA1 user names are restricted to characters in the ASCII charset");
}
+
}
@Override
@@ -456,9 +450,9 @@ public class ScramSHA1AuthenticationMana
{
if(desiredState == State.DELETED)
{
- getSecurityManager().authoriseUserOperation(Operation.DELETE, getName());
- _users.remove(getName());
- ScramSHA1AuthenticationManager.this.childRemoved(this);
+ _authenticationManager.getSecurityManager().authoriseUserOperation(Operation.DELETE, getName());
+ _authenticationManager._users.remove(getName());
+ _authenticationManager.childRemoved(this);
return true;
}
else
@@ -479,7 +473,7 @@ public class ScramSHA1AuthenticationMana
{
try
{
- modifiedAttributes.put(User.PASSWORD, createStoredPassword(newPassword));
+ modifiedAttributes.put(User.PASSWORD, _authenticationManager.createStoredPassword(newPassword));
}
catch (SaslException e)
{
@@ -514,11 +508,12 @@ public class ScramSHA1AuthenticationMana
@Override
public void setPassword(final String password)
{
- getSecurityManager().authoriseUserOperation(Operation.UPDATE, getName());
+ _authenticationManager.getSecurityManager().authoriseUserOperation(Operation.UPDATE, getName());
try
{
- changeAttribute(User.PASSWORD, getAttribute(User.PASSWORD), createStoredPassword(password));
+ changeAttribute(User.PASSWORD, getAttribute(User.PASSWORD), _authenticationManager.createStoredPassword(
+ password));
}
catch (SaslException e)
{
@@ -579,7 +574,7 @@ public class ScramSHA1AuthenticationMana
@Override
public Map<String, Object> getPreferences()
{
- PreferencesProvider preferencesProvider = getPreferencesProvider();
+ PreferencesProvider preferencesProvider = _authenticationManager.getPreferencesProvider();
if (preferencesProvider == null)
{
return null;
@@ -601,7 +596,7 @@ public class ScramSHA1AuthenticationMana
@Override
public Map<String, Object> setPreferences(Map<String, Object> preferences)
{
- PreferencesProvider preferencesProvider = getPreferencesProvider();
+ PreferencesProvider preferencesProvider = _authenticationManager.getPreferencesProvider();
if (preferencesProvider == null)
{
return null;
@@ -612,7 +607,7 @@ public class ScramSHA1AuthenticationMana
@Override
public boolean deletePreferences()
{
- PreferencesProvider preferencesProvider = getPreferencesProvider();
+ PreferencesProvider preferencesProvider = _authenticationManager.getPreferencesProvider();
if (preferencesProvider == null)
{
return false;
@@ -680,17 +675,13 @@ public class ScramSHA1AuthenticationMana
}
}
- private class UserRecoverer implements ConfiguredObjectRecoverer<ScramAuthUser>
+ public void instantiateUser(User<?> user)
{
- @Override
- public ScramAuthUser create(final RecovererProvider recovererProvider,
- final ConfigurationEntry entry,
- final ConfiguredObject... parents)
+ if(!(user instanceof ScramAuthUser))
{
-
- Map<String,Object> attributes = new HashMap<String, Object>(entry.getAttributes());
- attributes.put(User.ID,entry.getId());
- return new ScramAuthUser(attributes);
+ throw new IllegalArgumentException("Only users of type " + ScramAuthUser.class.getSimpleName() + " can be add to a " + getClass().getSimpleName());
}
+ _users.put(user.getName(), (ScramAuthUser) user);
+
}
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerFactory.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerFactory.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManagerFactory.java Thu Apr 3 19:58:53 2014
@@ -21,15 +21,14 @@ package org.apache.qpid.server.security.
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.plugin.AuthenticationManagerFactory;
-import org.apache.qpid.server.util.ResourceBundleLoader;
+import org.apache.qpid.server.model.ConfiguredObject;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-public class ScramSHA1AuthenticationManagerFactory implements AuthenticationManagerFactory
+public class ScramSHA1AuthenticationManagerFactory extends AbstractAuthenticationManagerFactory<ScramSHA1AuthenticationManager>
{
public static final String PROVIDER_TYPE = "SCRAM-SHA1";
@@ -40,18 +39,9 @@ public class ScramSHA1AuthenticationMana
AuthenticationProvider.TYPE
));
- @Override
- public ScramSHA1AuthenticationManager createInstance(Broker broker,
- Map<String, Object> attributes,
- final boolean recovering)
+ public ScramSHA1AuthenticationManagerFactory()
{
- if (attributes == null || !PROVIDER_TYPE.equals(attributes.get(AuthenticationProvider.TYPE)))
- {
- return null;
- }
-
-
- return new ScramSHA1AuthenticationManager(broker, Collections.<String,Object>emptyMap(),attributes, false);
+ super(ScramSHA1AuthenticationManager.class);
}
@Override
@@ -61,14 +51,16 @@ public class ScramSHA1AuthenticationMana
}
@Override
- public String getType()
+ public Map<String, String> getAttributeDescriptions()
{
- return PROVIDER_TYPE;
+ return Collections.emptyMap();
}
@Override
- public Map<String, String> getAttributeDescriptions()
+ public ScramSHA1AuthenticationManager createInstance(final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
{
- return Collections.emptyMap();
+ return new ScramSHA1AuthenticationManager(getParent(Broker.class, parents), Collections.<String,Object>emptyMap(),attributes);
}
+
}
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1UserRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1UserRecoverer.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1UserRecoverer.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1UserRecoverer.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.security.auth.manager;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.ConfiguredObject;
+
+import java.util.Map;
+
+public class ScramSHA1UserRecoverer extends AbstractConfiguredObjectTypeFactory<ScramSHA1AuthenticationManager.ScramAuthUser>
+{
+ public ScramSHA1UserRecoverer()
+ {
+ super(ScramSHA1AuthenticationManager.ScramAuthUser.class);
+ }
+
+ @Override
+ public ScramSHA1AuthenticationManager.ScramAuthUser createInstance(final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
+ {
+ return new ScramSHA1AuthenticationManager.ScramAuthUser(attributes, (ScramSHA1AuthenticationManager)getParent(AuthenticationProvider.class, parents));
+ }
+}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleAuthenticationManager.java Thu Apr 3 19:58:53 2014
@@ -38,11 +38,13 @@ import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.sasl.plain.PlainPasswordCallback;
import org.apache.qpid.server.security.auth.sasl.plain.PlainSaslServer;
+@ManagedObject( category = false, type = "Simple" )
public class SimpleAuthenticationManager extends AbstractAuthenticationManager<SimpleAuthenticationManager>
{
private static final Logger _logger = Logger.getLogger(SimpleAuthenticationManager.class);
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManager.java Thu Apr 3 19:58:53 2014
@@ -50,6 +50,7 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.ManagedObject;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
@@ -62,6 +63,7 @@ import org.apache.qpid.server.util.Serve
import org.apache.qpid.server.util.StringUtil;
import org.apache.qpid.ssl.SSLContextFactory;
+@ManagedObject( category = false, type = "SimpleLDAP" )
public class SimpleLDAPAuthenticationManager extends AbstractAuthenticationManager<SimpleLDAPAuthenticationManager>
{
private static final Logger _logger = Logger.getLogger(SimpleLDAPAuthenticationManager.class);
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactory.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactory.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/SimpleLDAPAuthenticationManagerFactory.java Thu Apr 3 19:58:53 2014
@@ -19,22 +19,19 @@
*/
package org.apache.qpid.server.security.auth.manager;
+import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.util.ResourceBundleLoader;
+
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.AuthenticationProvider;
-import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.TrustStore;
-import org.apache.qpid.server.plugin.AuthenticationManagerFactory;
-import org.apache.qpid.server.util.ResourceBundleLoader;
-
-public class SimpleLDAPAuthenticationManagerFactory implements AuthenticationManagerFactory
+public class SimpleLDAPAuthenticationManagerFactory extends AbstractAuthenticationManagerFactory<SimpleLDAPAuthenticationManager>
{
public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.auth.manager.SimpleLDAPAuthenticationProviderAttributeDescriptions";
- private static final String DEFAULT_LDAP_CONTEXT_FACTORY = "com.sun.jndi.ldap.LdapCtxFactory";
public static final String PROVIDER_TYPE = "SimpleLDAP";
@@ -56,18 +53,9 @@ public class SimpleLDAPAuthenticationMan
ATTRIBUTE_LDAP_CONTEXT_FACTORY
));
- @Override
- public SimpleLDAPAuthenticationManager createInstance(Broker broker,
- Map<String, Object> attributes,
- final boolean recovering)
+ public SimpleLDAPAuthenticationManagerFactory()
{
- if (attributes == null || !PROVIDER_TYPE.equals(attributes.get(AuthenticationProvider.TYPE)))
- {
- return null;
- }
-
-
- return new SimpleLDAPAuthenticationManager(broker, Collections.<String,Object>emptyMap(),attributes);
+ super(SimpleLDAPAuthenticationManager.class);
}
@Override
@@ -77,14 +65,16 @@ public class SimpleLDAPAuthenticationMan
}
@Override
- public String getType()
+ public Map<String, String> getAttributeDescriptions()
{
- return PROVIDER_TYPE;
+ return ResourceBundleLoader.getResources(RESOURCE_BUNDLE);
}
@Override
- public Map<String, String> getAttributeDescriptions()
+ public SimpleLDAPAuthenticationManager createInstance(final Map<String, Object> attributes,
+ final ConfiguredObject<?>... parents)
{
- return ResourceBundleLoader.getResources(RESOURCE_BUNDLE);
+ return new SimpleLDAPAuthenticationManager(getParent(Broker.class, parents), Collections.<String,Object>emptyMap(),attributes);
}
+
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Thu Apr 3 19:58:53 2014
@@ -103,4 +103,9 @@ public interface StatisticsGatherer
* Reset the counters for this, and any child {@link StatisticsGatherer}s.
*/
void resetStatistics();
+
+ interface Source
+ {
+ StatisticsGatherer getStatisticsGatherer();
+ }
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractDurableConfiguredObjectRecoverer.java Thu Apr 3 19:58:53 2014
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.store;
-import java.util.Map;
import java.util.UUID;
public abstract class AbstractDurableConfiguredObjectRecoverer<T> implements DurableConfiguredObjectRecoverer
@@ -41,21 +40,26 @@ public abstract class AbstractDurableCon
else
{
durableConfigurationRecoverer.addResolutionListener(dependency.getType(), dependency.getId(),
- new DependencyListener()
- {
+ new DependencyListener()
+ {
- @Override
- public void dependencyResolved(final String depType,
- final UUID depId,
- final Object o)
- {
- dependency.resolve(o);
- if(obj.getUnresolvedDependencies().length == 0)
- {
- durableConfigurationRecoverer.resolve(getType(), record.getId(), obj.resolve());
- }
- }
- });
+ @Override
+ public void dependencyResolved(final String depType,
+ final UUID depId,
+ final Object o)
+ {
+ dependency.resolve(o);
+ if (obj.getUnresolvedDependencies().length
+ == 0)
+ {
+ durableConfigurationRecoverer.resolve(
+ getType(),
+ record.getId(),
+ obj.resolve());
+ }
+ }
+ }
+ );
}
}
if(obj.getUnresolvedDependencies().length == 0)
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java Thu Apr 3 19:58:53 2014
@@ -21,11 +21,7 @@
package org.apache.qpid.server.store;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
@@ -36,16 +32,24 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.transport.ConnectionOpen;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -77,15 +81,15 @@ abstract public class AbstractJDBCMessag
private static final int DEFAULT_CONFIG_VERSION = 0;
- public static String[] ALL_TABLES =
- new String[]{DB_VERSION_TABLE_NAME, XID_ACTIONS_TABLE_NAME,
- XID_TABLE_NAME, QUEUE_ENTRY_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME, META_DATA_TABLE_NAME,
- CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURED_OBJECT_HIERARCHY_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME};
+ public static final Set<String> CONFIGURATION_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(CONFIGURED_OBJECTS_TABLE_NAME, CONFIGURATION_VERSION_TABLE_NAME));
+ public static final Set<String> MESSAGE_STORE_TABLE_NAMES = new HashSet<String>(Arrays.asList(DB_VERSION_TABLE_NAME,
+ META_DATA_TABLE_NAME, MESSAGE_CONTENT_TABLE_NAME,
+ QUEUE_ENTRY_TABLE_NAME,
+ XID_TABLE_NAME, XID_ACTIONS_TABLE_NAME));
private static final int DB_VERSION = 8;
private final AtomicLong _messageId = new AtomicLong(0);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+ DB_VERSION_TABLE_NAME + " ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+ DB_VERSION_TABLE_NAME + " ( version ) VALUES ( ? )";
@@ -172,69 +176,130 @@ abstract public class AbstractJDBCMessag
protected final EventManager _eventManager = new EventManager();
- protected final StateManager _stateManager;
+ private final AtomicBoolean _messageStoreOpen = new AtomicBoolean();
+ private final AtomicBoolean _configurationStoreOpen = new AtomicBoolean();
- private MessageStoreRecoveryHandler _messageRecoveryHandler;
- private TransactionLogRecoveryHandler _tlogRecoveryHandler;
- private ConfigurationRecoveryHandler _configRecoveryHandler;
- private VirtualHost _virtualHost;
+ private boolean _initialized;
- public AbstractJDBCMessageStore()
- {
- _stateManager = new StateManager(_eventManager);
- }
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler configRecoveryHandler)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
- _stateManager.attainState(State.INITIALISING);
- _configRecoveryHandler = configRecoveryHandler;
- _virtualHost = virtualHost;
+ if (_configurationStoreOpen.compareAndSet(false, true))
+ {
+ initialiseIfNecessary(parent.getName(), storeSettings);
+ try
+ {
+ createOrOpenConfigurationStoreDatabase();
+ upgradeIfVersionTableExists(parent);
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Cannot create databases or upgrade", e);
+ }
+ }
+ }
+ private void initialiseIfNecessary(String virtualHostName, Map<String, Object> storeSettings)
+ {
+ if (!_initialized)
+ {
+ try
+ {
+ implementationSpecificConfiguration(virtualHostName, storeSettings);
+ }
+ catch (ClassNotFoundException e)
+ {
+ throw new StoreException("Cannot find driver class", e);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unexpected exception occured", e);
+ }
+ _initialized = true;
+ }
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
- if(_stateManager.isInState(State.INITIAL))
+ checkConfigurationStoreOpen();
+
+ try
{
- _stateManager.attainState(State.INITIALISING);
+ recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
+ loadConfiguredObjects(recoveryHandler);
+ setConfigVersion(recoveryHandler.completeConfigurationRecovery());
}
+ catch (SQLException e)
+ {
+ throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
+ }
+ }
- _virtualHost = virtualHost;
- _tlogRecoveryHandler = tlogRecoveryHandler;
- _messageRecoveryHandler = recoveryHandler;
-
- completeInitialisation();
+ private void checkConfigurationStoreOpen()
+ {
+ if (!_configurationStoreOpen.get())
+ {
+ throw new IllegalStateException("Configuration store is not open");
+ }
}
- private void completeInitialisation()
+ private void checkMessageStoreOpen()
{
- commonConfiguration();
+ if (!_messageStoreOpen.get())
+ {
+ throw new IllegalStateException("Message store is not open");
+ }
+ }
- _stateManager.attainState(State.INITIALISED);
+ private void upgradeIfVersionTableExists(ConfiguredObject<?> parent)
+ throws SQLException {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ if (tableExists(DB_VERSION_TABLE_NAME, conn))
+ {
+ upgradeIfNecessary(parent);
+ }
+ }
+ finally
+ {
+ if (conn != null)
+ {
+ conn.close();
+ }
+ }
}
@Override
- public void activate()
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
- if(_stateManager.isInState(State.INITIALISING))
+ if (_messageStoreOpen.compareAndSet(false, true))
{
- completeInitialisation();
+ initialiseIfNecessary(parent.getName(), messageStoreSettings);
+ try
+ {
+ createOrOpenMessageStoreDatabase();
+ upgradeIfNecessary(parent);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Unable to activate message store ", e);
+ }
}
- _stateManager.attainState(State.ACTIVATING);
+ }
- // this recovers durable exchanges, queues, and bindings
- if(_configRecoveryHandler != null)
- {
- recoverConfiguration(_configRecoveryHandler);
- }
- if(_messageRecoveryHandler != null)
+ @Override
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
+ {
+ checkMessageStoreOpen();
+
+ if(messageRecoveryHandler != null)
{
try
{
- recoverMessages(_messageRecoveryHandler);
+ recoverMessages(messageRecoveryHandler);
}
catch (SQLException e)
{
@@ -242,11 +307,11 @@ abstract public class AbstractJDBCMessag
"persistent store ", e);
}
}
- if(_tlogRecoveryHandler != null)
+ if(transactionLogRecoveryHandler != null)
{
try
{
- TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(_tlogRecoveryHandler);
+ TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = recoverQueueEntries(transactionLogRecoveryHandler);
recoverXids(dtxrh);
}
catch (SQLException e)
@@ -256,29 +321,9 @@ abstract public class AbstractJDBCMessag
}
}
-
- _stateManager.attainState(State.ACTIVE);
}
- private void commonConfiguration()
- {
- try
- {
- implementationSpecificConfiguration(_virtualHost.getName(), _virtualHost);
- createOrOpenDatabase();
- upgradeIfNecessary();
- }
- catch (ClassNotFoundException e)
- {
- throw new StoreException("Unable to configure message store ", e);
- }
- catch (SQLException e)
- {
- throw new StoreException("Unable to configure message store ", e);
- }
- }
-
- protected void upgradeIfNecessary() throws SQLException
+ protected void upgradeIfNecessary(ConfiguredObject<?> parent) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -300,7 +345,7 @@ abstract public class AbstractJDBCMessag
case 6:
upgradeFromV6();
case 7:
- upgradeFromV7();
+ upgradeFromV7(parent);
case DB_VERSION:
return;
default:
@@ -330,7 +375,7 @@ abstract public class AbstractJDBCMessag
}
- private void upgradeFromV7() throws SQLException
+ private void upgradeFromV7(ConfiguredObject<?> parent) throws SQLException
{
Connection connection = newConnection();
try
@@ -390,7 +435,7 @@ abstract public class AbstractJDBCMessag
{
stmt.setString(1, id.toString());
stmt.setString(2, "VirtualHost");
- stmt.setString(3, _virtualHost.getId().toString());
+ stmt.setString(3, parent.getId().toString());
stmt.execute();
}
for(Map.Entry<UUID, Map<String,Object>> bindingEntry : bindingsToUpdate.entrySet())
@@ -481,8 +526,7 @@ abstract public class AbstractJDBCMessag
}
}
- protected abstract void implementationSpecificConfiguration(String name,
- VirtualHost virtualHost) throws ClassNotFoundException, SQLException;
+ protected abstract void implementationSpecificConfiguration(String name, Map<String, Object> messageStoreSettings) throws ClassNotFoundException, SQLException;
abstract protected Logger getLogger();
@@ -492,14 +536,11 @@ abstract public class AbstractJDBCMessag
abstract protected String getSqlBigIntType();
- protected void createOrOpenDatabase() throws SQLException
+ protected void createOrOpenMessageStoreDatabase() throws SQLException
{
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
- createConfigVersionTable(conn);
- createConfiguredObjectsTable(conn);
- createConfiguredObjectHierarchyTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
@@ -508,6 +549,17 @@ abstract public class AbstractJDBCMessag
conn.close();
}
+ protected void createOrOpenConfigurationStoreDatabase() throws SQLException
+ {
+ Connection conn = newAutoCommitConnection();
+
+ createConfigVersionTable(conn);
+ createConfiguredObjectsTable(conn);
+ createConfiguredObjectHierarchyTable(conn);
+
+ conn.close();
+ }
+
private void createVersionTable(final Connection conn) throws SQLException
{
if(!tableExists(DB_VERSION_TABLE_NAME, conn))
@@ -596,8 +648,6 @@ abstract public class AbstractJDBCMessag
}
}
-
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -662,8 +712,6 @@ abstract public class AbstractJDBCMessag
}
-
-
private void createXidTable(final Connection conn) throws SQLException
{
if(!tableExists(XID_TABLE_NAME, conn))
@@ -734,21 +782,6 @@ abstract public class AbstractJDBCMessag
}
}
- protected void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler)
- {
- try
- {
- recoveryHandler.beginConfigurationRecovery(this, getConfigVersion());
- loadConfiguredObjects(recoveryHandler);
-
- setConfigVersion(recoveryHandler.completeConfigurationRecovery());
- }
- catch (SQLException e)
- {
- throw new StoreException("Error recovering persistent state: " + e.getMessage(), e);
- }
- }
-
private void setConfigVersion(int version) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -811,24 +844,36 @@ abstract public class AbstractJDBCMessag
}
@Override
- public void close()
+ public void closeMessageStore()
{
- if (_closed.compareAndSet(false, true))
+ if (_messageStoreOpen.compareAndSet(true, false))
{
- _stateManager.attainState(State.CLOSING);
-
- doClose();
-
- _stateManager.attainState(State.CLOSED);
+ if (!_configurationStoreOpen.get())
+ {
+ doClose();
+ }
}
}
+ @Override
+ public void closeConfigurationStore()
+ {
+ if (_configurationStoreOpen.compareAndSet(true, false))
+ {
+ if (!_messageStoreOpen.get())
+ {
+ doClose();
+ }
+ }
+ }
protected abstract void doClose();
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
+ checkMessageStoreOpen();
+
if(metaData.isPersistent())
{
return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
@@ -839,12 +884,7 @@ abstract public class AbstractJDBCMessag
}
}
- public StoredMessage getMessage(long messageNumber)
- {
- return null;
- }
-
- public void removeMessage(long messageId)
+ private void removeMessage(long messageId)
{
try
{
@@ -908,27 +948,24 @@ abstract public class AbstractJDBCMessag
@Override
public void create(ConfiguredObjectRecord object) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- insertConfiguredObject(object, conn);
- conn.commit();
- }
- finally
- {
- conn.close();
- }
+ insertConfiguredObject(object, conn);
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error creating ConfiguredObject " + object);
+ conn.close();
}
}
-
+ catch (SQLException e)
+ {
+ throw new StoreException("Error creating ConfiguredObject " + object);
+ }
}
/**
@@ -986,46 +1023,15 @@ abstract public class AbstractJDBCMessag
protected abstract Connection getConnection() throws SQLException;
- private byte[] convertStringMapToBytes(final Map<String, String> arguments) throws StoreException
- {
- byte[] argumentBytes;
- if(arguments == null)
- {
- argumentBytes = new byte[0];
- }
- else
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
-
-
- try
- {
- dos.writeInt(arguments.size());
- for(Map.Entry<String,String> arg : arguments.entrySet())
- {
- dos.writeUTF(arg.getKey());
- dos.writeUTF(arg.getValue());
- }
- }
- catch (IOException e)
- {
- // This should never happen
- throw new StoreException(e.getMessage(), e);
- }
- argumentBytes = bos.toByteArray();
- }
- return argumentBytes;
- }
-
@Override
public Transaction newTransaction()
{
+ checkMessageStoreOpen();
+
return new JDBCTransaction();
}
- public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1068,8 +1074,7 @@ abstract public class AbstractJDBCMessag
}
- public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws
- StoreException
+ private void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -1233,11 +1238,6 @@ abstract public class AbstractJDBCMessag
}
- protected boolean isConfigStoreOnly()
- {
- return _messageRecoveryHandler == null;
- }
-
private static final class ConnectionWrapper
{
private final Connection _connection;
@@ -1254,7 +1254,7 @@ abstract public class AbstractJDBCMessag
}
- public void commitTran(ConnectionWrapper connWrapper) throws StoreException
+ private void commitTran(ConnectionWrapper connWrapper) throws StoreException
{
try
@@ -1279,13 +1279,13 @@ abstract public class AbstractJDBCMessag
}
}
- public StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
+ private StoreFuture commitTranAsync(ConnectionWrapper connWrapper) throws StoreException
{
commitTran(connWrapper);
return StoreFuture.IMMEDIATE_FUTURE;
}
- public void abortTran(ConnectionWrapper connWrapper) throws StoreException
+ private void abortTran(ConnectionWrapper connWrapper) throws StoreException
{
if (connWrapper == null)
{
@@ -1310,11 +1310,6 @@ abstract public class AbstractJDBCMessag
}
- public Long getNewMessageId()
- {
- return _messageId.incrementAndGet();
- }
-
private void storeMetaData(Connection conn, long messageId, StorableMessageMetaData metaData)
throws SQLException
{
@@ -1368,7 +1363,7 @@ abstract public class AbstractJDBCMessag
}
- protected void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
+ private void recoverMessages(MessageStoreRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1425,7 +1420,7 @@ abstract public class AbstractJDBCMessag
}
- protected TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler recoverQueueEntries(TransactionLogRecoveryHandler recoveryHandler) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1555,7 +1550,7 @@ abstract public class AbstractJDBCMessag
}
}
- protected void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
+ private void recoverXids(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh) throws SQLException
{
Connection conn = newAutoCommitConnection();
try
@@ -1642,7 +1637,7 @@ abstract public class AbstractJDBCMessag
}
- StorableMessageMetaData getMetaData(long messageId) throws SQLException
+ private StorableMessageMetaData getMetaData(long messageId) throws SQLException
{
Connection conn = newAutoCommitConnection();
@@ -1724,7 +1719,7 @@ abstract public class AbstractJDBCMessag
}
- public int getContent(long messageId, int offset, ByteBuffer dst)
+ private int getContent(long messageId, int offset, ByteBuffer dst)
{
Connection conn = null;
PreparedStatement stmt = null;
@@ -1805,6 +1800,8 @@ abstract public class AbstractJDBCMessag
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
@@ -1825,12 +1822,16 @@ abstract public class AbstractJDBCMessag
@Override
public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.dequeueMessage(_connWrapper, queue, message.getMessageNumber());
}
@Override
public void commitTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
}
@@ -1838,6 +1839,8 @@ abstract public class AbstractJDBCMessag
@Override
public StoreFuture commitTranAsync()
{
+ checkMessageStoreOpen();
+
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
return storeFuture;
@@ -1846,18 +1849,24 @@ abstract public class AbstractJDBCMessag
@Override
public void abortTran()
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@Override
public void removeXid(long format, byte[] globalId, byte[] branchId)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.removeXid(_connWrapper, format, globalId, branchId);
}
@Override
public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
{
+ checkMessageStoreOpen();
+
AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
}
}
@@ -1899,6 +1908,7 @@ abstract public class AbstractJDBCMessag
StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
if(metaData == null)
{
+ checkMessageStoreOpen();
try
{
metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
@@ -1954,6 +1964,7 @@ abstract public class AbstractJDBCMessag
}
else
{
+ checkMessageStoreOpen();
return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
}
}
@@ -1972,6 +1983,8 @@ abstract public class AbstractJDBCMessag
@Override
public synchronized StoreFuture flushToStore()
{
+ checkMessageStoreOpen();
+
Connection conn = null;
try
{
@@ -2003,6 +2016,8 @@ abstract public class AbstractJDBCMessag
@Override
public void remove()
{
+ checkMessageStoreOpen();
+
int delta = getMetaData().getContentSize();
AbstractJDBCMessageStore.this.removeMessage(_messageId);
storedSizeChange(-delta);
@@ -2147,12 +2162,13 @@ abstract public class AbstractJDBCMessag
{
throw new StoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
}
-
}
@Override
public UUID[] remove(ConfiguredObjectRecord... objects) throws StoreException
{
+ checkConfigurationStoreOpen();
+
Collection<UUID> removed = new ArrayList<UUID>(objects.length);
try
{
@@ -2209,31 +2225,27 @@ abstract public class AbstractJDBCMessag
public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException
{
- if (_stateManager.isInState(State.ACTIVE) || _stateManager.isInState(State.ACTIVATING))
+ checkConfigurationStoreOpen();
+ try
{
+ Connection conn = newConnection();
try
{
- Connection conn = newConnection();
- try
- {
- for(ConfiguredObjectRecord record : records)
- {
- updateConfiguredObject(record, createIfNecessary, conn);
- }
- conn.commit();
- }
- finally
+ for(ConfiguredObjectRecord record : records)
{
- conn.close();
+ updateConfiguredObject(record, createIfNecessary, conn);
}
+ conn.commit();
}
- catch (SQLException e)
+ finally
{
- throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ conn.close();
}
-
}
-
+ catch (SQLException e)
+ {
+ throw new StoreException("Error updating configured objects in database: " + e.getMessage(), e);
+ }
}
private void updateConfiguredObject(ConfiguredObjectRecord configuredObject,
@@ -2241,89 +2253,88 @@ abstract public class AbstractJDBCMessag
Connection conn)
throws SQLException, StoreException
{
- PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
try
{
- stmt.setString(1, configuredObject.getId().toString());
- ResultSet rs = stmt.executeQuery();
- try
+ final ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.registerModule(_module);
+ if (rs.next())
{
- final ObjectMapper objectMapper = new ObjectMapper();
- objectMapper.registerModule(_module);
- if (rs.next())
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
- try
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
{
- stmt2.setString(1, configuredObject.getType());
- if (configuredObject.getAttributes() != null)
- {
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
- configuredObject.getAttributes());
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
- }
- else
- {
- stmt2.setNull(2, Types.BLOB);
- }
- stmt2.setString(3, configuredObject.getId().toString());
- stmt2.execute();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(
+ configuredObject.getAttributes());
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
}
- finally
+ else
{
- stmt2.close();
+ stmt2.setNull(2, Types.BLOB);
}
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
}
- else if(createIfNecessary)
+ }
+ else if(createIfNecessary)
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
{
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
- try
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
{
- insertStmt.setString(1, configuredObject.getId().toString());
- insertStmt.setString(2, configuredObject.getType());
- if(configuredObject.getAttributes() == null)
- {
- insertStmt.setNull(3, Types.BLOB);
- }
- else
- {
- final Map<String, Object> attributes = configuredObject.getAttributes();
- byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
- ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
- insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
- }
- insertStmt.execute();
+ insertStmt.setNull(3, Types.BLOB);
}
- finally
+ else
{
- insertStmt.close();
+ final Map<String, Object> attributes = configuredObject.getAttributes();
+ byte[] attributesAsBytes = objectMapper.writeValueAsBytes(attributes);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
}
- writeHierarchy(configuredObject, conn);
+ insertStmt.execute();
}
+ finally
+ {
+ insertStmt.close();
+ }
+ writeHierarchy(configuredObject, conn);
}
- finally
- {
- rs.close();
- }
- }
- catch (JsonMappingException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (JsonGenerationException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
- }
- catch (IOException e)
- {
- throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
}
finally
{
- stmt.close();
+ rs.close();
}
-
+ }
+ catch (JsonMappingException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (JsonGenerationException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ catch (IOException e)
+ {
+ throw new StoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ finally
+ {
+ stmt.close();
+ }
}
private void writeHierarchy(final ConfiguredObjectRecord configuredObject, final Connection conn) throws SQLException, StoreException
@@ -2450,18 +2461,27 @@ abstract public class AbstractJDBCMessag
@Override
public void onDelete()
{
+ // TODO should probably check we are closed
try
{
Connection conn = newAutoCommitConnection();
try
{
- for (String tableName : ALL_TABLES)
+ List<String> tables = new ArrayList<String>();
+ tables.addAll(CONFIGURATION_STORE_TABLE_NAMES);
+ tables.addAll(MESSAGE_STORE_TABLE_NAMES);
+
+ for (String tableName : tables)
{
Statement stmt = conn.createStatement();
try
{
stmt.execute("DROP TABLE " + tableName);
}
+ catch(SQLException e)
+ {
+ getLogger().warn("Failed to drop table '" + tableName + "' :" + e);
+ }
finally
{
stmt.close();
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java Thu Apr 3 19:58:53 2014
@@ -20,17 +20,14 @@
*/
package org.apache.qpid.server.store;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.VirtualHost;
/** A simple message store that stores the messages in a thread-safe structure in memory. */
abstract public class AbstractMemoryMessageStore extends NullMessageStore
{
private final AtomicLong _messageId = new AtomicLong(1);
- private final AtomicBoolean _closed = new AtomicBoolean(false);
private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
{
@@ -71,43 +68,8 @@ abstract public class AbstractMemoryMess
}
};
- private final StateManager _stateManager;
private final EventManager _eventManager = new EventManager();
- public AbstractMemoryMessageStore()
- {
- _stateManager = new StateManager(_eventManager);
- }
-
- @Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
- {
- _stateManager.attainState(State.INITIALISING);
- }
-
- @Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
- {
- if(_stateManager.isInState(State.INITIAL))
- {
- _stateManager.attainState(State.INITIALISING);
- }
- _stateManager.attainState(State.INITIALISED);
- }
-
- @Override
- public void activate()
- {
-
- if(_stateManager.isInState(State.INITIALISING))
- {
- _stateManager.attainState(State.INITIALISED);
- }
- _stateManager.attainState(State.ACTIVATING);
-
- _stateManager.attainState(State.ACTIVE);
- }
@Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
@@ -131,16 +93,6 @@ abstract public class AbstractMemoryMess
}
@Override
- public void close()
- {
- if (_closed.compareAndSet(false, true))
- {
- _stateManager.attainState(State.CLOSING);
- _stateManager.attainState(State.CLOSED);
- }
- }
-
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectDependency.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectDependency.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectDependency.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectDependency.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+
+public interface ConfiguredObjectDependency<C extends ConfiguredObject<C>>
+{
+ Class<C> getCategoryClass();
+ void resolve(C object);
+}
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectIdDependency.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectIdDependency.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectIdDependency.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectIdDependency.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+
+import java.util.UUID;
+
+public interface ConfiguredObjectIdDependency<C extends ConfiguredObject<C>> extends ConfiguredObjectDependency<C>
+{
+ UUID getId();
+}
Added: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectNameDependency.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectNameDependency.java?rev=1584365&view=auto
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectNameDependency.java (added)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfiguredObjectNameDependency.java Thu Apr 3 19:58:53 2014
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+
+public interface ConfiguredObjectNameDependency<C extends ConfiguredObject<C>> extends ConfiguredObjectDependency<C>
+{
+ String getName();
+}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationRecoverer.java Thu Apr 3 19:58:53 2014
@@ -27,6 +27,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
@@ -74,6 +75,7 @@ public class DurableConfigurationRecover
_store = store;
_upgrader = _upgraderProvider.getUpgrader(configVersion, this);
+ _eventLogger.message(_logSubject, ConfigStoreMessages.RECOVERY_START());
}
@Override
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Thu Apr 3 19:58:53 2014
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.model.ConfiguredObject;
+
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.VirtualHost;
-
public interface DurableConfigurationStore
{
+ String STORE_TYPE = "storeType";
+ String STORE_PATH = "storePath";
+ String IS_MESSAGE_STORE_TOO = "isMessageStoreToo";
+
public static interface Source
{
@@ -37,15 +41,16 @@ public interface DurableConfigurationSto
* Called after instantiation in order to configure the message store. A particular implementation can define
* whatever parameters it wants.
*
- *
- *
- *
- *
- * @param virtualHost
- * @param recoveryHandler Handler to be called as the store recovers on start up
+ * @param parent
+ * @param storeSettings store settings
*/
- void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler);
+ void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) throws StoreException;
+ /**
+ * Recovers configuration from the store using given recovery handler
+ * @param recoveryHandler recovery handler
+ */
+ void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler) throws StoreException;
/**
* Makes the specified object persistent.
@@ -78,6 +83,6 @@ public interface DurableConfigurationSto
*/
void update(boolean createIfNecessary, ConfiguredObjectRecord... records) throws StoreException;
+ void closeConfigurationStore() throws StoreException;
- void close() throws Exception;
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Event.java Thu Apr 3 19:58:53 2014
@@ -21,22 +21,6 @@ package org.apache.qpid.server.store;
public enum Event
{
- BEFORE_INIT,
- AFTER_INIT,
-
- BEFORE_ACTIVATE,
- AFTER_ACTIVATE,
-
- BEFORE_PASSIVATE,
- AFTER_PASSIVATE,
-
- BEFORE_CLOSE,
- AFTER_CLOSE,
-
- BEFORE_QUIESCE,
- AFTER_QUIESCE,
- BEFORE_RESTART,
-
PERSISTENT_MESSAGE_SIZE_OVERFULL,
PERSISTENT_MESSAGE_SIZE_UNDERFULL
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStore.java Thu Apr 3 19:58:53 2014
@@ -89,12 +89,16 @@ public class JsonFileConfigStore impleme
}
@Override
- public void configureConfigStore(final VirtualHost virtualHost, final ConfigurationRecoveryHandler recoveryHandler)
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
{
- _name = virtualHost.getName();
-
- setup(virtualHost);
+ _name = parent.getName();
+ setup(storeSettings);
load();
+ }
+
+ @Override
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
+ {
recoveryHandler.beginConfigurationRecovery(this,_configVersion);
List<ConfiguredObjectRecord> records = new ArrayList<ConfiguredObjectRecord>(_objectsById.values());
for(ConfiguredObjectRecord record : records)
@@ -109,9 +113,9 @@ public class JsonFileConfigStore impleme
}
}
- protected void setup(final VirtualHost virtualHost)
+ private void setup(final Map<String, Object> configurationStoreSettings)
{
- Object storePathAttr = virtualHost.getAttribute(VirtualHost.CONFIG_STORE_PATH);
+ Object storePathAttr = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
if(!(storePathAttr instanceof String))
{
throw new StoreException("Cannot determine path for configuration storage");
@@ -533,12 +537,17 @@ public class JsonFileConfigStore impleme
save();
}
- public void close() throws Exception
+ @Override
+ public void closeConfigurationStore()
{
try
{
releaseFileLock();
}
+ catch (IOException e)
+ {
+ throw new StoreException("Failed to release lock", e);
+ }
finally
{
_fileLock = null;
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/JsonFileConfigStoreFactory.java Thu Apr 3 19:58:53 2014
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.util.Map;
+
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.plugin.DurableConfigurationStoreFactory;
@@ -41,10 +42,13 @@ public class JsonFileConfigStoreFactory
@Override
public void validateAttributes(Map<String, Object> attributes)
{
- Object storePath = attributes.get(VirtualHost.CONFIG_STORE_PATH);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> configurationStoreSettings = (Map<String, Object>) attributes.get(VirtualHost.CONFIGURATION_STORE_SETTINGS);
+
+ Object storePath = configurationStoreSettings.get(DurableConfigurationStore.STORE_PATH);
if(!(storePath instanceof String))
{
- throw new IllegalArgumentException("Attribute '"+ VirtualHost.CONFIG_STORE_PATH
+ throw new IllegalArgumentException("Setting '"+ DurableConfigurationStore.STORE_PATH
+"' is required and must be of type String.");
}
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java Thu Apr 3 19:58:53 2014
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.server.model.VirtualHost;
+import java.util.Map;
+
+import org.apache.qpid.server.model.ConfiguredObject;
/**
* MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
@@ -28,22 +30,25 @@ import org.apache.qpid.server.model.Virt
*/
public interface MessageStore
{
+ String STORE_TYPE = "storeType";
+ String STORE_PATH = "storePath";
+ String UNDERFULL_SIZE = "storeUnderfullSize";
+ String OVERFULL_SIZE = "storeOverfullSize";
+
/**
- * Called after instantiation in order to configure the message store. A particular implementation can define
+ * Called after instantiation in order to open and initialize the message store. A particular implementation can define
* whatever parameters it wants.
- *
- *
- *
- *
- * @param virtualHost
- * @param messageRecoveryHandler Handler to be called as the store recovers on start up
- * @param tlogRecoveryHandler
- * @throws Exception If any error occurs that means the store is unable to configure itself.
+ * @param parent virtual host name
+ * @param messageStoreSettings store settings
*/
- void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler messageRecoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler);
+ void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings);
- void activate();
+ /**
+ * Called after opening to recover messages and transactions with given recovery handlers
+ * @param messageRecoveryHandler
+ * @param transactionLogRecoveryHandler
+ */
+ void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler);
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
@@ -59,15 +64,14 @@ public interface MessageStore
/**
* Called to close and cleanup any resources used by the message store.
- *
- * @throws Exception If the close fails.
*/
- void close();
+ void closeMessageStore();
void addEventListener(EventListener eventListener, Event... events);
String getStoreLocation();
+ // TODO dead method - remove??
String getStoreType();
void onDelete();
Modified: qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1584365&r1=1584364&r2=1584365&view=diff
==============================================================================
--- qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/java-broker-config-store-changes/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Thu Apr 3 19:58:53 2014
@@ -19,14 +19,21 @@
*/
package org.apache.qpid.server.store;
+import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.ConfiguredObject;
public abstract class NullMessageStore implements MessageStore, DurableConfigurationStore
{
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
@Override
- public void configureConfigStore(VirtualHost virtualHost, ConfigurationRecoveryHandler recoveryHandler)
+ public void recoverConfigurationStore(ConfigurationRecoveryHandler recoveryHandler)
{
}
@@ -52,13 +59,17 @@ public abstract class NullMessageStore i
}
@Override
- public void configureMessageStore(VirtualHost virtualHost, MessageStoreRecoveryHandler recoveryHandler,
- TransactionLogRecoveryHandler tlogRecoveryHandler)
+ public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
}
@Override
- public void close()
+ public void closeMessageStore()
+ {
+ }
+
+ @Override
+ public void closeConfigurationStore()
{
}
@@ -81,7 +92,7 @@ public abstract class NullMessageStore i
}
@Override
- public void activate()
+ public void recoverMessageStore(MessageStoreRecoveryHandler messageRecoveryHandler, TransactionLogRecoveryHandler transactionLogRecoveryHandler)
{
}
@@ -100,4 +111,5 @@ public abstract class NullMessageStore i
public void onDelete()
{
}
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org