You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/03/21 15:30:46 UTC
svn commit: r1787994 [1/2] - in /qpid/java/trunk: ./
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/
bdbstore/src/main/java/org/apache/qpid/server/virtualhost/b...
Author: orudyy
Date: Tue Mar 21 15:30:44 2017
New Revision: 1787994
URL: http://svn.apache.org/viewvc?rev=1787994&view=rev
Log:
QPID-7663: LinkStore infrastructure and BDB store backend
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
- copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/
qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
- copied, changed from r1787932, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
- copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
- copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
Modified:
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/TerminusDurability.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
qpid/java/trunk/broker/pom.xml
qpid/java/trunk/pom.xml
qpid/java/trunk/systests/pom.xml
qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml
qpid/java/trunk/test-profiles/JavaBDBExcludes
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java Tue Mar 21 15:30:44 2017
@@ -52,4 +52,6 @@ public interface BDBEnvironmentContainer
@ManagedOperation(description = "Get the BDB database statistics", nonModifying = true,
changesConfiguredObjectState = false)
Map<String, Object> databaseStatistics(@Param(name="database", description = "database table for which to retrieve statistics", mandatory = true)String database, @Param(name="reset", defaultValue = "false", description = "If true, reset the statistics") boolean reset);
+
+ EnvironmentFacade getEnvironmentFacade();
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -62,7 +62,8 @@ public interface EnvironmentFacade
void upgradeIfNecessary(ConfiguredObject<?> parent);
Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
- Database clearDatabase(String databaseName, DatabaseConfig databaseConfig);
+
+ Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig);
Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
@@ -97,4 +98,6 @@ public interface EnvironmentFacade
Map<String, Object> getTransactionStatistics(boolean reset);
Map<String,Object> getDatabaseStatistics(String database, boolean reset);
+
+ void deleteDatabase(String databaseName);
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -310,6 +310,13 @@ public class StandardEnvironmentFacade i
}
@Override
+ public void deleteDatabase(final String databaseName)
+ {
+ closeDatabase(databaseName);
+ getEnvironment().removeDatabase(null, databaseName);
+ }
+
+ @Override
public Map<String, Object> getTransactionStatistics(boolean reset)
{
return EnvironmentUtils.getTransactionStatistics(getEnvironment(), reset);
@@ -466,11 +473,11 @@ public class StandardEnvironmentFacade i
@Override
- public Database clearDatabase(String name, DatabaseConfig databaseConfig)
+ public Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig)
{
- closeDatabase(name);
- getEnvironment().removeDatabase(null, name);
- return openDatabase(name, databaseConfig);
+ closeDatabase(databaseName);
+ getEnvironment().removeDatabase(txn, databaseName);
+ return getEnvironment().openDatabase(txn, databaseName, databaseConfig);
}
@Override
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -628,13 +628,12 @@ public class ReplicatedEnvironmentFacade
return cachedHandle;
}
-
@Override
- public Database clearDatabase(String name, DatabaseConfig databaseConfig)
+ public Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig)
{
- closeDatabase(name);
- getEnvironment().removeDatabase(null, name);
- return openDatabase(name, databaseConfig);
+ closeDatabase(databaseName);
+ getEnvironment().removeDatabase(txn, databaseName);
+ return getEnvironment().openDatabase(txn, databaseName, databaseConfig);
}
@Override
@@ -883,6 +882,13 @@ public class ReplicatedEnvironmentFacade
}
+ @Override
+ public void deleteDatabase(final String databaseName)
+ {
+ closeDatabase(databaseName);
+ getEnvironment().removeDatabase(null, databaseName);
+ }
+
private <T> T submitEnvironmentTask(final int timeout, final Callable<T> task, String action)
{
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java Tue Mar 21 15:30:44 2017
@@ -182,4 +182,15 @@ public class BDBVirtualHostImpl extends
}
return Collections.emptyMap();
}
+
+ @Override
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ BDBMessageStore bdbMessageStore = (BDBMessageStore) getMessageStore();
+ if (bdbMessageStore != null)
+ {
+ return bdbMessageStore.getEnvironmentFacade();
+ }
+ return null;
+ }
}
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Mar 21 15:30:44 2017
@@ -1461,6 +1461,13 @@ public class BDBHAVirtualHostNodeImpl ex
return Collections.emptyMap();
}
}
+
+ @Override
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ return _environmentFacade.get();
+ }
+
public static Map<String, Collection<String>> getSupportedChildTypes()
{
return Collections.singletonMap(VirtualHost.class.getSimpleName(), (Collection<String>) Collections.singleton(BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE));
Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java Tue Mar 21 15:30:44 2017
@@ -185,6 +185,17 @@ public class BDBVirtualHostNodeImpl exte
}
@Override
+ public EnvironmentFacade getEnvironmentFacade()
+ {
+ BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) getConfigurationStore();
+ if (bdbConfigurationStore != null)
+ {
+ return bdbConfigurationStore.getEnvironmentFacade();
+ }
+ return null;
+ }
+
+ @Override
public PreferenceStore getPreferenceStore()
{
return ((BDBConfigurationStore) getConfigurationStore()).getPreferenceStore();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Mar 21 15:30:44 2017
@@ -178,7 +178,7 @@ public abstract class AbstractVirtualHos
private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private volatile LinkRegistry _linkRegistry;
+ private volatile LinkRegistryModel _linkRegistry;
private AtomicBoolean _blocked = new AtomicBoolean();
private final Map<String, MessageDestination> _systemNodeDestinations =
@@ -595,7 +595,16 @@ public abstract class AbstractVirtualHos
PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
_preferenceStore = preferencesRoot.createPreferenceStore();
- Iterator<LinkRegistryFactory> linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
+ _linkRegistry = createLinkRegistry();
+
+ createHousekeepingExecutor();
+ }
+
+ LinkRegistryModel createLinkRegistry()
+ {
+ LinkRegistryModel linkRegistry;
+ Iterator<LinkRegistryFactory>
+ linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
if (linkRegistryFactories.hasNext())
{
final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
@@ -603,14 +612,13 @@ public abstract class AbstractVirtualHos
{
throw new RuntimeException("Found multiple implementations of LinkRegistry");
}
- _linkRegistry = linkRegistryFactory.create(this);
+ linkRegistry = linkRegistryFactory.create(this);
}
else
{
- _linkRegistry = null;
+ linkRegistry = null;
}
-
- createHousekeepingExecutor();
+ return linkRegistry;
}
private void createHousekeepingExecutor()
@@ -2113,6 +2121,10 @@ public abstract class AbstractVirtualHos
{
shutdownHouseKeeping();
closeNetworkConnectionScheduler();
+ if (_linkRegistry != null)
+ {
+ _linkRegistry.close();
+ }
closeMessageStore();
stopPreferenceTaskExecutor();
closePreferenceStore();
@@ -2165,6 +2177,10 @@ public abstract class AbstractVirtualHos
@Override
public void run()
{
+ if (_linkRegistry != null)
+ {
+ _linkRegistry.delete();
+ }
MessageStore ms = getMessageStore();
if (ms != null)
{
@@ -2513,6 +2529,11 @@ public abstract class AbstractVirtualHos
messageStore.upgradeStoreStructure();
+ if (_linkRegistry != null)
+ {
+ _linkRegistry.open();
+ }
+
getBroker().assignTargetSizes();
final PreferenceStoreUpdater updater = new PreferenceStoreUpdaterImpl();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java Tue Mar 21 15:30:44 2017
@@ -24,5 +24,5 @@ import org.apache.qpid.server.plugin.Plu
public interface LinkRegistryFactory extends Pluggable
{
- LinkRegistry create(NamedAddressSpace addressSpace);
+ LinkRegistryModel create(NamedAddressSpace addressSpace);
}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java (from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java Tue Mar 21 15:30:44 2017
@@ -22,8 +22,12 @@ package org.apache.qpid.server.virtualho
import org.apache.qpid.server.protocol.LinkModel;
-public interface LinkRegistry
+public interface LinkRegistryModel
{
<T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
<T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+
+ void open();
+ void close();
+ void delete();
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml Tue Mar 21 15:30:44 2017
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-java-build</artifactId>
+ <version>7.0.0-SNAPSHOT</version>
+ <relativePath>../..</relativePath>
+ </parent>
+
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-codegen</artifactId>
+ <version>${project.version}</version>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-bdbstore</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+</project>
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.StringBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseNotFoundException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ModelVersion;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+public class BDBLinkStore implements LinkStore
+{
+ private static final Logger LOGGER = LoggerFactory.getLogger(BDBLinkStore.class);
+ private static final String LINKS_DB_NAME = "AMQP_1_0_LINKS";
+ private static final String LINKS_VERSION_DB_NAME = "AMQP_1_0_LINKS_VERSION";
+
+ private volatile StoreState _storeState = StoreState.CLOSED;
+ private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true);
+ private final EnvironmentFacade _environmentFacade;
+
+ BDBLinkStore(final EnvironmentFacade facade)
+ {
+ _environmentFacade = facade;
+ }
+
+ @Override
+ public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException
+ {
+ _useOrCloseRWLock.readLock().lock();
+ try
+ {
+ Collection<LinkDefinition> links = getLinkDefinitions(updater);
+ _storeState = StoreState.OPENED;
+ return links;
+ }
+ catch (RuntimeException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Failed recovery of links", e);
+ }
+ finally
+ {
+ _useOrCloseRWLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void saveLink(final LinkDefinition link)
+ {
+ _useOrCloseRWLock.readLock().lock();
+ try
+ {
+ if (_storeState != StoreState.OPENED)
+ {
+ throw new StoreException("Store is not opened");
+ }
+
+ Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ save(linksDatabase, null, link);
+ }
+ catch (RuntimeException e)
+ {
+ throw _environmentFacade.handleDatabaseException(String.format("Failed saving of link '%s'", new LinkKey(link)), e);
+ }
+ finally
+ {
+ _useOrCloseRWLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void deleteLink(final LinkDefinition linkDefinition)
+ {
+ LinkKey linkKey = new LinkKey(linkDefinition);
+ _useOrCloseRWLock.readLock().lock();
+ try
+ {
+ if (_storeState != StoreState.OPENED)
+ {
+ throw new StoreException("Store is not opened");
+ }
+
+ Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+
+ final DatabaseEntry databaseEntry = new DatabaseEntry();
+ LinkKeyEntryBinding.getInstance().objectToEntry(linkKey, databaseEntry);
+ OperationStatus status = linksDatabase.delete(null, databaseEntry);
+ if (status != OperationStatus.SUCCESS)
+ {
+ LOGGER.debug(String.format("Unexpected status '%s' for deletion of '%s'", status, linkKey));
+ }
+ }
+ catch (RuntimeException e)
+ {
+ throw _environmentFacade.handleDatabaseException(String.format("Failed deletion of link '%s'", linkKey), e);
+ }
+ finally
+ {
+ _useOrCloseRWLock.readLock().unlock();
+ }
+ }
+
+
+ @Override
+ public void close()
+ {
+ _useOrCloseRWLock.writeLock().lock();
+ try
+ {
+ _storeState = StoreState.CLOSED;
+ }
+ finally
+ {
+ _useOrCloseRWLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void delete()
+ {
+ _useOrCloseRWLock.writeLock().lock();
+ try
+ {
+ close();
+ _environmentFacade.deleteDatabase(LINKS_DB_NAME);
+ _environmentFacade.deleteDatabase(LINKS_VERSION_DB_NAME);
+ }
+ catch (RuntimeException e)
+ {
+ _environmentFacade.handleDatabaseException("Failed deletion of database", e);
+ LOGGER.info("Failed to delete links database", e);
+ }
+ finally
+ {
+ _useOrCloseRWLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public TerminusDurability getHighestSupportedTerminusDurability()
+ {
+ return TerminusDurability.CONFIGURATION;
+ }
+
+ private Collection<LinkDefinition> getLinkDefinitions(final LinkStoreUpdater updater)
+ {
+ Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ Collection<LinkDefinition> links = new HashSet<>();
+
+ ModelVersion currentVersion =
+ new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, BrokerModel.MODEL_MINOR_VERSION);
+ ModelVersion storedVersion = getStoredVersion();
+ if (currentVersion.lessThan(storedVersion))
+ {
+ throw new StoreException(String.format("Cannot downgrade preference store from '%s' to '%s'", storedVersion, currentVersion));
+ }
+
+ try (Cursor cursor = linksDatabase.openCursor(null, null))
+ {
+ final DatabaseEntry key = new DatabaseEntry();
+ final DatabaseEntry value = new DatabaseEntry();
+ LinkKeyEntryBinding keyEntryBinding = LinkKeyEntryBinding.getInstance();
+ LinkValueEntryBinding linkValueEntryBinding = LinkValueEntryBinding.getInstance();
+ while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+ {
+ LinkKey linkKey = keyEntryBinding.entryToObject(key);
+ LinkValue linkValue = linkValueEntryBinding.entryToObject(value);
+ LinkDefinition link = new LinkDefinitionImpl(linkKey.getRemoteContainerId(), linkKey.getLinkName(), linkKey.getRole(), linkValue.getSource(), linkValue.getTarget());
+ links.add(link);
+ }
+ }
+
+ if (storedVersion.lessThan(currentVersion))
+ {
+ links = updater.update(storedVersion.toString(), links);
+ final Transaction txn = _environmentFacade.beginTransaction(null);
+ try
+ {
+ linksDatabase = _environmentFacade.clearDatabase(txn, LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ for (LinkDefinition link : links)
+ {
+ save(linksDatabase, txn, link);
+ }
+ txn.commit();
+ linksDatabase.close();
+ }
+ catch (Exception e)
+ {
+ txn.abort();
+ throw e;
+ }
+ }
+
+ return links;
+ }
+
+ private void save(Database database, Transaction txn, final LinkDefinition link)
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ LinkKey linkKey = new LinkKey(link);
+ LinkKeyEntryBinding.getInstance().objectToEntry(linkKey, key);
+ LinkValueEntryBinding.getInstance().objectToEntry(new LinkValue(link), value);
+
+ OperationStatus status = database.put(txn, key, value); // TODO: create transaction
+ if (status != OperationStatus.SUCCESS)
+ {
+ throw new StoreException(String.format("Cannot save link %s", linkKey));
+ }
+ }
+
+ private ModelVersion getStoredVersion() throws RuntimeException
+ {
+ try(Cursor cursor = getLinksVersionDb().openCursor(null, null))
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ ModelVersion storedVersion = null;
+ while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+ {
+ String versionString = StringBinding.entryToString(key);
+ ModelVersion version = ModelVersion.fromString(versionString);
+ if (storedVersion == null || storedVersion.lessThan(version))
+ {
+ storedVersion = version;
+ }
+ }
+ if (storedVersion == null)
+ {
+ throw new StoreException("No link version information.");
+ }
+ return storedVersion;
+ }
+ catch (RuntimeException e)
+ {
+ throw _environmentFacade.handleDatabaseException("Cannot visit link version", e);
+ }
+ }
+
+ private Database getLinksVersionDb()
+ {
+ Database linksVersionDb;
+ try
+ {
+ DatabaseConfig config = new DatabaseConfig().setTransactional(true).setAllowCreate(false);
+ linksVersionDb = _environmentFacade.openDatabase(LINKS_VERSION_DB_NAME, config);
+ }
+ catch (DatabaseNotFoundException e)
+ {
+ linksVersionDb = _environmentFacade.openDatabase(LINKS_VERSION_DB_NAME, DEFAULT_DATABASE_CONFIG);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ StringBinding.stringToEntry(BrokerModel.MODEL_VERSION, key);
+ LongBinding.longToEntry(System.currentTimeMillis(), value);
+ linksVersionDb.put(null, key, value);
+ }
+
+ return linksVersionDb;
+ }
+
+ enum StoreState
+ {
+ CLOSED, OPENED
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+@PluggableService
+public class BDBLinkStoreFactory implements LinkStoreFactory
+{
+ private static final String TYPE = "BDB";
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public LinkStore create(final NamedAddressSpace addressSpace)
+ {
+ VirtualHost<?> virtualHost = (VirtualHost<?>)addressSpace;
+ EnvironmentFacade facade;
+ if (virtualHost instanceof BDBEnvironmentContainer)
+ {
+ facade = ((BDBEnvironmentContainer)virtualHost).getEnvironmentFacade();
+ }
+ else if (virtualHost.getParent() instanceof BDBEnvironmentContainer)
+ {
+ facade = ((BDBEnvironmentContainer)virtualHost.getParent()).getEnvironmentFacade();
+ }
+ else
+ {
+ throw new StoreException("Cannot create BDB Link Store for " + addressSpace);
+ }
+
+ if (facade == null)
+ {
+ throw new StoreException("Cannot find BDB environment for " + addressSpace);
+ }
+ return new BDBLinkStore(facade);
+ }
+
+ @Override
+ public boolean supports(final NamedAddressSpace addressSpace)
+ {
+ if (addressSpace instanceof VirtualHost)
+ {
+ if (addressSpace instanceof BDBEnvironmentContainer)
+ {
+ return true;
+ }
+ else if (((VirtualHost) addressSpace).getParent() instanceof BDBEnvironmentContainer)
+ {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public int getPriority()
+ {
+ return 100;
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class LinkKeyEntryBinding extends TupleBinding<LinkKey>
+{
+ private static final LinkKeyEntryBinding INSTANCE = new LinkKeyEntryBinding();
+
+ private LinkKeyEntryBinding()
+ {
+
+ }
+
+ @Override
+ public LinkKey entryToObject(final TupleInput input)
+ {
+ String remoteContainerId = input.readString();
+ String linkName = input.readString();
+ Role role = Role.valueOf(input.readBoolean());
+
+ final String remoteContainerId1 = remoteContainerId;
+ final String linkName1 = linkName;
+ final Role role1 = role;
+ return new LinkKey(remoteContainerId1, linkName1, role1);
+ }
+
+ @Override
+ public void objectToEntry(final LinkKey linkKey, final TupleOutput output)
+ {
+ output.writeString(linkKey.getRemoteContainerId());
+ output.writeString(linkKey.getLinkName());
+ output.writeBoolean(linkKey.getRole().getValue());
+ }
+
+ public static LinkKeyEntryBinding getInstance()
+ {
+ return INSTANCE;
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+
+public class LinkValue
+{
+ static final byte CURRENT_VERSION = 0;
+ private final BaseSource _source;
+ private final BaseTarget _target;
+ private final byte _version;
+
+ public LinkValue(final BaseSource source, final BaseTarget target, final byte version)
+ {
+ _source = source;
+ _target = target;
+ _version = version;
+ }
+
+ public LinkValue(final LinkDefinition link)
+ {
+ this(link.getSource(), link.getTarget(), CURRENT_VERSION);
+ }
+
+ public BaseSource getSource()
+ {
+ return _source;
+ }
+
+ public BaseTarget getTarget()
+ {
+ return _target;
+ }
+
+ public byte getVersion()
+ {
+ return _version;
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.store.StoreException;
+
+public class LinkValueEntryBinding extends TupleBinding<LinkValue>
+{
+ private static final LinkValueEntryBinding INSTANCE = new LinkValueEntryBinding();
+
+ private AMQPDescribedTypeRegistry _describedTypeRegistry =
+ AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
+
+ private LinkValueEntryBinding()
+ {
+ }
+
+ @Override
+ public LinkValue entryToObject(final TupleInput input)
+ {
+ byte version = input.readByte();
+ Object source = read(input);
+ if (!(source instanceof Source))
+ {
+ throw new StoreException(String.format("Unexpected object '%s' stored in the store where Source is expected",
+ source.getClass()));
+ }
+
+ Object target = read(input);
+ if (!(target instanceof Target))
+ {
+ throw new StoreException(String.format("Unexpected object '%s' stored in the store where Target is expected",
+ target.getClass()));
+ }
+
+ return new LinkValue((Source) source, (Target)target, version);
+ }
+
+
+ @Override
+ public void objectToEntry(final LinkValue linkValue, final TupleOutput output)
+ {
+ output.writeByte(linkValue.getVersion());
+ write(linkValue.getSource(), output);
+ write(linkValue.getTarget(), output);
+ }
+
+ public static LinkValueEntryBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ private Object read(final TupleInput input)
+ {
+ int size = input.readInt();
+ byte[] bytes = new byte[size];
+ input.read(bytes);
+
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
+ ValueHandler valueHandler = new ValueHandler(_describedTypeRegistry);
+ Object object;
+ try
+ {
+ object = valueHandler.parse(qpidByteBuffer);
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new StoreException("Unexpected serialized data", e);
+ }
+ finally
+ {
+ qpidByteBuffer.dispose();
+ }
+ return object;
+ }
+
+ private void write(final Object object, final TupleOutput output)
+ {
+ ValueWriter valueWriter = _describedTypeRegistry.getValueWriter(object);
+ int encodedSize = valueWriter.getEncodedSize();
+ QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
+ valueWriter.writeToBuffer(qpidByteBuffer);
+
+ output.writeInt(encodedSize);
+ output.write(qpidByteBuffer.array());
+ qpidByteBuffer.dispose();
+ }
+}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+
+import com.google.common.io.Files;
+import com.sleepycat.je.CacheMode;
+
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreTestCase;
+import org.apache.qpid.server.store.berkeleydb.StandardEnvironmentConfiguration;
+import org.apache.qpid.server.store.berkeleydb.StandardEnvironmentFacade;
+import org.apache.qpid.server.util.FileUtils;
+
+public class BDBLinkStoreTest extends LinkStoreTestCase
+{
+ private StandardEnvironmentFacade _facade;
+ private File _storeFolder;
+
+ @Override
+ protected LinkStore createLinkStore()
+ {
+ _storeFolder = Files.createTempDir();
+ StandardEnvironmentConfiguration configuration = mock(StandardEnvironmentConfiguration.class);
+ when(configuration.getName()).thenReturn("test");
+ when(configuration.getStorePath()).thenReturn(_storeFolder.getAbsolutePath());
+ when(configuration.getCacheMode()).thenReturn(CacheMode.DEFAULT);
+ when(configuration.getParameters()).thenReturn(Collections.emptyMap());
+ _facade = new StandardEnvironmentFacade(configuration);
+
+ return new BDBLinkStore(_facade);
+ }
+
+ @Override
+ protected void deleteLinkStore()
+ {
+ _facade.close();
+ FileUtils.delete(_storeFolder, true);
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml Tue Mar 21 15:30:44 2017
@@ -62,6 +62,20 @@
</dependencies>
<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <!--version specified in parent pluginManagement -->
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
</build>
</project>
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Mar 21 15:30:44 2017
@@ -69,6 +69,8 @@ public interface AMQPConnection_1_0<C ex
boolean isClosed();
+ boolean isClosing();
+
void close(Error error);
Iterator<IdentifiedTransaction> getOpenTransactions();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Tue Mar 21 15:30:44 2017
@@ -349,16 +349,17 @@ public class AMQPConnection_1_0Impl exte
assertState(FrameReceivingState.ANY_FRAME);
_frameReceivingState = FrameReceivingState.CLOSED;
setClosedForInput(true);
- closeReceived();
switch (_connectionState)
{
case UNOPENED:
case AWAITING_OPEN:
+ closeReceived();
closeConnection(ConnectionError.CONNECTION_FORCED,
"Connection close sent before connection was opened");
break;
case OPEN:
_connectionState = ConnectionState.CLOSE_RECEIVED;
+ closeReceived();
if(close.getError() != null)
{
final Error error = close.getError();
@@ -373,10 +374,12 @@ public class AMQPConnection_1_0Impl exte
addCloseTicker();
break;
case CLOSE_SENT:
+ closeReceived();
_connectionState = ConnectionState.CLOSED;
_orderlyClose.set(true);
default:
+ closeReceived();
}
_remoteError = close.getError();
}
@@ -444,6 +447,14 @@ public class AMQPConnection_1_0Impl exte
|| _connectionState == ConnectionState.CLOSE_RECEIVED;
}
+ @Override
+ public boolean isClosing()
+ {
+ return _connectionState == ConnectionState.CLOSED
+ || _connectionState == ConnectionState.CLOSE_RECEIVED
+ || _connectionState == ConnectionState.CLOSE_SENT;
+ }
+
public boolean closedForInput()
{
return _closedForInput;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Mar 21 15:30:44 2017
@@ -279,7 +279,7 @@ class ConsumerTarget_1_0 extends Abstrac
getEndpoint().setSource(null);
getEndpoint().close();
- final LinkRegistry linkReg = getSession().getConnection()
+ final LinkRegistryModel linkReg = getSession().getConnection()
.getAddressSpace()
.getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
linkReg.unregisterSendingLink(getEndpoint().getName());
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java (from r1787932, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java Tue Mar 21 15:30:44 2017
@@ -20,21 +20,14 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import com.google.common.util.concurrent.ListenableFuture;
-
import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-public interface Link_1_0 extends LinkModel
+public interface LinkDefinition extends LinkModel
{
- ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
-
- void linkClosed();
-
- void discardEndpoint();
+ String getRemoteContainerId();
String getName();
@@ -43,10 +36,4 @@ public interface Link_1_0 extends LinkMo
BaseSource getSource();
BaseTarget getTarget();
-
- void setSource(BaseSource source);
-
- void setTarget(BaseTarget target);
-
- void setTermini(BaseSource source, BaseTarget target);
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class LinkDefinitionImpl implements LinkDefinition
+{
+ private final String _remoteContainerId;
+ private final String _name;
+ private final Role _role;
+ private final BaseSource _source;
+ private final BaseTarget _target;
+
+ public LinkDefinitionImpl(final String remoteContainerId,
+ final String name,
+ final Role role,
+ final BaseSource source,
+ final BaseTarget target)
+ {
+ _remoteContainerId = remoteContainerId;
+ _name = name;
+ _role = role;
+ _source = source;
+ _target = target;
+ }
+
+ @Override
+ public String getRemoteContainerId()
+ {
+ return _remoteContainerId;
+ }
+
+ @Override
+ public String getName()
+ {
+ return _name;
+ }
+
+ @Override
+ public Role getRole()
+ {
+ return _role;
+ }
+
+ @Override
+ public BaseSource getSource()
+ {
+ return _source;
+ }
+
+ @Override
+ public BaseTarget getTarget()
+ {
+ return _target;
+ }
+}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Tue Mar 21 15:30:44 2017
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -43,17 +44,27 @@ public class LinkImpl implements Link_1_
{
private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
+ private final String _remoteContainerId;
private final String _linkName;
-
private final Role _role;
+ private final LinkRegistry _linkRegistry;
private volatile LinkEndpoint _linkEndpoint;
private volatile BaseSource _source;
private volatile BaseTarget _target;
- LinkImpl(final String linkName, final Role role)
+
+ public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry)
{
+ _remoteContainerId = remoteContainerId;
_linkName = linkName;
_role = role;
+ _linkRegistry = linkRegistry;
+ }
+
+ public LinkImpl(final LinkDefinition linkDefinition, final LinkRegistry linkRegistry)
+ {
+ this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry);
+ setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
}
@Override
@@ -85,12 +96,14 @@ public class LinkImpl implements Link_1_
}
_linkEndpoint.receiveAttach(attach);
+ _linkRegistry.linkChanged(this);
return Futures.immediateFuture((LinkEndpoint) _linkEndpoint);
}
}
- catch (Throwable t)
+ catch (Exception e)
{
- return rejectLink(session, t);
+ LOGGER.debug("Error attaching link", e);
+ return rejectLink(session, e);
}
}
@@ -163,6 +176,7 @@ public class LinkImpl implements Link_1_
public void linkClosed()
{
discardEndpoint();
+ _linkRegistry.linkClosed(this);
}
@Override
@@ -192,7 +206,7 @@ public class LinkImpl implements Link_1_
@Override
public void setSource(BaseSource source)
{
- _source = source;
+ setTermini(source, _target);
}
@Override
@@ -204,7 +218,7 @@ public class LinkImpl implements Link_1_
@Override
public void setTarget(BaseTarget target)
{
- _target = target;
+ setTermini(_source, target);
}
@Override
@@ -213,4 +227,16 @@ public class LinkImpl implements Link_1_
_source = source;
_target = target;
}
+
+ @Override
+ public TerminusDurability getHighestSupportedTerminusDurability()
+ {
+ return _linkRegistry.getHighestSupportedTerminusDurability();
+ }
+
+ @Override
+ public String getRemoteContainerId()
+ {
+ return _remoteContainerId;
+ }
}
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+public class LinkKey
+{
+ private final String _linkName;
+ private final String _remoteContainerId;
+ private final Role _role;
+
+ public LinkKey(final String remoteContainerId, final String linkName, final Role role)
+ {
+ _linkName = linkName;
+ _remoteContainerId = remoteContainerId;
+ _role = role;
+ }
+
+ public LinkKey(final LinkDefinition link)
+ {
+ this(link.getRemoteContainerId(), link.getName(), link.getRole());
+ }
+
+ public String getLinkName()
+ {
+ return _linkName;
+ }
+
+ public String getRemoteContainerId()
+ {
+ return _remoteContainerId;
+ }
+
+ public Role getRole()
+ {
+ return _role;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LinkKey linkKey = (LinkKey) o;
+
+ if (!_linkName.equals(linkKey._linkName))
+ {
+ return false;
+ }
+ if (!_remoteContainerId.equals(linkKey._remoteContainerId))
+ {
+ return false;
+ }
+ return _role.equals(linkKey._role);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = _linkName.hashCode();
+ result = 31 * result + _remoteContainerId.hashCode();
+ result = 31 * result + _role.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "LinkKey{" +
+ "linkName='" + _linkName + '\'' +
+ ", remoteContainerId='" + _remoteContainerId + '\'' +
+ ", role=" + _role +
+ '}';
+ }
+}
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java (from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java Tue Mar 21 15:30:44 2017
@@ -1,5 +1,4 @@
/*
- *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,12 +17,17 @@
* under the License.
*
*/
-package org.apache.qpid.server.virtualhost;
-import org.apache.qpid.server.protocol.LinkModel;
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
-public interface LinkRegistry
+public interface LinkRegistry extends LinkRegistryModel
{
- <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
- <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+ void linkClosed(final Link_1_0 link);
+
+ void linkChanged(final Link_1_0 link);
+
+ TerminusDurability getHighestSupportedTerminusDurability();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java Tue Mar 21 15:30:44 2017
@@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
@PluggableService
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Tue Mar 21 15:30:44 2017
@@ -20,52 +20,151 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdaterImpl;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class LinkRegistryImpl implements LinkRegistry
{
- private final Map<String, Map<String, Link_1_0>> _sendingLinkRegistry = new HashMap<>();
- private final Map<String, Map<String, Link_1_0>> _receivingLinkRegistry = new HashMap<>();
+ private static final Logger LOGGER = LoggerFactory.getLogger(LinkRegistryImpl.class);
+ private final ConcurrentMap<LinkKey, Link_1_0> _sendingLinkRegistry = new ConcurrentHashMap<>();
+ private final ConcurrentMap<LinkKey, Link_1_0> _receivingLinkRegistry = new ConcurrentHashMap<>();
private final NamedAddressSpace _addressSpace;
+ private final LinkStore _linkStore;
+
LinkRegistryImpl(final NamedAddressSpace addressSpace)
{
_addressSpace = addressSpace;
+
+ LinkStoreFactory storeFactory = null;
+ Iterable<LinkStoreFactory> linkStoreFactories = new QpidServiceLoader().instancesOf(LinkStoreFactory.class);
+ for (LinkStoreFactory linkStoreFactory : linkStoreFactories)
+ {
+ if (linkStoreFactory.supports(addressSpace)
+ && (storeFactory == null || storeFactory.getPriority() < linkStoreFactory.getPriority()))
+ {
+ storeFactory = linkStoreFactory;
+ }
+ }
+ if (storeFactory == null)
+ {
+ throw new ServerScopedRuntimeException("Cannot find suitable link store");
+ }
+ _linkStore = storeFactory.create(addressSpace);
+
}
+ @Override
public Link_1_0 getSendingLink(final String remoteContainerId, final String linkName)
{
return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
}
+ @Override
public Link_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
{
return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
}
+ @Override
+ public void linkClosed(final Link_1_0 link)
+ {
+ ConcurrentMap<LinkKey, Link_1_0> linkRegistry = getLinkRegistry(link.getRole());
+ linkRegistry.remove(new LinkKey(link));
+ _linkStore.deleteLink(link);
+ }
+
+ @Override
+ public void linkChanged(final Link_1_0 link)
+ {
+ getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link);
+ if ((link.getRole() == Role.SENDER && link.getSource() instanceof Source
+ && ((Source) link.getSource()).getDurable() != TerminusDurability.NONE)
+ || (link.getRole() == Role.RECEIVER && link.getTarget() instanceof Target
+ && ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE))
+ {
+ _linkStore.saveLink(link);
+ }
+ }
+
+ @Override
+ public TerminusDurability getHighestSupportedTerminusDurability()
+ {
+ TerminusDurability supportedTerminusDurability = _linkStore.getHighestSupportedTerminusDurability();
+ return supportedTerminusDurability == TerminusDurability.UNSETTLED_STATE ? TerminusDurability.CONFIGURATION : supportedTerminusDurability;
+ }
+
+ @Override
+ public void open()
+ {
+ Collection<LinkDefinition> links = _linkStore.openAndLoad(new LinkStoreUpdaterImpl());
+ for(LinkDefinition link: links)
+ {
+ ConcurrentMap<LinkKey, Link_1_0> linkRegistry = getLinkRegistry(link.getRole());
+ linkRegistry.put(new LinkKey(link), new LinkImpl(link, this));
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ _linkStore.close();
+ }
+
+ @Override
+ public void delete()
+ {
+ _linkStore.delete();
+ }
+
private Link_1_0 getLinkFromRegistry(final String remoteContainerId,
final String linkName,
- final Map<String, Map<String, Link_1_0>> linkRegistry,
+ final ConcurrentMap<LinkKey, Link_1_0> linkRegistry,
final Role role)
{
- Map<String, Link_1_0> containerRegistry = linkRegistry.get(remoteContainerId);
- if (containerRegistry == null)
- {
- containerRegistry = new HashMap<>();
- linkRegistry.put(remoteContainerId, containerRegistry);
- }
- Link_1_0 link = containerRegistry.get(linkName);
+ LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role);
+ Link_1_0 newLink = new LinkImpl(remoteContainerId, linkName, role, this);
+ Link_1_0 link = linkRegistry.putIfAbsent(linkKey, newLink);
if (link == null)
{
- link = new LinkImpl(linkName, role);
- containerRegistry.put(linkName, link);
+ link = newLink;
}
return link;
}
+
+ private ConcurrentMap<LinkKey, Link_1_0> getLinkRegistry(final Role role)
+ {
+ ConcurrentMap<LinkKey, Link_1_0> linkRegistry;
+ if (Role.SENDER == role)
+ {
+ linkRegistry = _sendingLinkRegistry;
+
+ }
+ else if (Role.RECEIVER == role)
+ {
+ linkRegistry = _receivingLinkRegistry;
+ }
+ else
+ {
+ throw new ServerScopedRuntimeException(String.format("Unsupported link role %s", role));
+ }
+
+ return linkRegistry;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Tue Mar 21 15:30:44 2017
@@ -22,13 +22,12 @@ package org.apache.qpid.server.protocol.
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.qpid.server.protocol.LinkModel;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-public interface Link_1_0 extends LinkModel
+public interface Link_1_0 extends LinkDefinition
{
ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
@@ -36,17 +35,11 @@ public interface Link_1_0 extends LinkMo
void discardEndpoint();
- String getName();
-
- Role getRole();
-
- BaseSource getSource();
-
- BaseTarget getTarget();
-
void setSource(BaseSource source);
void setTarget(BaseTarget target);
void setTermini(BaseSource source, BaseTarget target);
+
+ TerminusDurability getHighestSupportedTerminusDurability();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org