You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/03/21 15:30:46 UTC

svn commit: r1787994 [1/2] - in /qpid/java/trunk: ./ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ bdbstore/src/main/java/org/apache/qpid/server/virtualhost/b...

Author: orudyy
Date: Tue Mar 21 15:30:44 2017
New Revision: 1787994

URL: http://svn.apache.org/viewvc?rev=1787994&view=rev
Log:
QPID-7663: LinkStore infrastructure and BDB store backend

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java
      - copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/
    qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java
      - copied, changed from r1787932, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
      - copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NullLinkStoreFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStore.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdater.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreUpdaterImpl.java
      - copied, changed from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/TestLinkStoreFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/store/LinkStoreTestCase.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java
Modified:
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Header.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Source.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Target.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/TerminusDurability.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryTest.java
    qpid/java/trunk/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementAddressSpace.java
    qpid/java/trunk/broker/pom.xml
    qpid/java/trunk/pom.xml
    qpid/java/trunk/systests/pom.xml
    qpid/java/trunk/systests/qpid-systests-jms_2.0/pom.xml
    qpid/java/trunk/test-profiles/JavaBDBExcludes

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBEnvironmentContainer.java Tue Mar 21 15:30:44 2017
@@ -52,4 +52,6 @@ public interface BDBEnvironmentContainer
     @ManagedOperation(description = "Get the BDB database statistics", nonModifying = true,
             changesConfiguredObjectState = false)
     Map<String, Object> databaseStatistics(@Param(name="database", description = "database table for which to retrieve statistics", mandatory = true)String database, @Param(name="reset", defaultValue = "false", description = "If true, reset the statistics") boolean reset);
+
+    EnvironmentFacade getEnvironmentFacade();
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -62,7 +62,8 @@ public interface EnvironmentFacade
     void upgradeIfNecessary(ConfiguredObject<?> parent);
 
     Database openDatabase(String databaseName, DatabaseConfig databaseConfig);
-    Database clearDatabase(String databaseName, DatabaseConfig databaseConfig);
+
+    Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig);
 
     Sequence openSequence(Database database, DatabaseEntry sequenceKey, SequenceConfig sequenceConfig);
 
@@ -97,4 +98,6 @@ public interface EnvironmentFacade
     Map<String, Object> getTransactionStatistics(boolean reset);
 
     Map<String,Object> getDatabaseStatistics(String database, boolean reset);
+
+    void deleteDatabase(String databaseName);
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -310,6 +310,13 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
+    public void deleteDatabase(final String databaseName)
+    {
+        closeDatabase(databaseName);
+        getEnvironment().removeDatabase(null, databaseName);
+    }
+
+    @Override
     public Map<String, Object> getTransactionStatistics(boolean reset)
     {
         return EnvironmentUtils.getTransactionStatistics(getEnvironment(), reset);
@@ -466,11 +473,11 @@ public class StandardEnvironmentFacade i
 
 
     @Override
-    public Database clearDatabase(String name, DatabaseConfig databaseConfig)
+    public Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig)
     {
-        closeDatabase(name);
-        getEnvironment().removeDatabase(null, name);
-        return openDatabase(name, databaseConfig);
+        closeDatabase(databaseName);
+        getEnvironment().removeDatabase(txn, databaseName);
+        return getEnvironment().openDatabase(txn, databaseName, databaseConfig);
     }
 
     @Override

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Tue Mar 21 15:30:44 2017
@@ -628,13 +628,12 @@ public class ReplicatedEnvironmentFacade
         return cachedHandle;
     }
 
-
     @Override
-    public Database clearDatabase(String name, DatabaseConfig databaseConfig)
+    public Database clearDatabase(Transaction txn, String databaseName, DatabaseConfig databaseConfig)
     {
-        closeDatabase(name);
-        getEnvironment().removeDatabase(null, name);
-        return openDatabase(name, databaseConfig);
+        closeDatabase(databaseName);
+        getEnvironment().removeDatabase(txn, databaseName);
+        return getEnvironment().openDatabase(txn, databaseName, databaseConfig);
     }
 
     @Override
@@ -883,6 +882,13 @@ public class ReplicatedEnvironmentFacade
 
     }
 
+    @Override
+    public void deleteDatabase(final String databaseName)
+    {
+        closeDatabase(databaseName);
+        getEnvironment().removeDatabase(null, databaseName);
+    }
+
 
     private <T> T submitEnvironmentTask(final int timeout, final Callable<T> task, String action)
     {

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHostImpl.java Tue Mar 21 15:30:44 2017
@@ -182,4 +182,15 @@ public class BDBVirtualHostImpl extends
         }
         return Collections.emptyMap();
     }
+
+    @Override
+    public EnvironmentFacade getEnvironmentFacade()
+    {
+        BDBMessageStore bdbMessageStore = (BDBMessageStore) getMessageStore();
+        if (bdbMessageStore != null)
+        {
+            return bdbMessageStore.getEnvironmentFacade();
+        }
+        return null;
+    }
 }

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Tue Mar 21 15:30:44 2017
@@ -1461,6 +1461,13 @@ public class BDBHAVirtualHostNodeImpl ex
             return Collections.emptyMap();
         }
     }
+
+    @Override
+    public EnvironmentFacade getEnvironmentFacade()
+    {
+        return _environmentFacade.get();
+    }
+
     public static Map<String, Collection<String>> getSupportedChildTypes()
     {
         return Collections.singletonMap(VirtualHost.class.getSimpleName(), (Collection<String>) Collections.singleton(BDBHAVirtualHostImpl.VIRTUAL_HOST_TYPE));

Modified: qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java (original)
+++ qpid/java/trunk/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBVirtualHostNodeImpl.java Tue Mar 21 15:30:44 2017
@@ -185,6 +185,17 @@ public class BDBVirtualHostNodeImpl exte
     }
 
     @Override
