You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by da...@apache.org on 2018/02/16 09:52:38 UTC

[22/30] atlas git commit: ATLAS-2246: OMRS Connector API plus REST and IGC Connector skeleton - 15th February 2018

http://git-wip-us.apache.org/repos/asf/atlas/blob/8a57e657/omrs/src/main/java/org/apache/atlas/omrs/enterprise/repositoryconnector/EnterpriseOMRSRepositoryConnector.java
----------------------------------------------------------------------
diff --git a/omrs/src/main/java/org/apache/atlas/omrs/enterprise/repositoryconnector/EnterpriseOMRSRepositoryConnector.java b/omrs/src/main/java/org/apache/atlas/omrs/enterprise/repositoryconnector/EnterpriseOMRSRepositoryConnector.java
new file mode 100644
index 0000000..f74a772
--- /dev/null
+++ b/omrs/src/main/java/org/apache/atlas/omrs/enterprise/repositoryconnector/EnterpriseOMRSRepositoryConnector.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.omrs.enterprise.repositoryconnector;
+
+import org.apache.atlas.ocf.ffdc.ConnectorCheckedException;
+import org.apache.atlas.omrs.enterprise.connectormanager.OMRSConnectorConsumer;
+import org.apache.atlas.omrs.enterprise.connectormanager.OMRSConnectorManager;
+import org.apache.atlas.omrs.ffdc.exception.RepositoryErrorException;
+import org.apache.atlas.omrs.metadatacollection.OMRSMetadataCollection;
+import org.apache.atlas.omrs.metadatacollection.repositoryconnector.OMRSRepositoryConnector;
+import org.apache.atlas.omrs.ffdc.OMRSErrorCode;
+import org.apache.atlas.omrs.ffdc.exception.OMRSRuntimeException;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ * EnterpriseOMRSRepositoryConnector supports federating calls to multiple metadata repositories.  As a result,
+ * its OMRSMetadataCollection (EnterpriseOMRSMetadataCollection) returns metadata from all repositories in the
+ * connected open metadata repository cohort(s).
+ * <p>
+ *     An instance of the EnterpriseOMRSRepositoryConnector is created by each Open Metadata Access Service (OMAS)
+ *     using the OCF ConnectorBroker.  They use its metadata collection to retrieve and send the metadata they need.
+ * </p>
+ * <p>
+ *     Each EnterpriseOMRSRepositoryConnector instance needs to maintain an up to date list of OMRS Connectors to all of the
+ *     repositories in the connected open metadata repository cohort(s).  It does by registering as an OMRSConnectorConsumer
+ *     with the OMRSConnectorManager to be notified when connectors to new open metadata repositories are available.
+ * </p>
+ */
+public class EnterpriseOMRSRepositoryConnector extends OMRSRepositoryConnector implements OMRSConnectorConsumer
+{
+    private OMRSConnectorManager             connectorManager                 = null;
+    private String                           connectorConsumerId              = null;
+
+    private EnterpriseOMRSMetadataCollection enterpriseMetadataCollection     = null;
+    private String                           enterpriseMetadataCollectionId   = null;
+    private String                           enterpriseMetadataCollectionName = null;
+
+    private boolean                          connected                        = false;
+
+    private FederatedConnector               localCohortConnector             = null;
+    private ArrayList<FederatedConnector>    remoteCohortConnectors           = new ArrayList<>();
+
+
+    /**
+     * Constructor used by the EnterpriseOMRSConnectorProvider.
+     *
+     * @param connectorManager - provides notifications as repositories register and unregister with the
+     *                         cohorts.
+     * @param enterpriseMetadataCollectionName - name of the virtual metadata collection that this
+     *                                         connector is managing.
+     */
+    public EnterpriseOMRSRepositoryConnector(OMRSConnectorManager connectorManager,
+                                             String               enterpriseMetadataCollectionName)
+    {
+        super();
+
+        this.enterpriseMetadataCollectionName = enterpriseMetadataCollectionName;
+
+        this.connectorManager = connectorManager;
+
+        if (connectorManager != null)
+        {
+            this.connectorConsumerId = connectorManager.registerConnectorConsumer(this);
+        }
+        else
+        {
+            OMRSErrorCode errorCode = OMRSErrorCode.INVALID_COHORT_CONFIG;
+            String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage();
+
+            throw new OMRSRuntimeException(errorCode.getHTTPErrorCode(),
+                                           this.getClass().getName(),
+                                           "constructor",
+                                           errorMessage,
+                                           errorCode.getSystemAction(),
+                                           errorCode.getUserAction());
+        }
+    }
+
+
+    /**
+     * Set up the unique Id for this metadata collection.
+     *
+     * @param metadataCollectionId - String unique Id
+     */
+    public void setMetadataCollectionId(String     metadataCollectionId)
+    {
+        this.enterpriseMetadataCollectionId = metadataCollectionId;
+
+        if (metadataCollectionId != null)
+        {
+            enterpriseMetadataCollection = new EnterpriseOMRSMetadataCollection(this,
+                                                                                enterpriseMetadataCollectionId,
+                                                                                enterpriseMetadataCollectionName);
+
+            connected = true;
+        }
+    }
+
+
+    /**
+     * Returns the metadata collection object that provides an OMRS abstraction of the metadata within
+     * a metadata repository.  For the EnterpriseOMRSRepositoryConnector, this is the metadata collection that is
+     * configured to work across the cohort.
+     *
+     * @return OMRSMetadataCollection - metadata information retrieved from the metadata repository.
+     */
+    public OMRSMetadataCollection getMetadataCollection()
+    {
+        String   methodName = "getMetadataCollection()";
+
+        if (enterpriseMetadataCollection == null)
+        {
+            OMRSErrorCode errorCode = OMRSErrorCode.INVALID_COHORT_CONFIG;
+            String errorMessage = errorCode.getErrorMessageId()
+                                + errorCode.getFormattedErrorMessage();
+
+            throw new OMRSRuntimeException(errorCode.getHTTPErrorCode(),
+                                           this.getClass().getName(),
+                                           methodName,
+                                           errorMessage,
+                                           errorCode.getSystemAction(),
+                                           errorCode.getUserAction());
+        }
+
+        return enterpriseMetadataCollection;
+    }
+
+    /**
+     * Free up any resources held since the connector is no longer needed.
+     *
+     * @throws ConnectorCheckedException - there is a problem disconnecting the connector.
+     */
+    public void disconnect() throws ConnectorCheckedException
+    {
+        if ((connectorManager != null) && (connectorConsumerId != null))
+        {
+            connectorManager.unregisterConnectorConsumer(connectorConsumerId);
+        }
+
+        localCohortConnector = null;
+        remoteCohortConnectors = new ArrayList<>();
+        enterpriseMetadataCollection = null;
+        connected = false;
+    }
+
+
+    /**
+     * Returns the list of metadata collections that the EnterpriseOMRSRepositoryConnector is federating queries across.
+     *
+     * This method is used by this connector's metadata collection object on each request it processes.  This
+     * means it always has the most up to date list of metadata collections to work with.
+     *
+     * @return OMRSMetadataCollection ArrayList
+     * @throws RepositoryErrorException - the enterprise services are not available
+     */
+    protected  ArrayList<OMRSMetadataCollection>   getMetadataCollections() throws RepositoryErrorException
+    {
+        String   methodName = "getMetadataCollections()";
+
+        if (connected)
+        {
+            ArrayList<OMRSMetadataCollection> cohortMetadataCollections = new ArrayList<>();
+
+            /*
+             * Make sure the local connector is first.
+             */
+            if (localCohortConnector != null)
+            {
+                cohortMetadataCollections.add(localCohortConnector.getMetadataCollection());
+            }
+
+            /*
+             * Now add the remote connectors.
+             */
+            for (FederatedConnector federatedConnector : remoteCohortConnectors)
+            {
+                cohortMetadataCollections.add(federatedConnector.getMetadataCollection());
+            }
+
+            if (cohortMetadataCollections.size() > 0)
+            {
+                return cohortMetadataCollections;
+            }
+            else
+            {
+                OMRSErrorCode errorCode = OMRSErrorCode.NO_REPOSITORIES;
+                String errorMessage = errorCode.getErrorMessageId()
+                                    + errorCode.getFormattedErrorMessage();
+
+                throw new RepositoryErrorException(errorCode.getHTTPErrorCode(),
+                                                   this.getClass().getName(),
+                                                   methodName,
+                                                   errorMessage,
+                                                   errorCode.getSystemAction(),
+                                                   errorCode.getUserAction());
+            }
+        }
+        else
+        {
+            OMRSErrorCode errorCode = OMRSErrorCode.ENTERPRISE_DISCONNECTED;
+            String errorMessage = errorCode.getErrorMessageId() + errorCode.getFormattedErrorMessage();
+
+            throw new RepositoryErrorException(errorCode.getHTTPErrorCode(),
+                                           this.getClass().getName(),
+                                           methodName,
+                                           errorMessage,
+                                           errorCode.getSystemAction(),
+                                           errorCode.getUserAction());
+        }
+    }
+
+
+    /**
+     * Save the connector to the local repository.  This is passed from the OMRSConnectorManager.
+     *
+     * @param metadataCollectionId - Unique identifier for the metadata collection
+     * @param localConnector - OMRSRepositoryConnector object for the local repository.
+     */
+    public void setLocalConnector(String                  metadataCollectionId,
+                                  OMRSRepositoryConnector localConnector)
+    {
+        if (localConnector != null)
+        {
+            localCohortConnector = new FederatedConnector(metadataCollectionId,
+                                                           localConnector,
+                                                           localConnector.getMetadataCollection());
+        }
+        else
+        {
+            localCohortConnector = null;
+        }
+    }
+
+
+    /**
+     * Pass the connector to one of the remote repositories in the metadata repository cohort.
+     *
+     * @param metadataCollectionId - Unique identifier for the metadata collection
+     * @param remoteConnector - OMRSRepositoryConnector object providing access to the remote repository.
+     */
+    public void addRemoteConnector(String                  metadataCollectionId,
+                                   OMRSRepositoryConnector remoteConnector)
+    {
+        if (remoteConnector != null)
+        {
+            remoteCohortConnectors.add(new FederatedConnector(metadataCollectionId,
+                                                               remoteConnector,
+                                                               remoteConnector.getMetadataCollection()));
+        }
+    }
+
+
+    /**
+     * Pass the metadata collection id for a repository that has just left the metadata repository cohort.
+     *
+     * @param metadataCollectionId - identifier of the metadata collection that is no longer available.
+     */
+    public void removeRemoteConnector(String  metadataCollectionId)
+    {
+        Iterator<FederatedConnector> iterator = remoteCohortConnectors.iterator();
+
+        while(iterator.hasNext())
+        {
+            FederatedConnector registeredConnector = iterator.next();
+
+            if (registeredConnector.getMetadataCollectionId().equals(metadataCollectionId))
+            {
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Call disconnect on all registered connectors and stop calling them.  The OMRS is about to shutdown.
+     */
+    public void disconnectAllConnectors()
+    {
+        // TODO
+    }
+
+
+    /**
+     * FederatedConnector is a private class for storing details of each of the connectors to the repositories
+     * in the open metadata repository cohort.
+     */
+    private class FederatedConnector
+    {
+        private String                  metadataCollectionId = null;
+        private OMRSRepositoryConnector connector            = null;
+        private OMRSMetadataCollection  metadataCollection   = null;
+
+
+        /**
+         * Constructor to set up the details of a federated connector.
+         *
+         * @param metadataCollectionId - unique identifier for the metadata collection accessed through the connector
+         * @param connector - connector for the repository
+         * @param metadataCollection - metadata collection from the connection.
+         */
+        public FederatedConnector(String metadataCollectionId, OMRSRepositoryConnector connector, OMRSMetadataCollection metadataCollection)
+        {
+            this.metadataCollectionId = metadataCollectionId;
+            this.connector = connector;
+            this.metadataCollection = metadataCollection;
+        }
+
+
+        /**
+         * Return the identifier for the metadata collection accessed through the connector.
+         *
+         * @return String identifier
+         */
+        public String getMetadataCollectionId()
+        {
+            return metadataCollectionId;
+        }
+
+
+        /**
+         * Return the connector for the repository.
+         *
+         * @return OMRSRepositoryConnector object
+         */
+        public OMRSRepositoryConnector getConnector()
+        {
+            return connector;
+        }
+
+
+        /**
+         * Return the metadata collection associated with the connector.
+         *
+         * @return OMRSMetadataCollection object
+         */
+        public OMRSMetadataCollection getMetadataCollection()
+        {
+            return metadataCollection;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/atlas/blob/8a57e657/omrs/src/main/java/org/apache/atlas/omrs/eventmanagement/OMRSEventListener.java
----------------------------------------------------------------------
diff --git a/omrs/src/main/java/org/apache/atlas/omrs/eventmanagement/OMRSEventListener.java b/omrs/src/main/java/org/apache/atlas/omrs/eventmanagement/OMRSEventListener.java
new file mode 100644
index 0000000..9db6892
--- /dev/null
+++ b/omrs/src/main/java/org/apache/atlas/omrs/eventmanagement/OMRSEventListener.java
@@ -0,0 +1,832 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.omrs.eventmanagement;
+
+import org.apache.atlas.omrs.auditlog.OMRSAuditCode;
+import org.apache.atlas.omrs.auditlog.OMRSAuditLog;
+import org.apache.atlas.omrs.auditlog.OMRSAuditingComponent;
+import org.apache.atlas.omrs.eventmanagement.events.*;
+import org.apache.atlas.omrs.eventmanagement.events.v1.OMRSEventV1;
+import org.apache.atlas.omrs.topicconnectors.OMRSTopicListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * The OMRSEventListener manages inbound events from the metadata repository cohort.  There are
+ * two main groups of events: registration events that are handled by the OMRSCohortRegistry and metadata
+ * events that describe changes to TypeDefs and metadata instances.  The metadata events are handled by the
+ * local connector.
+ * </p>
+ * <p>
+ * The role of the OMRSEventListener is to decide which events to process.  This is controlled by the
+ * synchronization rule passed on the constructor.
+ * </p>
+ */
+public class OMRSEventListener implements OMRSTopicListener
+{
+    private  String                                 cohortName = null;
+    private  String                                 localMetadataCollectionId = null;
+
+    /*
+     * There is an event processor for each category of event.  The OMRSEventListener passes appropriate events to these
+     * objects depending on the settings of its configuration.
+     */
+    private OMRSRegistryEventProcessor registryEventProcessor = null;
+    private OMRSTypeDefEventProcessor  typeDefEventProcessor  = null;
+    private OMRSInstanceEventProcessor instanceEventProcessor = null;
+
+    /*
+     * The audit log is used for recording events, decisions, errors and exceptions
+     */
+    private  OMRSAuditLog                           auditLog = new OMRSAuditLog(OMRSAuditingComponent.EVENT_LISTENER);
+
+
+    private static final Logger log = LoggerFactory.getLogger(OMRSEventListener.class);
+
+
+    /**
+     * Default Constructor - saves configuration parameters.
+     *
+     * @param cohortName - name of the cohort that this event listener belongs to
+     * @param localMetadataCollectionId - unique identifier for the local metadata collection
+     * @param registryEventProcessor - processor for registry events
+     * @param typeDefEventProcessor - processor for TypeDef synchronization events
+     * @param instanceEventProcessor - processor for metadata instance replication
+     */
+    public OMRSEventListener(String                                cohortName,
+                             String                                localMetadataCollectionId,
+                             OMRSRegistryEventProcessor            registryEventProcessor,
+                             OMRSTypeDefEventProcessor             typeDefEventProcessor,
+                             OMRSInstanceEventProcessor            instanceEventProcessor)
+    {
+        this.cohortName = cohortName;
+        this.localMetadataCollectionId = localMetadataCollectionId;
+        this.registryEventProcessor = registryEventProcessor;
+        this.typeDefEventProcessor  = typeDefEventProcessor;
+        this.instanceEventProcessor = instanceEventProcessor;
+    }
+
+
+    /**
+     * Process an incoming event.  This method is called by the OMRSTopicConnector.  The processing is careful of nulls
+     * and ignores an event that is incorrectly formatted.  The assumption is that the unformatted part of the message
+     * is an extension from a newer versionName of the protocol and can be ignored.
+     *
+     * @param event Version 1 of the OMRSEvent that defines the category and payload of the incoming event.
+     */
+    public void processEvent(OMRSEventV1 event)
+    {
+        String   actionDescription = "Process Incoming Event";
+
+        /*
+         * The event should not be null but worth checking.
+         */
+        if (event != null)
+        {
+            /*
+             *  If the event came from this server - then ignore it.
+             */
+            if ((localMetadataCollectionId != null) &&
+                    (localMetadataCollectionId.equals(event.getOriginator().getMetadataCollectionId())))
+            {
+                if (log.isDebugEnabled())
+                {
+                    log.debug("Ignoring event that this server originated");
+                }
+            }
+            else
+            {
+                /*
+                 * Determine the category of event to process.
+                 */
+                switch (event.getEventCategory())
+                {
+                    case REGISTRY:
+                        this.processRegistryEvent(new OMRSRegistryEvent(event));
+                        break;
+
+                    case TYPEDEF:
+                        this.processTypeDefEvent(new OMRSTypeDefEvent(event));
+                        break;
+
+                    case INSTANCE:
+                        this.processInstanceEvent(new OMRSInstanceEvent(event));
+                        break;
+
+                    default:
+                        /*
+                         * Nothing to do since this server does not understand the message type.  This situation
+                         * will occur if the local server is back level from another server in the cohort
+                         * and the more advanced server supports new types of messages,
+                         */
+                        OMRSAuditCode auditCode = OMRSAuditCode.PROCESS_UNKNOWN_EVENT;
+
+                        auditLog.logRecord(actionDescription,
+                                           auditCode.getLogMessageId(),
+                                           auditCode.getSeverity(),
+                                           auditCode.getFormattedLogMessage(),
+                                           "event {" + event.toString() + "}",
+                                           auditCode.getSystemAction(),
+                                           auditCode.getUserAction());
+
+                        if (log.isDebugEnabled())
+                        {
+                            log.debug("Unknown event received :|");
+                        }
+
+                }
+            }
+        }
+        else
+        {
+            /*
+             * A null event was passed - probably should not happen so log audit record.
+             */
+            OMRSAuditCode auditCode = OMRSAuditCode.NULL_OMRS_EVENT_RECEIVED;
+
+            auditLog.logRecord(actionDescription,
+                               auditCode.getLogMessageId(),
+                               auditCode.getSeverity(),
+                               auditCode.getFormattedLogMessage(),
+                               null,
+                               auditCode.getSystemAction(),
+                               auditCode.getUserAction());
+
+            if (log.isDebugEnabled())
+            {
+                log.debug("Null OMRS Event received :(");
+            }
+        }
+    }
+
+
+    /**
+     * The event contains a registry event.  It needs to be further unpacked and passed to the appropriate
+     * registry event processor (OMRSCohortRegistry).
+     *
+     * @param registryEvent event to unpack
+     */
+    private void processRegistryEvent(OMRSRegistryEvent   registryEvent)
+    {
+        if (log.isDebugEnabled())
+        {
+            log.debug("Processing registry event", registryEvent);
+        }
+
+        if (registryEvent == null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Null registry event - ignoring event");
+            }
+        }
+        else if (registryEventProcessor == null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("No registry event processor - ignoring event ");
+            }
+        }
+        else /* process registry event */
+        {
+            OMRSRegistryEventType registryEventType       = registryEvent.getRegistryEventType();
+            OMRSEventOriginator   registryEventOriginator = registryEvent.getEventOriginator();
+
+            if ((registryEventType != null) && (registryEventOriginator != null))
+            {
+                switch (registryEventType)
+                {
+                    case REGISTRATION_EVENT:
+                        registryEventProcessor.processRegistrationEvent(cohortName,
+                                                                        registryEventOriginator.getMetadataCollectionId(),
+                                                                        registryEventOriginator.getServerName(),
+                                                                        registryEventOriginator.getServerType(),
+                                                                        registryEventOriginator.getOrganizationName(),
+                                                                        registryEvent.getRegistrationTimestamp(),
+                                                                        registryEvent.getRemoteConnection(),
+                                                                        registryEvent.getTypeDefSummaries());
+                        break;
+
+                    case RE_REGISTRATION_EVENT:
+                        registryEventProcessor.processReRegistrationEvent(cohortName,
+                                                                          registryEventOriginator.getMetadataCollectionId(),
+                                                                          registryEventOriginator.getServerName(),
+                                                                          registryEventOriginator.getServerType(),
+                                                                          registryEventOriginator.getOrganizationName(),
+                                                                          registryEvent.getRegistrationTimestamp(),
+                                                                          registryEvent.getRemoteConnection(),
+                                                                          registryEvent.getTypeDefSummaries());
+                        break;
+
+                    case REFRESH_REGISTRATION_REQUEST:
+                        registryEventProcessor.processRegistrationRefreshRequest(cohortName,
+                                                                                 registryEventOriginator.getServerName(),
+                                                                                 registryEventOriginator.getServerType(),
+                                                                                 registryEventOriginator.getOrganizationName());
+                        break;
+
+                    case UN_REGISTRATION_EVENT:
+                        registryEventProcessor.processUnRegistrationEvent(cohortName,
+                                                                          registryEventOriginator.getMetadataCollectionId(),
+                                                                          registryEventOriginator.getServerName(),
+                                                                          registryEventOriginator.getServerType(),
+                                                                          registryEventOriginator.getOrganizationName());
+                        break;
+
+                    case REGISTRATION_ERROR_EVENT:
+                        OMRSRegistryEventErrorCode errorCode = registryEvent.getErrorCode();
+
+                        if (errorCode != null)
+                        {
+                            switch(errorCode)
+                            {
+                                case BAD_REMOTE_CONNECTION:
+                                    registryEventProcessor.processBadConnectionEvent(cohortName,
+                                                                                     registryEventOriginator.getMetadataCollectionId(),
+                                                                                     registryEventOriginator.getServerName(),
+                                                                                     registryEventOriginator.getServerType(),
+                                                                                     registryEventOriginator.getOrganizationName(),
+                                                                                     registryEvent.getTargetMetadataCollectionId(),
+                                                                                     registryEvent.getTargetRemoteConnection(),
+                                                                                     registryEvent.getErrorMessage());
+                                    break;
+
+                                case CONFLICTING_COLLECTION_ID:
+                                    registryEventProcessor.processConflictingCollectionIdEvent(cohortName,
+                                                                                               registryEventOriginator.getMetadataCollectionId(),
+                                                                                               registryEventOriginator.getServerName(),
+                                                                                               registryEventOriginator.getServerType(),
+                                                                                               registryEventOriginator.getOrganizationName(),
+                                                                                               registryEvent.getTargetMetadataCollectionId(),
+                                                                                               registryEvent.getErrorMessage());
+                                    break;
+
+                                default:
+                                    if (log.isDebugEnabled())
+                                    {
+                                        log.debug("Unknown registry event error code - ignoring event");
+                                    }
+                                    break;
+                            }
+                        }
+                        else
+                        {
+                            if (log.isDebugEnabled())
+                            {
+                                log.debug("Null registry event error code - ignoring event");
+                            }
+                        }
+                        break;
+
+                    default:
+                        /*
+                         * New type of registry event that this server does not understand - ignore it
+                         */
+                        if (log.isDebugEnabled())
+                        {
+                            log.debug("Unknown registry event: " + registryEvent.toString());
+                        }
+                        break;
+                }
+            }
+            else
+            {
+                if (log.isDebugEnabled())
+                {
+                    log.debug("Ignored registry event: " + registryEvent.toString());
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Unpack and deliver a TypeDef event to the TypeDefEventProcessor
+     *
+     * @param typeDefEvent - event to unpack
+     */
+    private void processTypeDefEvent(OMRSTypeDefEvent   typeDefEvent)
+    {
+        if (typeDefEvent == null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Null TypeDef event - ignoring event");
+            }
+        }
+        else if (typeDefEventProcessor == null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("No TypeDef event processor - ignoring event");
+            }
+        }
+        else
+        {
+            OMRSTypeDefEventType  typeDefEventType       = typeDefEvent.getTypeDefEventType();
+            OMRSEventOriginator   typeDefEventOriginator = typeDefEvent.getEventOriginator();
+
+            if ((typeDefEventType != null) && (typeDefEventOriginator != null))
+            {
+                switch (typeDefEventType)
+                {
+                    case NEW_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processNewTypeDefEvent(cohortName,
+                                                                     typeDefEventOriginator.getMetadataCollectionId(),
+                                                                     typeDefEventOriginator.getServerName(),
+                                                                     typeDefEventOriginator.getServerType(),
+                                                                     typeDefEventOriginator.getOrganizationName(),
+                                                                     typeDefEvent.getTypeDef());
+                        break;
+
+                    case NEW_ATTRIBUTE_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processNewAttributeTypeDefEvent(cohortName,
+                                                                              typeDefEventOriginator.getMetadataCollectionId(),
+                                                                              typeDefEventOriginator.getServerName(),
+                                                                              typeDefEventOriginator.getServerType(),
+                                                                              typeDefEventOriginator.getOrganizationName(),
+                                                                              typeDefEvent.getAttributeTypeDef());
+                        break;
+
+                    case UPDATED_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processUpdatedTypeDefEvent(cohortName,
+                                                                         typeDefEventOriginator.getMetadataCollectionId(),
+                                                                         typeDefEventOriginator.getServerName(),
+                                                                         typeDefEventOriginator.getServerType(),
+                                                                         typeDefEventOriginator.getOrganizationName(),
+                                                                         typeDefEvent.getTypeDefPatch());
+                        break;
+
+                    case DELETED_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processDeletedTypeDefEvent(cohortName,
+                                                                         typeDefEventOriginator.getMetadataCollectionId(),
+                                                                         typeDefEventOriginator.getServerName(),
+                                                                         typeDefEventOriginator.getServerType(),
+                                                                         typeDefEventOriginator.getOrganizationName(),
+                                                                         typeDefEvent.getTypeDefGUID(),
+                                                                         typeDefEvent.getTypeDefName());
+                        break;
+
+                    case DELETED_ATTRIBUTE_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processDeletedAttributeTypeDefEvent(cohortName,
+                                                                                  typeDefEventOriginator.getMetadataCollectionId(),
+                                                                                  typeDefEventOriginator.getServerName(),
+                                                                                  typeDefEventOriginator.getServerType(),
+                                                                                  typeDefEventOriginator.getOrganizationName(),
+                                                                                  typeDefEvent.getTypeDefGUID(),
+                                                                                  typeDefEvent.getTypeDefName());
+                        break;
+
+                    case RE_IDENTIFIED_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processReIdentifiedTypeDefEvent(cohortName,
+                                                                              typeDefEventOriginator.getMetadataCollectionId(),
+                                                                              typeDefEventOriginator.getServerName(),
+                                                                              typeDefEventOriginator.getServerType(),
+                                                                              typeDefEventOriginator.getOrganizationName(),
+                                                                              typeDefEvent.getOriginalTypeDefSummary(),
+                                                                              typeDefEvent.getTypeDef());
+                        break;
+
+                    case RE_IDENTIFIED_ATTRIBUTE_TYPEDEF_EVENT:
+                        typeDefEventProcessor.processReIdentifiedAttributeTypeDefEvent(cohortName,
+                                                                                       typeDefEventOriginator.getMetadataCollectionId(),
+                                                                                       typeDefEventOriginator.getServerName(),
+                                                                                       typeDefEventOriginator.getServerType(),
+                                                                                       typeDefEventOriginator.getOrganizationName(),
+                                                                                       typeDefEvent.getOriginalAttributeTypeDef(),
+                                                                                       typeDefEvent.getAttributeTypeDef());
+
+                    case TYPEDEF_ERROR_EVENT:
+                        OMRSTypeDefEventErrorCode errorCode = typeDefEvent.getErrorCode();
+
+                        if (errorCode != null)
+                        {
+                            switch(errorCode)
+                            {
+                                case CONFLICTING_TYPEDEFS:
+                                    typeDefEventProcessor.processTypeDefConflictEvent(cohortName,
+                                                                                      typeDefEventOriginator.getMetadataCollectionId(),
+                                                                                      typeDefEventOriginator.getServerName(),
+                                                                                      typeDefEventOriginator.getServerType(),
+                                                                                      typeDefEventOriginator.getOrganizationName(),
+                                                                                      typeDefEvent.getOriginalTypeDefSummary(),
+                                                                                      typeDefEvent.getOtherMetadataCollectionId(),
+                                                                                      typeDefEvent.getOtherTypeDefSummary(),
+                                                                                      typeDefEvent.getErrorMessage());
+                                    break;
+
+                                case CONFLICTING_ATTRIBUTE_TYPEDEFS:
+                                    typeDefEventProcessor.processAttributeTypeDefConflictEvent(cohortName,
+                                                                                               typeDefEventOriginator.getMetadataCollectionId(),
+                                                                                               typeDefEventOriginator.getServerName(),
+                                                                                               typeDefEventOriginator.getServerType(),
+                                                                                               typeDefEventOriginator.getOrganizationName(),
+                                                                                               typeDefEvent.getOriginalAttributeTypeDef(),
+                                                                                               typeDefEvent.getOtherMetadataCollectionId(),
+                                                                                               typeDefEvent.getOtherAttributeTypeDef(),
+                                                                                               typeDefEvent.getErrorMessage());
+
+                                case TYPEDEF_PATCH_MISMATCH:
+                                    typeDefEventProcessor.processTypeDefPatchMismatchEvent(cohortName,
+                                                                                           typeDefEventOriginator.getMetadataCollectionId(),
+                                                                                           typeDefEventOriginator.getServerName(),
+                                                                                           typeDefEventOriginator.getServerType(),
+                                                                                           typeDefEventOriginator.getOrganizationName(),
+                                                                                           typeDefEvent.getTargetMetadataCollectionId(),
+                                                                                           typeDefEvent.getTargetTypeDefSummary(),
+                                                                                           typeDefEvent.getOtherTypeDef(),
+                                                                                           typeDefEvent.getErrorMessage());
+                                    break;
+
+                                default:
+                                    if (log.isDebugEnabled())
+                                    {
+                                        log.debug("Unknown TypeDef event error code - ignoring event");
+                                    }
+                                    break;
+                            }
+                        }
+                        else
+                        {
+                            if (log.isDebugEnabled())
+                            {
+                                log.debug("Ignored TypeDef event - null error code");
+                            }
+                        }
+                        break;
+
+                    default:
+                        if (log.isDebugEnabled())
+                        {
+                            log.debug("Ignored TypeDef event - unknown type");
+                        }
+                        break;
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Unpack and deliver an instance event to the InstanceEventProcessor
+     *
+     * @param instanceEvent - event to unpack
+     */
+    private void processInstanceEvent(OMRSInstanceEvent  instanceEvent)
+    {
+        if (log.isDebugEnabled())
+        {
+            log.debug("Processing instance event", instanceEvent);
+        }
+
+        if (instanceEvent == null)
+        {
+            if (log.isDebugEnabled())
+            {
+                log.debug("Null instance event - ignoring event");
+            }
+        }
+        else
+        {
+            OMRSInstanceEventType instanceEventType       = instanceEvent.getInstanceEventType();
+            OMRSEventOriginator   instanceEventOriginator = instanceEvent.getEventOriginator();
+
+            if ((instanceEventType != null) && (instanceEventOriginator != null))
+            {
+                switch (instanceEventType)
+                {
+                    case NEW_ENTITY_EVENT:
+                        instanceEventProcessor.processNewEntityEvent(cohortName,
+                                                                     instanceEventOriginator.getMetadataCollectionId(),
+                                                                     instanceEventOriginator.getServerName(),
+                                                                     instanceEventOriginator.getServerType(),
+                                                                     instanceEventOriginator.getOrganizationName(),
+                                                                     instanceEvent.getEntity());
+                        break;
+
+                    case UPDATED_ENTITY_EVENT:
+                        instanceEventProcessor.processUpdatedEntityEvent(cohortName,
+                                                                         instanceEventOriginator.getMetadataCollectionId(),
+                                                                         instanceEventOriginator.getServerName(),
+                                                                         instanceEventOriginator.getServerType(),
+                                                                         instanceEventOriginator.getOrganizationName(),
+                                                                         instanceEvent.getEntity());
+                        break;
+
+                    case CLASSIFIED_ENTITY_EVENT:
+                        instanceEventProcessor.processClassifiedEntityEvent(cohortName,
+                                                                            instanceEventOriginator.getMetadataCollectionId(),
+                                                                            instanceEventOriginator.getServerName(),
+                                                                            instanceEventOriginator.getServerType(),
+                                                                            instanceEventOriginator.getOrganizationName(),
+                                                                            instanceEvent.getEntity());
+                        break;
+
+                    case RECLASSIFIED_ENTITY_EVENT:
+                        instanceEventProcessor.processReclassifiedEntityEvent(cohortName,
+                                                                              instanceEventOriginator.getMetadataCollectionId(),
+                                                                              instanceEventOriginator.getServerName(),
+                                                                              instanceEventOriginator.getServerType(),
+                                                                              instanceEventOriginator.getOrganizationName(),
+                                                                              instanceEvent.getEntity());
+                        break;
+
+                    case DECLASSIFIED_ENTITY_EVENT:
+                        instanceEventProcessor.processDeclassifiedEntityEvent(cohortName,
+                                                                              instanceEventOriginator.getMetadataCollectionId(),
+                                                                              instanceEventOriginator.getServerName(),
+                                                                              instanceEventOriginator.getServerType(),
+                                                                              instanceEventOriginator.getOrganizationName(),
+                                                                              instanceEvent.getEntity());
+                        break;
+
+                    case DELETED_ENTITY_EVENT:
+                        instanceEventProcessor.processDeletedEntityEvent(cohortName,
+                                                                         instanceEventOriginator.getMetadataCollectionId(),
+                                                                         instanceEventOriginator.getServerName(),
+                                                                         instanceEventOriginator.getServerType(),
+                                                                         instanceEventOriginator.getOrganizationName(),
+                                                                         instanceEvent.getTypeDefGUID(),
+                                                                         instanceEvent.getTypeDefName(),
+                                                                         instanceEvent.getInstanceGUID());
+                        break;
+
+                    case PURGED_ENTITY_EVENT:
+                        instanceEventProcessor.processPurgedEntityEvent(cohortName,
+                                                                        instanceEventOriginator.getMetadataCollectionId(),
+                                                                        instanceEventOriginator.getServerName(),
+                                                                        instanceEventOriginator.getServerType(),
+                                                                        instanceEventOriginator.getOrganizationName(),
+                                                                        instanceEvent.getTypeDefGUID(),
+                                                                        instanceEvent.getTypeDefName(),
+                                                                        instanceEvent.getInstanceGUID());
+                        break;
+
+                    case UNDONE_ENTITY_EVENT:
+                        instanceEventProcessor.processUndoneEntityEvent(cohortName,
+                                                                        instanceEventOriginator.getMetadataCollectionId(),
+                                                                        instanceEventOriginator.getServerName(),
+                                                                        instanceEventOriginator.getServerType(),
+                                                                        instanceEventOriginator.getOrganizationName(),
+                                                                        instanceEvent.getEntity());
+                        break;
+
+                    case RESTORED_ENTITY_EVENT:
+                        instanceEventProcessor.processRestoredEntityEvent(cohortName,
+                                                                          instanceEventOriginator.getMetadataCollectionId(),
+                                                                          instanceEventOriginator.getServerName(),
+                                                                          instanceEventOriginator.getServerType(),
+                                                                          instanceEventOriginator.getOrganizationName(),
+                                                                          instanceEvent.getEntity());
+                        break;
+
+                    case REFRESH_ENTITY_REQUEST:
+                        instanceEventProcessor.processRefreshEntityRequested(cohortName,
+                                                                             instanceEventOriginator.getMetadataCollectionId(),
+                                                                             instanceEventOriginator.getServerName(),
+                                                                             instanceEventOriginator.getServerType(),
+                                                                             instanceEventOriginator.getOrganizationName(),
+                                                                             instanceEvent.getTypeDefGUID(),
+                                                                             instanceEvent.getTypeDefName(),
+                                                                             instanceEvent.getInstanceGUID(),
+                                                                             instanceEvent.getHomeMetadataCollectionId());
+                        break;
+
+                    case REFRESHED_ENTITY_EVENT:
+                        instanceEventProcessor.processRefreshEntityEvent(cohortName,
+                                                                         instanceEventOriginator.getMetadataCollectionId(),
+                                                                         instanceEventOriginator.getServerName(),
+                                                                         instanceEventOriginator.getServerType(),
+                                                                         instanceEventOriginator.getOrganizationName(),
+                                                                         instanceEvent.getEntity());
+                        break;
+
+                    case RE_HOMED_ENTITY_EVENT:
+                        instanceEventProcessor.processReHomedEntityEvent(cohortName,
+                                                                         instanceEventOriginator.getMetadataCollectionId(),
+                                                                         instanceEventOriginator.getServerName(),
+                                                                         instanceEventOriginator.getServerType(),
+                                                                         instanceEventOriginator.getOrganizationName(),
+                                                                         instanceEvent.getOriginalHomeMetadataCollectionId(),
+                                                                         instanceEvent.getEntity());
+                        break;
+
+                    case RETYPED_ENTITY_EVENT:
+                        instanceEventProcessor.processReTypedEntityEvent(cohortName,
+                                                                         instanceEventOriginator.getMetadataCollectionId(),
+                                                                         instanceEventOriginator.getServerName(),
+                                                                         instanceEventOriginator.getServerType(),
+                                                                         instanceEventOriginator.getOrganizationName(),
+                                                                         instanceEvent.getOriginalTypeDefSummary(),
+                                                                         instanceEvent.getEntity());
+                        break;
+
+                    case RE_IDENTIFIED_ENTITY_EVENT:
+                        instanceEventProcessor.processReIdentifiedEntityEvent(cohortName,
+                                                                              instanceEventOriginator.getMetadataCollectionId(),
+                                                                              instanceEventOriginator.getServerName(),
+                                                                              instanceEventOriginator.getServerType(),
+                                                                              instanceEventOriginator.getOrganizationName(),
+                                                                              instanceEvent.getOriginalInstanceGUID(),
+                                                                              instanceEvent.getEntity());
+                        break;
+
+                    case NEW_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processNewRelationshipEvent(cohortName,
+                                                                           instanceEventOriginator.getMetadataCollectionId(),
+                                                                           instanceEventOriginator.getServerName(),
+                                                                           instanceEventOriginator.getServerType(),
+                                                                           instanceEventOriginator.getOrganizationName(),
+                                                                           instanceEvent.getRelationship());
+                        break;
+
+                    case UPDATED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processUpdatedRelationshipEvent(cohortName,
+                                                                               instanceEventOriginator.getMetadataCollectionId(),
+                                                                               instanceEventOriginator.getServerName(),
+                                                                               instanceEventOriginator.getServerType(),
+                                                                               instanceEventOriginator.getOrganizationName(),
+                                                                               instanceEvent.getRelationship());
+                        break;
+
+                    case UNDONE_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processUndoneRelationshipEvent(cohortName,
+                                                                              instanceEventOriginator.getMetadataCollectionId(),
+                                                                              instanceEventOriginator.getServerName(),
+                                                                              instanceEventOriginator.getServerType(),
+                                                                              instanceEventOriginator.getOrganizationName(),
+                                                                              instanceEvent.getRelationship());
+                        break;
+
+                    case DELETED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processDeletedRelationshipEvent(cohortName,
+                                                                               instanceEventOriginator.getMetadataCollectionId(),
+                                                                               instanceEventOriginator.getServerName(),
+                                                                               instanceEventOriginator.getServerType(),
+                                                                               instanceEventOriginator.getOrganizationName(),
+                                                                               instanceEvent.getTypeDefGUID(),
+                                                                               instanceEvent.getTypeDefName(),
+                                                                               instanceEvent.getInstanceGUID());
+                        break;
+
+                    case PURGED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processPurgedEntityEvent(cohortName,
+                                                                        instanceEventOriginator.getMetadataCollectionId(),
+                                                                        instanceEventOriginator.getServerName(),
+                                                                        instanceEventOriginator.getServerType(),
+                                                                        instanceEventOriginator.getOrganizationName(),
+                                                                        instanceEvent.getTypeDefGUID(),
+                                                                        instanceEvent.getTypeDefName(),
+                                                                        instanceEvent.getInstanceGUID());
+                        break;
+
+                    case RESTORED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processRestoredRelationshipEvent(cohortName,
+                                                                                instanceEventOriginator.getMetadataCollectionId(),
+                                                                                instanceEventOriginator.getServerName(),
+                                                                                instanceEventOriginator.getServerType(),
+                                                                                instanceEventOriginator.getOrganizationName(),
+                                                                                instanceEvent.getRelationship());
+                        break;
+
+                    case REFRESH_RELATIONSHIP_REQUEST:
+                        instanceEventProcessor.processRefreshRelationshipRequest(cohortName,
+                                                                                 instanceEventOriginator.getMetadataCollectionId(),
+                                                                                 instanceEventOriginator.getServerName(),
+                                                                                 instanceEventOriginator.getServerType(),
+                                                                                 instanceEventOriginator.getOrganizationName(),
+                                                                                 instanceEvent.getTypeDefGUID(),
+                                                                                 instanceEvent.getTypeDefName(),
+                                                                                 instanceEvent.getInstanceGUID(),
+                                                                                 instanceEvent.getHomeMetadataCollectionId());
+                        break;
+
+                    case REFRESHED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processRefreshRelationshipEvent(cohortName,
+                                                                               instanceEventOriginator.getMetadataCollectionId(),
+                                                                               instanceEventOriginator.getServerName(),
+                                                                               instanceEventOriginator.getServerType(),
+                                                                               instanceEventOriginator.getOrganizationName(),
+                                                                               instanceEvent.getRelationship());
+                        break;
+
+                    case RE_IDENTIFIED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processReIdentifiedRelationshipEvent(cohortName,
+                                                                                    instanceEventOriginator.getMetadataCollectionId(),
+                                                                                    instanceEventOriginator.getServerName(),
+                                                                                    instanceEventOriginator.getServerType(),
+                                                                                    instanceEventOriginator.getOrganizationName(),
+                                                                                    instanceEvent.getOriginalInstanceGUID(),
+                                                                                    instanceEvent.getRelationship());
+                        break;
+
+                    case RE_HOMED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processReHomedRelationshipEvent(cohortName,
+                                                                               instanceEventOriginator.getMetadataCollectionId(),
+                                                                               instanceEventOriginator.getServerName(),
+                                                                               instanceEventOriginator.getServerType(),
+                                                                               instanceEventOriginator.getOrganizationName(),
+                                                                               instanceEvent.getOriginalHomeMetadataCollectionId(),
+                                                                               instanceEvent.getRelationship());
+                        break;
+
+                    case RETYPED_RELATIONSHIP_EVENT:
+                        instanceEventProcessor.processReTypedRelationshipEvent(cohortName,
+                                                                               instanceEventOriginator.getMetadataCollectionId(),
+                                                                               instanceEventOriginator.getServerName(),
+                                                                               instanceEventOriginator.getServerType(),
+                                                                               instanceEventOriginator.getOrganizationName(),
+                                                                               instanceEvent.getOriginalTypeDefSummary(),
+                                                                               instanceEvent.getRelationship());
+                        break;
+
+                    case INSTANCE_ERROR_EVENT:
+                        OMRSInstanceEventErrorCode errorCode = instanceEvent.getErrorCode();
+
+                        if (errorCode != null)
+                        {
+                            switch(errorCode)
+                            {
+                                case CONFLICTING_INSTANCES:
+                                    instanceEventProcessor.processConflictingInstancesEvent(cohortName,
+                                                                                            instanceEventOriginator.getMetadataCollectionId(),
+                                                                                            instanceEventOriginator.getServerName(),
+                                                                                            instanceEventOriginator.getServerType(),
+                                                                                            instanceEventOriginator.getOrganizationName(),
+                                                                                            instanceEvent.getTargetMetadataCollectionId(),
+                                                                                            instanceEvent.getTargetTypeDefSummary(),
+                                                                                            instanceEvent.getTargetInstanceGUID(),
+                                                                                            instanceEvent.getOtherMetadataCollectionId(),
+                                                                                            instanceEvent.getOtherOrigin(),
+                                                                                            instanceEvent.getOtherTypeDefSummary(),
+                                                                                            instanceEvent.getOtherInstanceGUID(),
+                                                                                            instanceEvent.getErrorMessage());
+                                    break;
+
+                                case CONFLICTING_TYPE:
+                                    instanceEventProcessor.processConflictingTypeEvent(cohortName,
+                                                                                       instanceEventOriginator.getMetadataCollectionId(),
+                                                                                       instanceEventOriginator.getServerName(),
+                                                                                       instanceEventOriginator.getServerType(),
+                                                                                       instanceEventOriginator.getOrganizationName(),
+                                                                                       instanceEvent.getTargetMetadataCollectionId(),
+                                                                                       instanceEvent.getTargetTypeDefSummary(),
+                                                                                       instanceEvent.getTargetInstanceGUID(),
+                                                                                       instanceEvent.getOtherTypeDefSummary(),
+                                                                                       instanceEvent.getErrorMessage());
+                                    break;
+
+                                default:
+                                    if (log.isDebugEnabled())
+                                    {
+                                        log.debug("Unknown instance event error code - ignoring event");
+                                    }
+                                    break;
+                            }
+                        }
+                        else
+                        {
+                            if (log.isDebugEnabled())
+                            {
+                                log.debug("Ignored Instance event - null error code");
+                            }
+                        }
+                        break;
+
+                    default:
+                        if (log.isDebugEnabled())
+                        {
+                            log.debug("Ignored TypeDef event - unknown type");
+                        }
+                        break;
+                }
+            }
+            else
+            {
+                if (log.isDebugEnabled())
+                {
+                    log.debug("Ignored instance event - null type");
+                }
+            }
+        }
+    }
+}
+
+
+