You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 10:21:34 UTC

[05/34] activemq-artemis git commit: ARTEMIS-824: Add management routines for connector services

ARTEMIS-824: Add management routines for connector services


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/1b7033a2
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/1b7033a2
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/1b7033a2

Branch: refs/heads/ARTEMIS-780
Commit: 1b7033a20e3e529595ce1ae28b4b389bfd220e43
Parents: 2dfd144
Author: Ulf Lilleengen <lu...@redhat.com>
Authored: Tue Oct 25 15:40:37 2016 +0200
Committer: Ulf Lilleengen <lu...@redhat.com>
Committed: Wed Oct 26 10:11:11 2016 +0200

----------------------------------------------------------------------
 .../core/management/ActiveMQServerControl.java  | 12 +++++
 .../impl/ActiveMQServerControlImpl.java         | 43 +++++++++++++++
 .../artemis/core/server/ServiceRegistry.java    |  8 +++
 .../core/server/impl/ConnectorsService.java     | 57 ++++++++++++--------
 .../core/server/impl/ServiceRegistryImpl.java   | 44 +++++++--------
 .../management/ActiveMQServerControlTest.java   | 16 ++++++
 .../ActiveMQServerControlUsingCoreTest.java     | 17 ++++++
 .../core/config/impl/ConnectorsServiceTest.java | 48 ++++++++++++++++-
 8 files changed, 196 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 01a8d74..075a5ef 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.api.core.management;
 
 import javax.management.MBeanOperationInfo;
+import java.util.Map;
 
 /**
  * An ActiveMQServerControl is used to manage ActiveMQ Artemis servers.
@@ -850,6 +851,17 @@ public interface ActiveMQServerControl {
    @Operation(desc = "Destroy a bridge", impact = MBeanOperationInfo.ACTION)
    void destroyBridge(@Parameter(name = "name", desc = "Name of the bridge") String name) throws Exception;
 
+   @Operation(desc = "Create a connector service", impact = MBeanOperationInfo.ACTION)
+   void createConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name,
+                               @Parameter(name = "factoryClass", desc = "Class name of the connector service factory") String factoryClass,
+                               @Parameter(name = "parameters", desc = "Parameter specific to the connector service") Map<String, Object> parameters) throws Exception;
+
+   @Operation(desc = "Destroy a connector service", impact = MBeanOperationInfo.ACTION)
+   void destroyConnectorService(@Parameter(name = "name", desc = "Name of the connector service") String name) throws Exception;
+
+   @Attribute(desc = "names of the connector services on this server")
+   String[] getConnectorServices();
+
    @Operation(desc = "force the server to stop and notify clients to failover", impact = MBeanOperationInfo.UNKNOWN)
    void forceFailover() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 5918ec4..fb7deee 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -52,6 +52,7 @@ import org.apache.activemq.artemis.api.core.management.DivertControl;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
 import org.apache.activemq.artemis.core.config.DivertConfiguration;
 import org.apache.activemq.artemis.core.messagecounter.MessageCounterManager;
 import org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterManagerImpl;
@@ -68,6 +69,7 @@ import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.ConnectorServiceFactory;
 import org.apache.activemq.artemis.core.server.Consumer;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -1851,6 +1853,47 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
    }
 
    @Override
+   public void createConnectorService(final String name, final String factoryClass, final Map<String, Object> parameters) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      try {
+         final ConnectorServiceConfiguration config = new ConnectorServiceConfiguration().setName(name).setFactoryClassName(factoryClass).setParams(parameters);
+         ConnectorServiceFactory factory = server.getServiceRegistry().getConnectorService(config);
+         server.getConnectorsService().createService(config, factory);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public void destroyConnectorService(final String name) throws Exception {
+      checkStarted();
+
+      clearIO();
+
+      try {
+         server.getConnectorsService().destroyService(name);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
+   public String[] getConnectorServices() {
+      checkStarted();
+
+      clearIO();
+
+      try {
+         return server.getConnectorsService().getConnectors().keySet().toArray(new String[0]);
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public void forceFailover() throws Exception {
       checkStarted();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
index 4f2ef9d..b0fa658 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServiceRegistry.java
@@ -54,6 +54,14 @@ public interface ServiceRegistry {
     */
    Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> getConnectorServices(List<ConnectorServiceConfiguration> configs);
 