+    public EnvironmentFacade getEnvironmentFacade()
+    {
+        BDBConfigurationStore bdbConfigurationStore = (BDBConfigurationStore) getConfigurationStore();
+        if (bdbConfigurationStore != null)
+        {
+            return bdbConfigurationStore.getEnvironmentFacade();
+        }
+        return null;
+    }
+
+    @Override
     public PreferenceStore getPreferenceStore()
     {
         return ((BDBConfigurationStore) getConfigurationStore()).getPreferenceStore();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java Tue Mar 21 15:30:44 2017
@@ -178,7 +178,7 @@ public abstract class AbstractVirtualHos
 
     private final StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
-    private volatile LinkRegistry _linkRegistry;
+    private volatile LinkRegistryModel _linkRegistry;
     private AtomicBoolean _blocked = new AtomicBoolean();
 
     private final Map<String, MessageDestination> _systemNodeDestinations =
@@ -595,7 +595,16 @@ public abstract class AbstractVirtualHos
         PreferencesRoot preferencesRoot = (VirtualHostNode) getParent();
         _preferenceStore = preferencesRoot.createPreferenceStore();
 
-        Iterator<LinkRegistryFactory> linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
+        _linkRegistry = createLinkRegistry();
+
+        createHousekeepingExecutor();
+    }
+
+    LinkRegistryModel createLinkRegistry()
+    {
+        LinkRegistryModel linkRegistry;
+        Iterator<LinkRegistryFactory>
+                linkRegistryFactories = (new QpidServiceLoader()).instancesOf(LinkRegistryFactory.class).iterator();
         if (linkRegistryFactories.hasNext())
         {
             final LinkRegistryFactory linkRegistryFactory = linkRegistryFactories.next();
@@ -603,14 +612,13 @@ public abstract class AbstractVirtualHos
             {
                 throw new RuntimeException("Found multiple implementations of LinkRegistry");
             }
-            _linkRegistry = linkRegistryFactory.create(this);
+            linkRegistry = linkRegistryFactory.create(this);
         }
         else
         {
-            _linkRegistry = null;
+            linkRegistry = null;
         }
-
-        createHousekeepingExecutor();
+        return linkRegistry;
     }
 
     private void createHousekeepingExecutor()
@@ -2113,6 +2121,10 @@ public abstract class AbstractVirtualHos
             {
                 shutdownHouseKeeping();
                 closeNetworkConnectionScheduler();
+                if (_linkRegistry != null)
+                {
+                    _linkRegistry.close();
+                }
                 closeMessageStore();
                 stopPreferenceTaskExecutor();
                 closePreferenceStore();
@@ -2165,6 +2177,10 @@ public abstract class AbstractVirtualHos
                     @Override
                     public void run()
                     {
+                        if (_linkRegistry != null)
+                        {
+                            _linkRegistry.delete();
+                        }
                         MessageStore ms = getMessageStore();
                         if (ms != null)
                         {
@@ -2513,6 +2529,11 @@ public abstract class AbstractVirtualHos
 
         messageStore.upgradeStoreStructure();
 
+        if (_linkRegistry != null)
+        {
+            _linkRegistry.open();
+        }
+
         getBroker().assignTargetSizes();
 
         final PreferenceStoreUpdater updater = new PreferenceStoreUpdaterImpl();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryFactory.java Tue Mar 21 15:30:44 2017
@@ -24,5 +24,5 @@ import org.apache.qpid.server.plugin.Plu
 
 public interface LinkRegistryFactory extends Pluggable
 {
-    LinkRegistry create(NamedAddressSpace addressSpace);
+    LinkRegistryModel create(NamedAddressSpace addressSpace);
 }

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java (from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistryModel.java Tue Mar 21 15:30:44 2017
@@ -22,8 +22,12 @@ package org.apache.qpid.server.virtualho
 
 import org.apache.qpid.server.protocol.LinkModel;
 
-public interface LinkRegistry
+public interface LinkRegistryModel
 {
     <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
     <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+
+    void open();
+    void close();
+    void delete();
 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/pom.xml Tue Mar 21 15:30:44 2017
@@ -0,0 +1,84 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing,
+  ~  software distributed under the License is distributed on an
+  ~  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~  KIND, either express or implied.  See the License for the
+  ~  specific language governing permissions and limitations
+  ~  under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.qpid</groupId>
+        <artifactId>qpid-java-build</artifactId>
+        <version>7.0.0-SNAPSHOT</version>
+        <relativePath>../..</relativePath>
+    </parent>
+
+    <groupId>org.apache.qpid</groupId>
+    <artifactId>qpid-broker-plugins-amqp-1-0-protocol-bdb-store</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-codegen</artifactId>
+            <version>${project.version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-bdbstore</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.sleepycat</groupId>
+            <artifactId>je</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-broker-plugins-amqp-1-0-protocol</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+    </dependencies>
+</project>

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStore.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,404 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.bind.tuple.StringBinding;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseNotFoundException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ModelVersion;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.LinkDefinitionImpl;
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdater;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+/**
+ * {@link LinkStore} that persists AMQP 1.0 link definitions in the Berkeley DB JE environment
+ * reached through an {@link EnvironmentFacade}.
+ * <p>
+ * Links live in the {@code AMQP_1_0_LINKS} database, keyed by {@link LinkKey}. The
+ * {@code AMQP_1_0_LINKS_VERSION} database records the broker model version(s) the links were
+ * written with and is read on open to decide whether the stored links must be upgraded.
+ * <p>
+ * Load, save and delete-link operations hold the read side of a fair read/write lock;
+ * {@link #close()} and {@link #delete()} hold the write side.
+ */
+public class BDBLinkStore implements LinkStore
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(BDBLinkStore.class);
+    private static final String LINKS_DB_NAME = "AMQP_1_0_LINKS";
+    private static final String LINKS_VERSION_DB_NAME = "AMQP_1_0_LINKS_VERSION";
+
+    private volatile StoreState _storeState = StoreState.CLOSED;
+    private final ReentrantReadWriteLock _useOrCloseRWLock = new ReentrantReadWriteLock(true);
+    private final EnvironmentFacade _environmentFacade;
+
+    BDBLinkStore(final EnvironmentFacade facade)
+    {
+        _environmentFacade = facade;
+    }
+
+    /**
+     * Loads every stored link definition, upgrading the stored data first if it was written by
+     * an older broker model version, and then marks the store as opened.
+     *
+     * @param updater invoked to upgrade the links when the stored version is older than the current one
+     * @return the link definitions held by the store (after any upgrade)
+     * @throws RuntimeException whatever {@code EnvironmentFacade.handleDatabaseException} produces
+     *                          for a failure raised while loading
+     */
+    @Override
+    public Collection<LinkDefinition> openAndLoad(final LinkStoreUpdater updater) throws StoreException
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            Collection<LinkDefinition> links = getLinkDefinitions(updater);
+            _storeState = StoreState.OPENED;
+            return links;
+        }
+        catch (RuntimeException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Failed recovery of links", e);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Writes the given link to the links database under its {@link LinkKey}.
+     *
+     * @param link the link definition to persist
+     * @throws RuntimeException if the store is not opened or the write fails; the failure is
+     *                          translated by {@code EnvironmentFacade.handleDatabaseException}
+     */
+    @Override
+    public void saveLink(final LinkDefinition link)
+    {
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+            save(linksDatabase, null, link);
+        }
+        catch (RuntimeException e)
+        {
+            throw _environmentFacade.handleDatabaseException(String.format("Failed saving of link '%s'", new LinkKey(link)), e);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Removes the entry stored under the {@link LinkKey} of the given link.
+     * <p>
+     * A delete that does not report {@code SUCCESS} (for instance because no such entry exists)
+     * is only logged at debug level.
+     *
+     * @param linkDefinition the link whose stored entry should be removed
+     * @throws RuntimeException if the store is not opened or the delete fails; the failure is
+     *                          translated by {@code EnvironmentFacade.handleDatabaseException}
+     */
+    @Override
+    public void deleteLink(final LinkDefinition linkDefinition)
+    {
+        LinkKey linkKey = new LinkKey(linkDefinition);
+        _useOrCloseRWLock.readLock().lock();
+        try
+        {
+            if (_storeState != StoreState.OPENED)
+            {
+                throw new StoreException("Store is not opened");
+            }
+
+            Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+
+            final DatabaseEntry databaseEntry = new DatabaseEntry();
+            LinkKeyEntryBinding.getInstance().objectToEntry(linkKey, databaseEntry);
+            OperationStatus status = linksDatabase.delete(null, databaseEntry);
+            if (status != OperationStatus.SUCCESS)
+            {
+                LOGGER.debug("Unexpected status '{}' for deletion of '{}'", status, linkKey);
+            }
+        }
+        catch (RuntimeException e)
+        {
+            throw _environmentFacade.handleDatabaseException(String.format("Failed deletion of link '%s'", linkKey), e);
+        }
+        finally
+        {
+            _useOrCloseRWLock.readLock().unlock();
+        }
+    }
+
+
+    /**
+     * Marks the store as closed once the write lock has been obtained, i.e. after operations
+     * holding the read lock have finished. The environment facade itself is not closed here.
+     */
+    @Override
+    public void close()
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            _storeState = StoreState.CLOSED;
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Closes the store and removes both the links and the links-version databases.
+     * <p>
+     * A failure is passed to the environment facade and logged at info level; it is not rethrown here.
+     */
+    @Override
+    public void delete()
+    {
+        _useOrCloseRWLock.writeLock().lock();
+        try
+        {
+            close();
+            _environmentFacade.deleteDatabase(LINKS_DB_NAME);
+            _environmentFacade.deleteDatabase(LINKS_VERSION_DB_NAME);
+        }
+        catch (RuntimeException e)
+        {
+            // NOTE(review): the exception returned by handleDatabaseException is discarded, so the
+            // failure is swallowed after logging - confirm that best-effort deletion is intended.
+            _environmentFacade.handleDatabaseException("Failed deletion of database", e);
+            LOGGER.info("Failed to delete links database", e);
+        }
+        finally
+        {
+            _useOrCloseRWLock.writeLock().unlock();
+        }
+    }
+
+    /** @return {@link TerminusDurability#CONFIGURATION}, the value this store always reports */
+    @Override
+    public TerminusDurability getHighestSupportedTerminusDurability()
+    {
+        return TerminusDurability.CONFIGURATION;
+    }
+
+    /**
+     * Reads all links and, if they were written by an older model version, upgrades them and
+     * rewrites the links database (recording the new version) in a single transaction.
+     *
+     * @param updater converts links written by an older model version
+     * @return the links as held by the store once this method returns
+     * @throws StoreException if the stored model version is newer than the current one
+     */
+    private Collection<LinkDefinition> getLinkDefinitions(final LinkStoreUpdater updater)
+    {
+        Database linksDatabase = _environmentFacade.openDatabase(LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+        Collection<LinkDefinition> links = new HashSet<>();
+
+        ModelVersion currentVersion =
+                new ModelVersion(BrokerModel.MODEL_MAJOR_VERSION, BrokerModel.MODEL_MINOR_VERSION);
+        ModelVersion storedVersion = getStoredVersion();
+        if (currentVersion.lessThan(storedVersion))
+        {
+            throw new StoreException(String.format("Cannot downgrade link store from '%s' to '%s'", storedVersion, currentVersion));
+        }
+
+        try (Cursor cursor = linksDatabase.openCursor(null, null))
+        {
+            final DatabaseEntry key = new DatabaseEntry();
+            final DatabaseEntry value = new DatabaseEntry();
+            LinkKeyEntryBinding keyEntryBinding = LinkKeyEntryBinding.getInstance();
+            LinkValueEntryBinding linkValueEntryBinding = LinkValueEntryBinding.getInstance();
+            while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+            {
+                LinkKey linkKey = keyEntryBinding.entryToObject(key);
+                LinkValue linkValue = linkValueEntryBinding.entryToObject(value);
+                LinkDefinition link = new LinkDefinitionImpl(linkKey.getRemoteContainerId(), linkKey.getLinkName(), linkKey.getRole(), linkValue.getSource(), linkValue.getTarget());
+                links.add(link);
+            }
+        }
+
+        if (storedVersion.lessThan(currentVersion))
+        {
+            links = updater.update(storedVersion.toString(), links);
+            final Transaction txn = _environmentFacade.beginTransaction(null);
+            try
+            {
+                linksDatabase = _environmentFacade.clearDatabase(txn, LINKS_DB_NAME, DEFAULT_DATABASE_CONFIG);
+                for (LinkDefinition link : links)
+                {
+                    save(linksDatabase, txn, link);
+                }
+                // Record the new version in the same transaction; otherwise the stored version stays
+                // old and the upgrade would be applied again on every subsequent open.
+                updateVersion(txn, BrokerModel.MODEL_VERSION);
+                txn.commit();
+                linksDatabase.close();
+            }
+            catch (Exception e)
+            {
+                txn.abort();
+                throw e;
+            }
+        }
+
+        return links;
+    }
+
+    /**
+     * Serialises the link into a key/value pair and puts it into the given database.
+     *
+     * @param database the links database to write to
+     * @param txn      transaction to write under, or {@code null} when the caller has none
+     * @param link     the link to store
+     * @throws StoreException if the put does not report {@code SUCCESS}
+     */
+    private void save(Database database, Transaction txn, final LinkDefinition link)
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry value = new DatabaseEntry();
+
+        LinkKey linkKey = new LinkKey(link);
+        LinkKeyEntryBinding.getInstance().objectToEntry(linkKey, key);
+        LinkValueEntryBinding.getInstance().objectToEntry(new LinkValue(link), value);
+
+        OperationStatus status = database.put(txn, key, value); // TODO: create transaction
+        if (status != OperationStatus.SUCCESS)
+        {
+            throw new StoreException(String.format("Cannot save link %s", linkKey));
+        }
+    }
+
+    /**
+     * Adds an entry for the given model version to the links-version database.
+     *
+     * @param txn     transaction to write under
+     * @param version the model version string to record as key
+     */
+    private void updateVersion(final Transaction txn, final String version)
+    {
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry value = new DatabaseEntry();
+        StringBinding.stringToEntry(version, key);
+        LongBinding.longToEntry(System.currentTimeMillis(), value);
+        getLinksVersionDb().put(txn, key, value);
+    }
+
+    /**
+     * Determines the highest model version recorded in the links-version database.
+     *
+     * @return the highest stored version
+     * @throws RuntimeException if no version is recorded or the database cannot be read; the
+     *                          failure is translated by {@code EnvironmentFacade.handleDatabaseException}
+     */
+    private ModelVersion getStoredVersion() throws RuntimeException
+    {
+        try(Cursor cursor = getLinksVersionDb().openCursor(null, null))
+        {
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+
+            ModelVersion storedVersion = null;
+            while (cursor.getNext(key, value, LockMode.READ_UNCOMMITTED) == OperationStatus.SUCCESS)
+            {
+                String versionString = StringBinding.entryToString(key);
+                ModelVersion version = ModelVersion.fromString(versionString);
+                if (storedVersion == null || storedVersion.lessThan(version))
+                {
+                    storedVersion = version;
+                }
+            }
+            if (storedVersion == null)
+            {
+                throw new StoreException("No link version information.");
+            }
+            return storedVersion;
+        }
+        catch (RuntimeException e)
+        {
+            throw _environmentFacade.handleDatabaseException("Cannot visit link version", e);
+        }
+    }
+
+    /**
+     * Opens the links-version database, creating it and recording the current broker model
+     * version (with the creation time as value) if it does not exist yet.
+     *
+     * @return the links-version database
+     */
+    private Database getLinksVersionDb()
+    {
+        Database linksVersionDb;
+        try
+        {
+            DatabaseConfig config = new DatabaseConfig().setTransactional(true).setAllowCreate(false);
+            linksVersionDb = _environmentFacade.openDatabase(LINKS_VERSION_DB_NAME, config);
+        }
+        catch (DatabaseNotFoundException e)
+        {
+            linksVersionDb = _environmentFacade.openDatabase(LINKS_VERSION_DB_NAME, DEFAULT_DATABASE_CONFIG);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            StringBinding.stringToEntry(BrokerModel.MODEL_VERSION, key);
+            LongBinding.longToEntry(System.currentTimeMillis(), value);
+            linksVersionDb.put(null, key, value);
+        }
+
+        return linksVersionDb;
+    }
+
+    /** Lifecycle states of the store. */
+    enum StoreState
+    {
+        CLOSED, OPENED
+    }
+}

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreFactory.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,111 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.PluggableService;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.berkeleydb.BDBEnvironmentContainer;
+import org.apache.qpid.server.store.berkeleydb.EnvironmentFacade;
+
+/**
+ * {@link LinkStoreFactory} producing {@link BDBLinkStore} instances for virtual hosts which are,
+ * or whose parent is, a {@link BDBEnvironmentContainer}.
+ */
+@PluggableService
+public class BDBLinkStoreFactory implements LinkStoreFactory
+{
+    private static final String TYPE = "BDB";
+
+    /** @return the type name of this factory, {@code "BDB"} */
+    @Override
+    public String getType()
+    {
+        return TYPE;
+    }
+
+    /**
+     * Creates a link store on top of the BDB environment associated with the address space.
+     *
+     * @param addressSpace a {@link VirtualHost} which is, or whose parent is, a
+     *                     {@link BDBEnvironmentContainer}
+     * @return a new {@link BDBLinkStore} using the container's environment facade
+     * @throws StoreException if the address space is not such a virtual host, or the container
+     *                        has no environment facade
+     */
+    @Override
+    public LinkStore create(final NamedAddressSpace addressSpace)
+    {
+        if (!(addressSpace instanceof VirtualHost))
+        {
+            // Mirror supports(): report the store error instead of failing with a ClassCastException.
+            throw new StoreException("Cannot create BDB Link Store for " + addressSpace);
+        }
+
+        VirtualHost<?> virtualHost = (VirtualHost<?>)addressSpace;
+        EnvironmentFacade facade;
+        if (virtualHost instanceof BDBEnvironmentContainer)
+        {
+            facade = ((BDBEnvironmentContainer)virtualHost).getEnvironmentFacade();
+        }
+        else if (virtualHost.getParent() instanceof BDBEnvironmentContainer)
+        {
+            facade = ((BDBEnvironmentContainer)virtualHost.getParent()).getEnvironmentFacade();
+        }
+        else
+        {
+            throw new StoreException("Cannot create BDB Link Store for " + addressSpace);
+        }
+
+        if (facade == null)
+        {
+            throw new StoreException("Cannot find BDB environment for " + addressSpace);
+        }
+        return new BDBLinkStore(facade);
+    }
+
+    /**
+     * Tells whether a BDB link store can be created for the given address space.
+     *
+     * @param addressSpace the address space to examine
+     * @return {@code true} if it is a {@link VirtualHost} which is, or whose parent is, a
+     *         {@link BDBEnvironmentContainer}
+     */
+    @Override
+    public boolean supports(final NamedAddressSpace addressSpace)
+    {
+        if (!(addressSpace instanceof VirtualHost))
+        {
+            return false;
+        }
+        return addressSpace instanceof BDBEnvironmentContainer
+               || ((VirtualHost<?>) addressSpace).getParent() instanceof BDBEnvironmentContainer;
+    }
+
+    /** @return the fixed priority value {@code 100} */
+    @Override
+    public int getPriority()
+    {
+        return 100;
+    }
+}

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkKeyEntryBinding.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,72 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.server.protocol.v1_0.LinkKey;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+/**
+ * Tuple binding converting a {@link LinkKey} to and from its stored form: the remote container
+ * id and the link name as strings, followed by the role as a boolean.
+ */
+public class LinkKeyEntryBinding extends TupleBinding<LinkKey>
+{
+    private static final LinkKeyEntryBinding INSTANCE = new LinkKeyEntryBinding();
+
+    private LinkKeyEntryBinding()
+    {
+        // single shared instance - obtain it through getInstance()
+    }
+
+    /** @return the shared binding instance */
+    public static LinkKeyEntryBinding getInstance()
+    {
+        return INSTANCE;
+    }
+
+    /**
+     * Rebuilds a {@link LinkKey} from the remote container id, link name and role flag, read in
+     * the order in which {@code objectToEntry} writes them.
+     */
+    @Override
+    public LinkKey entryToObject(final TupleInput input)
+    {
+        final String containerId = input.readString();
+        final String name = input.readString();
+        final Role linkRole = Role.valueOf(input.readBoolean());
+        return new LinkKey(containerId, name, linkRole);
+    }
+
+    /**
+     * Writes the remote container id and link name as strings, followed by the role's boolean value.
+     */
+    @Override
+    public void objectToEntry(final LinkKey linkKey, final TupleOutput output)
+    {
+        output.writeString(linkKey.getRemoteContainerId());
+        output.writeString(linkKey.getLinkName());
+        output.writeBoolean(linkKey.getRole().getValue());
+    }
+}

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValue.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,74 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import org.apache.qpid.server.protocol.v1_0.LinkDefinition;
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+
+/**
+ * Immutable holder for the value part of a stored link: its source and target together with a
+ * version byte.
+ */
+public class LinkValue
+{
+    /** Version assigned to values created directly from a {@link LinkDefinition}. */
+    static final byte CURRENT_VERSION = 0;
+
+    private final byte _version;
+    private final BaseSource _source;
+    private final BaseTarget _target;
+
+    /** Creates a value from the link's source and target, tagged with {@link #CURRENT_VERSION}. */
+    public LinkValue(final LinkDefinition link)
+    {
+        this(link.getSource(), link.getTarget(), CURRENT_VERSION);
+    }
+
+    /**
+     * @param source  the source to hold
+     * @param target  the target to hold
+     * @param version the version byte to hold
+     */
+    public LinkValue(final BaseSource source, final BaseTarget target, final byte version)
+    {
+        _version = version;
+        _source = source;
+        _target = target;
+    }
+
+    /** @return the version byte this value was created with */
+    public byte getVersion()
+    {
+        return _version;
+    }
+
+    /** @return the source this value was created with */
+    public BaseSource getSource()
+    {
+        return _source;
+    }
+
+    /** @return the target this value was created with */
+    public BaseTarget getTarget()
+    {
+        return _target;
+    }
+}

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/main/java/org/apache/qpid/server/protocol/v1_0/store/bdb/LinkValueEntryBinding.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,141 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.store.StoreException;
+
+/**
+ * Tuple binding for {@link LinkValue}: a version byte followed by the encoded source and the
+ * encoded target, each preceded by its encoded size.
+ */
+public class LinkValueEntryBinding extends TupleBinding<LinkValue>
+{
+    private static final LinkValueEntryBinding INSTANCE = new LinkValueEntryBinding();
+
+    private final AMQPDescribedTypeRegistry _describedTypeRegistry =
+            AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer();
+
+    private LinkValueEntryBinding()
+    {
+    }
+
+    /**
+     * Reads a {@link LinkValue} in the layout produced by {@code objectToEntry}.
+     *
+     * @throws StoreException if the stored data cannot be parsed, or the decoded source or
+     *                        target is {@code null} or not of the expected type
+     */
+    @Override
+    public LinkValue entryToObject(final TupleInput input)
+    {
+        byte version = input.readByte();
+        Object source = read(input);
+        if (!(source instanceof Source))
+        {
+            throw new StoreException(String.format("Unexpected object '%s' stored in the store where Source is expected",
+                                                   typeOf(source)));
+        }
+
+        Object target = read(input);
+        if (!(target instanceof Target))
+        {
+            throw new StoreException(String.format("Unexpected object '%s' stored in the store where Target is expected",
+                                                   typeOf(target)));
+        }
+
+        return new LinkValue((Source) source, (Target)target, version);
+    }
+
+    /** Writes the version byte, then the size-prefixed encoded source and target. */
+    @Override
+    public void objectToEntry(final LinkValue linkValue, final TupleOutput output)
+    {
+        output.writeByte(linkValue.getVersion());
+        write(linkValue.getSource(), output);
+        write(linkValue.getTarget(), output);
+    }
+
+    /** @return the shared binding instance */
+    public static LinkValueEntryBinding getInstance()
+    {
+        return INSTANCE;
+    }
+
+    /** Null-safe class lookup, so a decoded {@code null} is reported instead of causing an NPE. */
+    private static Object typeOf(final Object object)
+    {
+        return object == null ? null : object.getClass();
+    }
+
+    /** Reads a size-prefixed byte block and decodes it with a {@link ValueHandler}. */
+    private Object read(final TupleInput input)
+    {
+        int size = input.readInt();
+        byte[] bytes = new byte[size];
+        input.read(bytes);
+
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(bytes);
+        ValueHandler valueHandler = new ValueHandler(_describedTypeRegistry);
+        Object object;
+        try
+        {
+            object = valueHandler.parse(qpidByteBuffer);
+        }
+        catch (AmqpErrorException e)
+        {
+            throw new StoreException("Unexpected serialized data", e);
+        }
+        finally
+        {
+            qpidByteBuffer.dispose();
+        }
+        return object;
+    }
+
+    /** Encodes the object with its value writer and appends the encoded size and bytes. */
+    private void write(final Object object, final TupleOutput output)
+    {
+        ValueWriter valueWriter = _describedTypeRegistry.getValueWriter(object);
+        int encodedSize = valueWriter.getEncodedSize();
+        QpidByteBuffer qpidByteBuffer = QpidByteBuffer.allocate(encodedSize);
+        try
+        {
+            valueWriter.writeToBuffer(qpidByteBuffer);
+            output.writeInt(encodedSize);
+            output.write(qpidByteBuffer.array());
+        }
+        finally
+        {
+            // release the buffer even if encoding or writing fails
+            qpidByteBuffer.dispose();
+        }
+    }
+}

Added: qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-bdb-store/src/test/java/org/apache/qpid/server/protocol/v1_0/store/bdb/BDBLinkStoreTest.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.store.bdb;
+
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.Collections;
+
+import com.google.common.io.Files;
+import com.sleepycat.je.CacheMode;
+
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreTestCase;
+import org.apache.qpid.server.store.berkeleydb.StandardEnvironmentConfiguration;
+import org.apache.qpid.server.store.berkeleydb.StandardEnvironmentFacade;
+import org.apache.qpid.server.util.FileUtils;
+
+/**
+ * Runs the tests inherited from {@link LinkStoreTestCase} against a {@link BDBLinkStore} backed
+ * by a {@link StandardEnvironmentFacade} created in a temporary folder.
+ */
+public class BDBLinkStoreTest extends LinkStoreTestCase
+{
+    private StandardEnvironmentFacade _facade;
+    private File _storeFolder;
+
+    /** Creates the store under test on a new environment located in a fresh temporary folder. */
+    @Override
+    protected LinkStore createLinkStore()
+    {
+        _storeFolder = Files.createTempDir();
+        StandardEnvironmentConfiguration configuration = mock(StandardEnvironmentConfiguration.class);
+        when(configuration.getName()).thenReturn("test");
+        when(configuration.getStorePath()).thenReturn(_storeFolder.getAbsolutePath());
+        when(configuration.getCacheMode()).thenReturn(CacheMode.DEFAULT);
+        when(configuration.getParameters()).thenReturn(Collections.emptyMap());
+        _facade = new StandardEnvironmentFacade(configuration);
+
+        return new BDBLinkStore(_facade);
+    }
+
+    /** Closes the environment and removes the temporary folder, tolerating a failed set-up. */
+    @Override
+    protected void deleteLinkStore()
+    {
+        if (_facade != null)
+        {
+            _facade.close();
+        }
+        if (_storeFolder != null)
+        {
+            FileUtils.delete(_storeFolder, true);
+        }
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/pom.xml Tue Mar 21 15:30:44 2017
@@ -62,6 +62,20 @@
   </dependencies>
 
   <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <!--version specified in parent pluginManagement -->
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
   </build>
 
 </project>

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Tue Mar 21 15:30:44 2017
@@ -69,6 +69,8 @@ public interface AMQPConnection_1_0<C ex
 
     boolean isClosed();
 
+    boolean isClosing();
+
     void close(Error error);
 
     Iterator<IdentifiedTransaction> getOpenTransactions();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java Tue Mar 21 15:30:44 2017
@@ -349,16 +349,17 @@ public class AMQPConnection_1_0Impl exte
         assertState(FrameReceivingState.ANY_FRAME);
         _frameReceivingState = FrameReceivingState.CLOSED;
         setClosedForInput(true);
-        closeReceived();
         switch (_connectionState)
         {
             case UNOPENED:
             case AWAITING_OPEN:
+                closeReceived();
                 closeConnection(ConnectionError.CONNECTION_FORCED,
                                 "Connection close sent before connection was opened");
                 break;
             case OPEN:
                 _connectionState = ConnectionState.CLOSE_RECEIVED;
+                closeReceived();
                 if(close.getError() != null)
                 {
                     final Error error = close.getError();
@@ -373,10 +374,12 @@ public class AMQPConnection_1_0Impl exte
                 addCloseTicker();
                 break;
             case CLOSE_SENT:
+                closeReceived();
                 _connectionState = ConnectionState.CLOSED;
                 _orderlyClose.set(true);
 
             default:
+                closeReceived();
         }
         _remoteError = close.getError();
     }
@@ -444,6 +447,14 @@ public class AMQPConnection_1_0Impl exte
                || _connectionState == ConnectionState.CLOSE_RECEIVED;
     }
 
+    @Override
+    public boolean isClosing()
+    {
+        final ConnectionState state = _connectionState; // single read, then compare
+        return state == ConnectionState.CLOSE_SENT || state == ConnectionState.CLOSE_RECEIVED
+               || state == ConnectionState.CLOSED;
+    }
+
     public boolean closedForInput()
     {
         return _closedForInput;

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Tue Mar 21 15:30:44 2017
@@ -279,7 +279,7 @@ class ConsumerTarget_1_0 extends Abstrac
         getEndpoint().setSource(null);
         getEndpoint().close();
 
-        final LinkRegistry linkReg = getSession().getConnection()
+        final LinkRegistryModel linkReg = getSession().getConnection()
                 .getAddressSpace()
                 .getLinkRegistry(getEndpoint().getSession().getConnection().getRemoteContainerId());
         linkReg.unregisterSendingLink(getEndpoint().getName());

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java (from r1787932, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinition.java Tue Mar 21 15:30:44 2017
@@ -20,21 +20,14 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public interface Link_1_0 extends LinkModel
+public interface LinkDefinition extends LinkModel
 {
-    ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
-
-    void linkClosed();
-
-    void discardEndpoint();
+    String getRemoteContainerId();
 
     String getName();
 
@@ -43,10 +36,4 @@ public interface Link_1_0 extends LinkMo
     BaseSource getSource();
 
     BaseTarget getTarget();
-
-    void setSource(BaseSource source);
-
-    void setTarget(BaseTarget target);
-
-    void setTermini(BaseSource source, BaseTarget target);
 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkDefinitionImpl.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+/** Plain holder for the attributes that define a link; every field is final and set once by the constructor. */
+public class LinkDefinitionImpl implements LinkDefinition
+{
+    private final String _remoteContainerId;
+    private final String _name;
+    private final Role _role;
+    private final BaseSource _source;
+    private final BaseTarget _target;
+
+    /** Stores the arguments as given, without validation or copying. */
+    public LinkDefinitionImpl(final String remoteContainerId,
+                              final String name, final Role role,
+                              final BaseSource source, final BaseTarget target)
+    {
+        _target = target;
+        _source = source;
+        _role = role;
+        _name = name;
+        _remoteContainerId = remoteContainerId;
+    }
+
+    @Override
+    public String getRemoteContainerId()
+    {
+        return _remoteContainerId;
+    }
+
+    @Override
+    public String getName()
+    {
+        return _name;
+    }
+
+    @Override
+    public Role getRole()
+    {
+        return _role;
+    }
+
+    @Override
+    public BaseSource getSource()
+    {
+        return _source;
+    }
+
+    @Override
+    public BaseTarget getTarget()
+    {
+        return _target;
+    }
+}

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkImpl.java Tue Mar 21 15:30:44 2017
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -43,17 +44,27 @@ public class LinkImpl implements Link_1_
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(LinkImpl.class);
 
+    private final String _remoteContainerId;
     private final String _linkName;
-
     private final Role _role;
+    private final LinkRegistry _linkRegistry;
 
     private volatile LinkEndpoint _linkEndpoint;
     private volatile BaseSource _source;
     private volatile BaseTarget _target;
-    LinkImpl(final String linkName, final Role role)
+
+    public LinkImpl(final String remoteContainerId, final String linkName, final Role role, final LinkRegistry linkRegistry)
     {
+        _remoteContainerId = remoteContainerId;
         _linkName = linkName;
         _role = role;
+        _linkRegistry = linkRegistry;
+    }
+
+    public LinkImpl(final LinkDefinition linkDefinition, final LinkRegistry linkRegistry)
+    {
+        this(linkDefinition.getRemoteContainerId(), linkDefinition.getName(), linkDefinition.getRole(), linkRegistry);
+        setTermini(linkDefinition.getSource(), linkDefinition.getTarget());
     }
 
     @Override
@@ -85,12 +96,14 @@ public class LinkImpl implements Link_1_
                 }
 
                 _linkEndpoint.receiveAttach(attach);
+                _linkRegistry.linkChanged(this);
                 return Futures.immediateFuture((LinkEndpoint) _linkEndpoint);
             }
         }
-        catch (Throwable t)
+        catch (Exception e)
         {
-            return rejectLink(session, t);
+            LOGGER.debug("Error attaching link", e);
+            return rejectLink(session, e);
         }
     }
 
@@ -163,6 +176,7 @@ public class LinkImpl implements Link_1_
     public void linkClosed()
     {
         discardEndpoint();
+        _linkRegistry.linkClosed(this);
     }
 
     @Override
@@ -192,7 +206,7 @@ public class LinkImpl implements Link_1_
     @Override
     public void setSource(BaseSource source)
     {
-        _source = source;
+        setTermini(source, _target);
     }
 
     @Override
@@ -204,7 +218,7 @@ public class LinkImpl implements Link_1_
     @Override
     public void setTarget(BaseTarget target)
     {
-        _target = target;
+        setTermini(_source, target);
     }
 
     @Override
@@ -213,4 +227,16 @@ public class LinkImpl implements Link_1_
         _source = source;
         _target = target;
     }
+
+    @Override
+    public TerminusDurability getHighestSupportedTerminusDurability()
+    {
+        return _linkRegistry.getHighestSupportedTerminusDurability();
+    }
+
+    @Override
+    public String getRemoteContainerId()
+    {
+        return _remoteContainerId;
+    }
 }

Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java?rev=1787994&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java (added)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkKey.java Tue Mar 21 15:30:44 2017
@@ -0,0 +1,99 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+
+/**
+ * Immutable lookup key identifying a link by remote container id, link name and role. All three
+ * parts are mandatory and are checked on construction, so a bad key fails where it is created.
+ */
+public class LinkKey
+{
+    private final String _linkName;
+    private final String _remoteContainerId;
+    private final Role _role;
+
+    /** @throws NullPointerException naming the offending argument if any of the three is null */
+    public LinkKey(final String remoteContainerId, final String linkName, final Role role)
+    {
+        _linkName = java.util.Objects.requireNonNull(linkName, "linkName");
+        _remoteContainerId = java.util.Objects.requireNonNull(remoteContainerId, "remoteContainerId");
+        _role = java.util.Objects.requireNonNull(role, "role");
+    }
+
+    /** Builds the key from the container id, name and role reported by the given link. */
+    public LinkKey(final LinkDefinition link)
+    {
+        this(link.getRemoteContainerId(), link.getName(), link.getRole());
+    }
+
+    public String getLinkName()
+    {
+        return _linkName;
+    }
+
+    public String getRemoteContainerId()
+    {
+        return _remoteContainerId;
+    }
+
+    public Role getRole()
+    {
+        return _role;
+    }
+
+    /** Two keys are equal when link name, remote container id and role are all equal. */
+    @Override
+    public boolean equals(final Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        final LinkKey other = (LinkKey) o;
+        return _linkName.equals(other._linkName)
+               && _remoteContainerId.equals(other._remoteContainerId)
+               && _role.equals(other._role);
+    }
+
+    /** Folds the three component hashes together with multiplier 31, in name/container/role order. */
+    @Override
+    public int hashCode()
+    {
+        int result = _linkName.hashCode();
+        result = 31 * result + _remoteContainerId.hashCode();
+        result = 31 * result + _role.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "LinkKey{" +
+               "linkName='" + _linkName + '\'' +
+               ", remoteContainerId='" + _remoteContainerId + '\'' +
+               ", role=" + _role +
+               '}';
+    }
+}

Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java (from r1787932, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java&r1=1787932&r2=1787994&rev=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/virtualhost/LinkRegistry.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java Tue Mar 21 15:30:44 2017
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,12 +17,17 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.virtualhost;
 
-import org.apache.qpid.server.protocol.LinkModel;
+package org.apache.qpid.server.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.virtualhost.LinkRegistryModel;
 
-public interface LinkRegistry
+public interface LinkRegistry extends LinkRegistryModel
 {
-    <T extends LinkModel> T getSendingLink(String remoteContainerId, String linkName);
-    <T extends LinkModel> T getReceivingLink(String remoteContainerId, String linkName);
+    void linkClosed(final Link_1_0 link);
+
+    void linkChanged(final Link_1_0 link);
+
+    TerminusDurability getHighestSupportedTerminusDurability();
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryFactoryImpl.java Tue Mar 21 15:30:44 2017
@@ -21,7 +21,6 @@ package org.apache.qpid.server.protocol.
 
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.plugin.PluggableService;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
 import org.apache.qpid.server.virtualhost.LinkRegistryFactory;
 
 @PluggableService

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistryImpl.java Tue Mar 21 15:30:44 2017
@@ -20,52 +20,151 @@
  */
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStore;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreFactory;
+import org.apache.qpid.server.protocol.v1_0.store.LinkStoreUpdaterImpl;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.virtualhost.LinkRegistry;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 
 public class LinkRegistryImpl implements LinkRegistry
 {
-    private final Map<String, Map<String, Link_1_0>> _sendingLinkRegistry = new HashMap<>();
-    private final Map<String, Map<String, Link_1_0>> _receivingLinkRegistry = new HashMap<>();
+    private static final Logger LOGGER = LoggerFactory.getLogger(LinkRegistryImpl.class);
+    private final ConcurrentMap<LinkKey, Link_1_0> _sendingLinkRegistry = new ConcurrentHashMap<>();
+    private final ConcurrentMap<LinkKey, Link_1_0> _receivingLinkRegistry = new ConcurrentHashMap<>();
 
     private final NamedAddressSpace _addressSpace;
 
+    private final LinkStore _linkStore;
+
     LinkRegistryImpl(final NamedAddressSpace addressSpace)
     {
         _addressSpace = addressSpace;
+
+        LinkStoreFactory storeFactory = null;
+        Iterable<LinkStoreFactory> linkStoreFactories = new QpidServiceLoader().instancesOf(LinkStoreFactory.class);
+        for (LinkStoreFactory linkStoreFactory : linkStoreFactories)
+        {
+            if (linkStoreFactory.supports(addressSpace)
+                && (storeFactory == null || storeFactory.getPriority() < linkStoreFactory.getPriority()))
+            {
+                storeFactory = linkStoreFactory;
+            }
+        }
+        if (storeFactory == null)
+        {
+            throw new ServerScopedRuntimeException("Cannot find suitable link store");
+        }
+        _linkStore = storeFactory.create(addressSpace);
+
     }
 
+    @Override
     public Link_1_0 getSendingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, _sendingLinkRegistry, Role.SENDER);
     }
 
+    @Override
     public Link_1_0 getReceivingLink(final String remoteContainerId, final String linkName)
     {
         return getLinkFromRegistry(remoteContainerId, linkName, _receivingLinkRegistry, Role.RECEIVER);
     }
 
+    @Override
+    public void linkClosed(final Link_1_0 link)
+    {
+        // Drop the in-memory entry for the link's role first, then ask the store to delete it.
+        getLinkRegistry(link.getRole()).remove(new LinkKey(link));
+        _linkStore.deleteLink(link);
+    }
+
+    @Override
+    public void linkChanged(final Link_1_0 link)
+    {
+        getLinkRegistry(link.getRole()).putIfAbsent(new LinkKey(link), link);
+        if ((link.getRole() == Role.SENDER && link.getSource() instanceof Source
+             && ((Source) link.getSource()).getDurable() != TerminusDurability.NONE)
+            || (link.getRole() == Role.RECEIVER && link.getTarget() instanceof Target
+                && ((Target) link.getTarget()).getDurable() != TerminusDurability.NONE))
+        {
+            _linkStore.saveLink(link);
+        }
+    }
+
+    @Override
+    public TerminusDurability getHighestSupportedTerminusDurability()
+    {
+        TerminusDurability supportedTerminusDurability = _linkStore.getHighestSupportedTerminusDurability();
+        return supportedTerminusDurability == TerminusDurability.UNSETTLED_STATE ? TerminusDurability.CONFIGURATION : supportedTerminusDurability;
+    }
+
+    @Override
+    public void open()
+    {
+        // Wrap every definition returned by the store in a LinkImpl and index it under its role.
+        for (final LinkDefinition definition : _linkStore.openAndLoad(new LinkStoreUpdaterImpl()))
+        {
+            final ConcurrentMap<LinkKey, Link_1_0> registryForRole = getLinkRegistry(definition.getRole());
+            registryForRole.put(new LinkKey(definition), new LinkImpl(definition, this));
+        }
+    }
+
+    @Override
+    public void close()
+    {
+        _linkStore.close();
+    }
+
+    @Override
+    public void delete()
+    {
+        _linkStore.delete();
+    }
+
     private Link_1_0 getLinkFromRegistry(final String remoteContainerId,
                                          final String linkName,
-                                         final Map<String, Map<String, Link_1_0>> linkRegistry,
+                                         final ConcurrentMap<LinkKey, Link_1_0> linkRegistry,
                                          final Role role)
     {
-        Map<String, Link_1_0> containerRegistry = linkRegistry.get(remoteContainerId);
-        if (containerRegistry == null)
-        {
-            containerRegistry = new HashMap<>();
-            linkRegistry.put(remoteContainerId, containerRegistry);
-        }
-        Link_1_0 link = containerRegistry.get(linkName);
+        LinkKey linkKey = new LinkKey(remoteContainerId, linkName, role);
+        Link_1_0 newLink = new LinkImpl(remoteContainerId, linkName, role, this);
+        Link_1_0 link = linkRegistry.putIfAbsent(linkKey, newLink);
         if (link == null)
         {
-            link = new LinkImpl(linkName, role);
-            containerRegistry.put(linkName, link);
+            link = newLink;
         }
         return link;
     }
+
+    /**
+     * Selects the in-memory registry that holds links of the given role.
+     *
+     * @param role role of the link being looked up
+     * @return the sending registry for SENDER, the receiving registry for RECEIVER
+     * @throws ServerScopedRuntimeException if the role is neither SENDER nor RECEIVER
+     */
+    private ConcurrentMap<LinkKey, Link_1_0> getLinkRegistry(final Role role)
+    {
+        if (Role.SENDER == role)
+        {
+            return _sendingLinkRegistry;
+        }
+        if (Role.RECEIVER == role)
+        {
+            return _receivingLinkRegistry;
+        }
+        throw new ServerScopedRuntimeException(String.format("Unsupported link role %s", role));
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1787994&r1=1787993&r2=1787994&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Tue Mar 21 15:30:44 2017
@@ -22,13 +22,12 @@ package org.apache.qpid.server.protocol.
 
 import com.google.common.util.concurrent.ListenableFuture;
 
-import org.apache.qpid.server.protocol.LinkModel;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 
-public interface Link_1_0 extends LinkModel
+public interface Link_1_0 extends LinkDefinition
 {
     ListenableFuture<? extends LinkEndpoint> attach(Session_1_0 session, final Attach attach);
 
@@ -36,17 +35,11 @@ public interface Link_1_0 extends LinkMo
 
     void discardEndpoint();
 
-    String getName();
-
-    Role getRole();
-
-    BaseSource getSource();
-
-    BaseTarget getTarget();
-
     void setSource(BaseSource source);
 
     void setTarget(BaseTarget target);
 
     void setTermini(BaseSource source, BaseTarget target);
+
+    TerminusDurability getHighestSupportedTerminusDurability();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org