+   /**
+    * Get the connector service factory for a given configuration.
+    *
+    * @param configuration The connector service configuration.
+    * @return an instance of the connector service factory.
+    */
+   ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration);
+
    void addIncomingInterceptor(BaseInterceptor interceptor);
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java
index 1397070..897d27c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ConnectorsService.java
@@ -17,10 +17,13 @@
 package org.apache.activemq.artemis.core.server.impl;
 
 import java.util.Collection;
-import java.util.HashSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.ConnectorServiceConfiguration;
@@ -52,7 +55,7 @@ public final class ConnectorsService implements ActiveMQComponent {
 
    private final Configuration configuration;
 
-   private final Set<ConnectorService> connectors = new HashSet<>();
+   private final Map<String, ConnectorService> connectors = new HashMap<>();
 
    private final ServiceRegistry serviceRegistry;
 
@@ -69,51 +72,61 @@ public final class ConnectorsService implements ActiveMQComponent {
    }
 
    @Override
-   public void start() throws Exception {
+   public synchronized void start() throws Exception {
       Collection<Pair<ConnectorServiceFactory, ConnectorServiceConfiguration>> connectorServiceFactories = serviceRegistry.getConnectorServices(configuration.getConnectorServiceConfigurations());
 
       for (Pair<ConnectorServiceFactory, ConnectorServiceConfiguration> pair : connectorServiceFactories) {
-         createService(pair.getB(), pair.getA());
-      }
-
-      for (ConnectorService connector : connectors) {
          try {
-            connector.start();
+            createService(pair.getB(), pair.getA());
          } catch (Throwable e) {
-            ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, connector.getName());
+            ActiveMQServerLogger.LOGGER.errorStartingConnectorService(e, pair.getB().getConnectorName());
          }
       }
+
       isStarted = true;
    }
 
-   public void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) {
+   public synchronized void createService(ConnectorServiceConfiguration info, ConnectorServiceFactory factory) throws Exception {
+      if (connectors.containsKey(info.getConnectorName())) {
+         throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + info.getConnectorName() + " already created");
+      }
+
       if (info.getParams() != null) {
          Set<String> invalid = ConfigurationHelper.checkKeys(factory.getAllowableProperties(), info.getParams().keySet());
          if (!invalid.isEmpty()) {
-            ActiveMQServerLogger.LOGGER.connectorKeysInvalid(ConfigurationHelper.stringSetToCommaListString(invalid));
-            return;
+            throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Invalid connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
          }
       }
 
       Set<String> invalid = ConfigurationHelper.checkKeysExist(factory.getRequiredProperties(), info.getParams().keySet());
       if (!invalid.isEmpty()) {
-         ActiveMQServerLogger.LOGGER.connectorKeysMissing(ConfigurationHelper.stringSetToCommaListString(invalid));
-         return;
+         throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Missing connector keys for connector service " + info.getConnectorName() + ": " + ConfigurationHelper.stringSetToCommaListString(invalid));
       }
       ConnectorService connectorService = factory.createConnectorService(info.getConnectorName(), info.getParams(), storageManager, postOffice, scheduledPool);
-      connectors.add(connectorService);
+      connectorService.start();
+
+      connectors.put(info.getConnectorName(), connectorService);
+   }
+
+   public synchronized void destroyService(String name) throws Exception {
+      if (!connectors.containsKey(name)) {
+         throw ActiveMQExceptionType.GENERIC_EXCEPTION.createException("Connector service " + name + " does not exist");
+      }
+      ConnectorService connectorService = connectors.get(name);
+      connectorService.stop();
+      connectors.remove(name);
    }
 
    @Override
-   public void stop() throws Exception {
+   public synchronized void stop() throws Exception {
       if (!isStarted) {
          return;
       }
-      for (ConnectorService connector : connectors) {
+      for (Map.Entry<String, ConnectorService> connector : connectors.entrySet()) {
          try {
-            connector.stop();
+            connector.getValue().stop();
          } catch (Throwable e) {
-            ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getName());
+            ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, connector.getKey());
          }
       }
       connectors.clear();
@@ -121,11 +134,11 @@ public final class ConnectorsService implements ActiveMQComponent {
    }
 
    @Override
-   public boolean isStarted() {
+   public synchronized boolean isStarted() {
       return isStarted;
    }
 
-   public Set<ConnectorService> getConnectors() {
-      return connectors;
+   public synchronized Map<String, ConnectorService> getConnectors() {
+      return Collections.unmodifiableMap(connectors);
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
index d2d66a4..4add7b5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServiceRegistryImpl.java
@@ -103,12 +103,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
       if (configs != null) {
          for (final ConnectorServiceConfiguration config : configs) {
             if (connectorServices.get(config.getConnectorName()) == null) {
-               ConnectorServiceFactory factory = AccessController.doPrivileged(new PrivilegedAction<ConnectorServiceFactory>() {
-                  @Override
-                  public ConnectorServiceFactory run() {
-                     return (ConnectorServiceFactory) ClassloadingUtil.newInstanceFromClassLoader(config.getFactoryClassName());
-                  }
-               });
+               ConnectorServiceFactory factory = loadClass(config.getFactoryClassName());
                addConnectorService(factory, config);
             }
          }
@@ -118,6 +113,11 @@ public class ServiceRegistryImpl implements ServiceRegistry {
    }
 
    @Override
+   public ConnectorServiceFactory getConnectorService(ConnectorServiceConfiguration configuration) {
+      return loadClass(configuration.getFactoryClassName());
+   }
+
+   @Override
    public void addIncomingInterceptor(BaseInterceptor interceptor) {
       incomingInterceptors.add(interceptor);
    }
@@ -184,13 +184,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
       AcceptorFactory factory = acceptorFactories.get(name);
 
       if (factory == null && className != null) {
-         factory = AccessController.doPrivileged(new PrivilegedAction<AcceptorFactory>() {
-            @Override
-            public AcceptorFactory run() {
-               return (AcceptorFactory) ClassloadingUtil.newInstanceFromClassLoader(className);
-            }
-         });
-
+         factory = loadClass(className);
          addAcceptorFactory(name, factory);
       }
 
@@ -202,17 +196,21 @@ public class ServiceRegistryImpl implements ServiceRegistry {
       acceptorFactories.put(name, acceptorFactory);
    }
 
+   public <T> T loadClass(final String className) {
+      return AccessController.doPrivileged(new PrivilegedAction<T>() {
+         @Override
+         public T run() {
+            return (T) ClassloadingUtil.newInstanceFromClassLoader(className);
+         }
+      });
+   }
+
    private Transformer instantiateTransformer(final String className) {
       Transformer transformer = null;
 
       if (className != null) {
          try {
-            transformer = AccessController.doPrivileged(new PrivilegedAction<Transformer>() {
-               @Override
-               public Transformer run() {
-                  return (Transformer) ClassloadingUtil.newInstanceFromClassLoader(className);
-               }
-            });
+            transformer = loadClass(className);
          } catch (Exception e) {
             throw ActiveMQMessageBundle.BUNDLE.errorCreatingTransformerClass(e, className);
          }
@@ -223,13 +221,7 @@ public class ServiceRegistryImpl implements ServiceRegistry {
    private void instantiateInterceptors(List<String> classNames, List<BaseInterceptor> interceptors) {
       if (classNames != null) {
          for (final String className : classNames) {
-            BaseInterceptor interceptor = AccessController.doPrivileged(new PrivilegedAction<BaseInterceptor>() {
-               @Override
-               public BaseInterceptor run() {
-                  return (BaseInterceptor) ClassloadingUtil.newInstanceFromClassLoader(className);
-               }
-            });
-
+            BaseInterceptor interceptor = loadClass(className);
             interceptors.add(interceptor);
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index d040b8a..7dd2d0b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -58,6 +58,7 @@ import org.apache.activemq.artemis.jlibaio.LibaioContext;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.unit.core.config.impl.fakes.FakeConnectorServiceFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.junit.Assert;
@@ -1327,6 +1328,21 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
       Assert.assertEquals(1, second.getJsonNumber("consumerCount").longValue());
    }
 
+   @Test
+   public void testConnectorServiceManagement() throws Exception {
+      ActiveMQServerControl managementControl = createManagementControl();
+      managementControl.createConnectorService("myconn", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
+
+      Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
+
+      managementControl.createConnectorService("myconn2", FakeConnectorServiceFactory.class.getCanonicalName(), new HashMap<String, Object>());
+      Assert.assertEquals(2, server.getConnectorsService().getConnectors().size());
+
+      managementControl.destroyConnectorService("myconn");
+      Assert.assertEquals(1, server.getConnectorsService().getConnectors().size());
+      Assert.assertEquals("myconn2", managementControl.getConnectorServices()[0]);
+   }
+
    protected void scaleDown(ScaleDownHandler handler) throws Exception {
       SimpleString address = new SimpleString("testQueue");
       HashMap<String, Object> params = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 777ddd2..60187f0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -20,6 +20,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.Parameter;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 
+import java.util.Map;
+
 public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTest {
 
    // Constants -----------------------------------------------------
@@ -628,6 +630,21 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public void createConnectorService(String name, String factoryClass, Map<String, Object> parameters) throws Exception {
+            proxy.invokeOperation("createConnectorService", name, factoryClass, parameters);
+         }
+
+         @Override
+         public void destroyConnectorService(String name) throws Exception {
+            proxy.invokeOperation("destroyConnectorService", name);
+         }
+
+         @Override
+         public String[] getConnectorServices() {
+            return ActiveMQServerControlUsingCoreTest.toStringArray((Object[]) proxy.retrieveAttributeValue("connectorServices"));
+         }
+
+         @Override
          public void forceFailover() throws Exception {
             proxy.invokeOperation("forceFailover");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1b7033a2/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java
index aee7a71..00f0663 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/config/impl/ConnectorsServiceTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.unit.core.config.impl;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
@@ -65,7 +66,7 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
       connectorsService.start();
 
       assertTrue(connectorsService.getConnectors().size() == 1);
-      assertTrue(connectorsService.getConnectors().contains(connectorServiceFactory.getConnectorService()));
+      assertTrue(connectorsService.getConnectors().values().contains(connectorServiceFactory.getConnectorService()));
    }
 
    /**
@@ -86,4 +87,49 @@ public class ConnectorsServiceTest extends ActiveMQTestBase {
 
       assertTrue(connectorsService.getConnectors().size() == 1);
    }
+
+   /**
+    * Test that connectors can be created and destroyed directly.
+    *
+    * @throws Exception
+    */
+   @Test
+   public void testConnectorServiceUsedDirectly() throws Exception {
+      // Initial setup with existing connector service
+      ConnectorServiceConfiguration connectorServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact");
+      configuration.setConnectorServiceConfigurations(Arrays.asList(connectorServiceConfiguration));
+
+      ConnectorsService connectorsService = new ConnectorsService(configuration, null, null, null, serviceRegistry);
+      connectorsService.start();
+      assertEquals(1, connectorsService.getConnectors().size());
+
+
+      // Add with same name
+      FakeConnectorServiceFactory connectorServiceFactory = new FakeConnectorServiceFactory();
+      try {
+         connectorsService.createService(connectorServiceConfiguration, connectorServiceFactory);
+         assertTrue("Expected exception when creating service with same name", false);
+      } catch (Exception e) {
+      }
+
+
+      // Add unique with same factory
+      ConnectorServiceConfiguration additionalServiceConfiguration = new ConnectorServiceConfiguration().setFactoryClassName(FakeConnectorServiceFactory.class.getCanonicalName()).setParams(new HashMap<String, Object>()).setName("myfact2");
+      connectorsService.createService(additionalServiceConfiguration, connectorServiceFactory);
+      assertEquals(2, connectorsService.getConnectors().size());
+
+      // Destroy existing connector services
+      connectorsService.destroyService("myfact");
+      assertEquals(1, connectorsService.getConnectors().size());
+
+      connectorsService.destroyService("myfact2");
+      assertEquals(0, connectorsService.getConnectors().size());
+
+      // Destroy non-existing connector service
+      try {
+         connectorsService.destroyService("myfact");
+         assertTrue("Expected exception when destroying non-existing service", false);
+      } catch (Exception e) {
+      }
+   }
 }