You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@juddi.apache.org by al...@apache.org on 2014/12/09 04:13:38 UTC

[17/28] juddi git commit: JUDDI-241 get/set replication cfg now works for all sub elements. Notify changes works; however, getChangeRecords fails when called from within the Tomcat container.

JUDDI-241 get/set replication cfg now works for all sub elements. Notify changes works; however, getChangeRecords fails when called from within the Tomcat container.


Project: http://git-wip-us.apache.org/repos/asf/juddi/repo
Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/22a846dd
Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/22a846dd
Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/22a846dd

Branch: refs/heads/master
Commit: 22a846ddb5fe2c79b23967a81899c214312fc2be
Parents: 03dce36
Author: Alex <al...@apache.org>
Authored: Sun Nov 23 19:52:30 2014 -0500
Committer: Alex <al...@apache.org>
Committed: Sun Nov 23 19:52:30 2014 -0500

----------------------------------------------------------------------
 .../src/test/resources/META-INF/persistence.xml |    3 +-
 .../juddi/api/impl/AuthenticatedService.java    |    5 +
 .../org/apache/juddi/api/impl/JUDDIApiImpl.java |   83 +-
 .../juddi/api/impl/UDDIReplicationImpl.java     | 1088 ++++++++++--------
 .../apache/juddi/mapping/MappingApiToModel.java |  333 +++---
 .../apache/juddi/mapping/MappingModelToApi.java |   27 +-
 .../java/org/apache/juddi/model/Contact.java    |   11 +
 .../org/apache/juddi/model/ControlMessage.java  |   30 +
 .../main/java/org/apache/juddi/model/Edge.java  |   41 +-
 .../juddi/model/EdgeReceiverAlternate.java      |   68 ++
 .../main/java/org/apache/juddi/model/Node.java  |  328 +++---
 .../java/org/apache/juddi/model/Operator.java   |   31 +-
 .../juddi/model/ReplicationConfiguration.java   |   46 +-
 .../model/ReplicationConfigurationNode.java     |   72 ++
 .../juddi/replication/ReplicationNotifier.java  |   84 +-
 .../apache/juddi/validation/ValidateNode.java   |    5 +-
 .../juddi/validation/ValidateReplication.java   |   49 +-
 .../src/main/resources/messages.properties      |    4 +-
 .../juddi/api/impl/API_160_ReplicationTest.java |  138 ++-
 .../apache/juddi/api/runtime/CLIServerTest.java |  119 ++
 .../apache/juddi/api/runtime/juddiTestimpl.java |  188 +++
 .../apache/juddi/api/runtime/replicantImpl.java |   70 ++
 .../replication/ReplicationNotifierTest.java    |    5 -
 .../src/test/resources/META-INF/persistence.xml |    3 +
 juddi-core/src/test/resources/META-INF/uddi.xml |   24 +
 .../src/main/resources/META-INF/persistence.xml |    3 +-
 .../org/apache/juddi/samples/EntryPoint.java    |   89 +-
 .../apache/juddi/samples/JuddiAdminService.java |  237 +++-
 .../apache/juddi/samples/UddiReplication.java   |    4 +-
 .../resources/META-INF/simple-publish-uddi.xml  |   27 +-
 .../src/test/resources/META-INF/persistence.xml |    3 +-
 .../WEB-INF/classes/META-INF/persistence.xml    |    2 +
 .../WEB-INF/classes/META-INF/persistence.xml    |    2 +
 .../WEB-INF/classes/META-INF/persistence.xml    |    2 +
 .../WEB-INF/classes/META-INF/persistence.xml    |    3 +
 pom.xml                                         |    2 +-
 uddi-ws/pom.xml                                 |   73 +-
 .../java/org/apache/juddi/repl_v3/EdgeExt.java  |   36 -
 .../main/java/org/uddi/api_v3/GetAuthToken.java |  128 ++-
 .../org/uddi/repl_v3/CommunicationGraph.java    |    1 +
 .../main/java/org/uddi/repl_v3/Operator.java    |  300 ++---
 .../uddi/repl_v3/ReplicationConfiguration.java  |   17 +-
 .../v3_service/UDDIReplicationPortType.java     |   38 +-
 .../juddi/api_v3/GetPublisherDetailTest.java    |   33 +
 44 files changed, 2516 insertions(+), 1339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
index 7bf55b0..f42c65f 100644
--- a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
+++ b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
@@ -78,7 +78,8 @@
 	<class>org.apache.juddi.model.ReplicationConfiguration</class>
 	<class>org.apache.juddi.model.Edge</class>
 	<class>org.apache.juddi.model.ControlMessage</class>
-	<class>org.apache.juddi.model.ReplicationMessage</class>
+  <class>org.apache.juddi.model.ReplicationConfigurationNode</class>
+    <class>org.apache.juddi.model.EdgeReceiverAlternate</class>
   
     <properties>
       <property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(SchemaAction='dropDB,add')"/>

http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
index d69afbf..767cbee 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
@@ -50,10 +50,15 @@ public abstract class AuthenticatedService {
 	public static final int AUTHTOKEN_RETIRED = 0;
 	static final Log logger = LogFactory.getLog(AuthenticatedService.class);
 	protected String node = "UNDEFINED_NODE_NAME";
+        protected String baseUrlSSL="UNDEFINED";
+         protected String baseUrl="UNDEFINED";
         
         public AuthenticatedService(){
                 try {
                         node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME");
+                        node=node.trim();
+                        baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL_SECURE, Property.DEFAULT_BASE_URL_SECURE);
+                        baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL, Property.DEFAULT_BASE_URL);
                 } catch (ConfigurationException ex) {
                         logger.fatal(null, ex);
                 }

http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
index 268616e..dd201fe 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
@@ -19,6 +19,7 @@ package org.apache.juddi.api.impl;
 import java.io.StringWriter;
 import java.math.BigInteger;
 import java.rmi.RemoteException;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -30,6 +31,7 @@ import javax.jws.WebService;
 import javax.persistence.EntityManager;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Query;
+import javax.xml.bind.JAXB;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.Marshaller;
 import javax.xml.ws.Holder;
@@ -73,7 +75,6 @@ import org.apache.juddi.model.ClientSubscriptionInfo;
 import org.apache.juddi.model.Node;
 import org.apache.juddi.model.Publisher;
 import org.apache.juddi.model.ReplicationConfiguration;
-import org.apache.juddi.model.SubscriptionMatch;
 import org.apache.juddi.model.Tmodel;
 import org.apache.juddi.model.UddiEntityPublisher;
 import org.apache.juddi.subscription.NotificationList;
@@ -89,19 +90,24 @@ import org.apache.juddi.validation.ValidateNode;
 import org.apache.juddi.validation.ValidatePublish;
 import org.apache.juddi.validation.ValidatePublisher;
 import org.apache.juddi.validation.ValidateReplication;
-import org.apache.juddi.validation.ValidateSubscription;
 import org.uddi.api_v3.AuthToken;
 import org.uddi.api_v3.BusinessInfo;
 import org.uddi.api_v3.BusinessInfos;
+import org.uddi.api_v3.Contact;
 import org.uddi.api_v3.DeleteTModel;
 import org.uddi.api_v3.DispositionReport;
 import org.uddi.api_v3.GetRegisteredInfo;
 import org.uddi.api_v3.InfoSelection;
+import org.uddi.api_v3.KeyType;
+import org.uddi.api_v3.PersonName;
 import org.uddi.api_v3.RegisteredInfo;
 import org.uddi.api_v3.Result;
 import org.uddi.api_v3.SaveTModel;
 import org.uddi.api_v3.TModelInfo;
 import org.uddi.api_v3.TModelInfos;
+import org.uddi.repl_v3.CommunicationGraph;
+import org.uddi.repl_v3.Operator;
+import org.uddi.repl_v3.OperatorStatusType;
 import org.uddi.sub_v3.GetSubscriptionResults;
 import org.uddi.sub_v3.Subscription;
 import org.uddi.sub_v3.SubscriptionResultsList;
@@ -119,7 +125,7 @@ import org.uddi.v3_service.UDDISubscriptionPortType;
  */
 @WebService(serviceName = "JUDDIApiService",
         endpointInterface = "org.apache.juddi.v3_service.JUDDIApiPortType",
-        targetNamespace = "urn:juddi-apache-org:v3_service")
+        targetNamespace = "urn:juddi-apache-org:v3_service", wsdlLocation =  "classpath:/juddi_api_v1.wsdl")
 public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortType {
 
         private Log log = LogFactory.getLog(this.getClass());
@@ -786,6 +792,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
          * @throws RemoteException
          */
         @SuppressWarnings("unchecked")
+        @Override
         public SyncSubscriptionDetail invokeSyncSubscription(
                 SyncSubscription body) throws DispositionReportFaultMessage,
                 RemoteException {
@@ -1273,30 +1280,42 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
                         if (!((Publisher) publisher).isAdmin()) {
                                 throw new UserMismatchException(new ErrorMessage("errors.AdminReqd"));
                         }
-                        new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration,em);
+                        new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration, em, node);
 
                         org.apache.juddi.model.ReplicationConfiguration model = null;
                         try {
                                 model = (ReplicationConfiguration) em.createQuery("select c FROM ReplicationConfiguration c order by c.serialNumber desc").getSingleResult();
                         } catch (Exception ex) {
                         }
+                        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ");
                         if (model == null) {
+                                //this is a brand new configuration
                                 model = new ReplicationConfiguration();
-                                MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em);
+                                MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model,em);
                                 model.setSerialNumber(System.currentTimeMillis());
+                                model.setTimeOfConfigurationUpdate(sdf.format(new Date()));
                                 em.persist(model);
+                                //if (newReplicationNode(model)){
+                                //tell the replication notifier to start transferring with
+                                //the first change record
+                                //}
 
                         } else {
-                                //long oldid = model.getSerialNumber();
-                                em.remove(model);
+                                //a config exists, remove it, add the new one
+                                //spec doesn't appear to mention if recording a change history on the config is required
+                                //assuming not.
+                                //em.remove(model);
                                 model = new ReplicationConfiguration();
                                 MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em);
                                 model.setSerialNumber(System.currentTimeMillis());
-                                em.persist(model);
+
+                                model.setTimeOfConfigurationUpdate(sdf.format(new Date()));
+                                em.merge(model);
 
                         }
 
                         tx.commit();
+                        //UDDIReplicationImpl.notifyConfigurationChange(replicationConfiguration);
                         long procTime = System.currentTimeMillis() - startTime;
                         serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES,
                                 QueryStatus.SUCCESS, procTime);
@@ -1305,14 +1324,22 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
                         serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES,
                                 QueryStatus.FAILED, procTime);
                         throw drfm;
-                } finally {
+                } catch (Exception ex){
+                        logger.error(ex,ex);
+                        JAXB.marshal(replicationConfiguration, System.out);
+                        throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
+                }
+                finally {
                         if (tx.isActive()) {
                                 tx.rollback();
                         }
                         em.close();
                 }
+                DispositionReport d = new DispositionReport();
+                Result res = new Result();
 
-                return new DispositionReport();
+                d.getResult().add(res);
+                return d;
         }
 
         @Override
@@ -1335,7 +1362,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
                         sql.toString();
                         Query qry = em.createQuery(sql.toString());
                         qry.setMaxResults(1);
-                       
+
                         org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
                         MappingModelToApi.mapReplicationConfiguration(resultList, r);
                         tx.commit();
@@ -1347,15 +1374,36 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
                         serviceCounter.update(JUDDIQuery.GET_ALL_NODES,
                                 QueryStatus.FAILED, procTime);
                         throw drfm;
-                } catch (Exception ex){
-                //possible that there is no config to return
-                        r.setCommunicationGraph(null);
-                        logger.warn("Error caught, is there a replication config is avaiable?", ex);
+                } catch (Exception ex) {
+                        //possible that there is no config to return
+                        r.setCommunicationGraph(new CommunicationGraph());
+                        Operator op = new Operator();
+                        op.setOperatorNodeID(node);
+                        op.setSoapReplicationURL(baseUrlSSL + "/services/replication");
+                        //TODO lookup from the root business
+                       
+                        op.getContact().add(new Contact());
+                        op.getContact().get(0).getPersonName().add(new PersonName("Unknown", null));
+                        op.setOperatorStatus(OperatorStatusType.NORMAL);
+                        
+                        r.getOperator().add(op);
+                        r.getCommunicationGraph().getNode().add(node);
+                        r.getCommunicationGraph().getControlledMessage().add("*");
+                        logger.warn("Error caught, is there a replication config is avaiable? Returning a default config (no replication): " + ex.getMessage());
+                        logger.debug("Error caught, is there a replication config is avaiable? Returning a default config (no replication): ", ex);
                         long procTime = System.currentTimeMillis() - startTime;
+                        r.setSerialNumber(0);
+                        SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ");
+                        r.setTimeOfConfigurationUpdate(sdf.format(new Date()));
+                        r.setRegistryContact(new org.uddi.repl_v3.ReplicationConfiguration.RegistryContact());
+                        //TODO pull from root business
+                        r.getRegistryContact().setContact(new Contact());
+                        r.getRegistryContact().getContact().getPersonName().add(new PersonName("Unknown", null));
+
                         serviceCounter.update(JUDDIQuery.GET_REPLICATION_NODES,
                                 QueryStatus.FAILED, procTime);
-                
-                }finally {
+
+                } finally {
                         if (tx.isActive()) {
                                 tx.rollback();
                         }
@@ -1364,6 +1412,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
 
                 r.setMaximumTimeToGetChanges(BigInteger.ONE);
                 r.setMaximumTimeToSyncRegistry(BigInteger.ONE);
+                JAXB.marshal(r, System.out);
                 return r;
         }
 

http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
index 15ed7a8..3971e6b 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
@@ -28,11 +28,19 @@ import java.util.TimerTask;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
 import javax.persistence.EntityManager;
 import javax.persistence.EntityTransaction;
 import javax.persistence.Query;
+import javax.xml.bind.JAXB;
+import javax.xml.bind.annotation.XmlSeeAlso;
 import javax.xml.datatype.DatatypeConfigurationException;
 import javax.xml.ws.BindingProvider;
+import javax.xml.ws.RequestWrapper;
+import javax.xml.ws.ResponseWrapper;
 import org.apache.commons.configuration.ConfigurationException;
 import static org.apache.juddi.api.impl.AuthenticatedService.logger;
 import org.apache.juddi.api.util.QueryStatus;
@@ -45,6 +53,7 @@ import org.apache.juddi.mapping.MappingModelToApi;
 import org.apache.juddi.model.BusinessEntity;
 import org.apache.juddi.model.BusinessService;
 import org.apache.juddi.model.Node;
+import org.apache.juddi.model.Operator;
 import org.apache.juddi.model.Tmodel;
 import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
 import org.apache.juddi.v3.client.UDDIService;
@@ -75,562 +84,625 @@ import org.uddi.v3_service.UDDIReplicationPortType;
  * <li>do_ping</li>
  * <li>get_highWaterMarks</li>
  *
- * @author <a href="mailto:alexoree@apache.org">Alex O'Ree<a/>
+ * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
  */
+@WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:repl_v3_portType",
+        endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
+@XmlSeeAlso({
+        org.uddi.custody_v3.ObjectFactory.class,
+        org.uddi.repl_v3.ObjectFactory.class,
+        org.uddi.subr_v3.ObjectFactory.class,
+        org.uddi.api_v3.ObjectFactory.class,
+        org.uddi.vscache_v3.ObjectFactory.class,
+        org.uddi.vs_v3.ObjectFactory.class,
+        org.uddi.sub_v3.ObjectFactory.class,
+        org.w3._2000._09.xmldsig_.ObjectFactory.class,
+        org.uddi.policy_v3.ObjectFactory.class,
+        org.uddi.policy_v3_instanceparms.ObjectFactory.class
+})
 public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
-    
-    private UDDIServiceCounter serviceCounter;
-    
-    private static PullTimerTask timer = null;
-    private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
-    private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
-
-    private static UDDIPublicationImpl pub = null;
-    
-    public UDDIReplicationImpl() {
-        super();
-        if (pub == null) {
-            pub = new UDDIPublicationImpl();
-        }
-        serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
-        Init();
-        try {
-            startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
-            interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
-        } catch (ConfigurationException ex) {
-            logger.fatal(ex);
-        }
-        
-    }
-    
-    private synchronized void Init() {
-        if (queue == null) {
-            queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
+
+        static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) {
+
+                //if the config is different
         }
-        timer = new PullTimerTask();
-        
-    }
-    
-    private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
-        if (changesAlreadySeen != null) {
-            for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
-                if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
-                        && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) {
-                    return true;
+
+        private UDDIServiceCounter serviceCounter;
+
+        private static PullTimerTask timer = null;
+        private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
+        private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
+
+        private static UDDIPublicationImpl pub = null;
+
+        public UDDIReplicationImpl() {
+                super();
+                if (pub == null) {
+                        pub = new UDDIPublicationImpl();
+                }
+                serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
+                Init();
+                try {
+                        startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default 
+                        interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
+                } catch (ConfigurationException ex) {
+                        logger.fatal(ex);
                 }
-            }
+
         }
-        return false;
-    }
-    
-    private class PullTimerTask extends TimerTask {
-        
-        private Timer timer = null;
-        
-        public PullTimerTask() {
-            super();
-            timer = new Timer(true);
-            timer.scheduleAtFixedRate(this, startBuffer, interval);
+
+        private synchronized void Init() {
+                if (queue == null) {
+                        queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
+                }
+                timer = new PullTimerTask();
+
         }
-        
-        @Override
-        public void run() {
-
-            //ok someone told me there's a change available
-            while (!queue.isEmpty()) {
-                NotifyChangeRecordsAvailable poll = queue.poll();
-                if (poll != null) {
-                    UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
-                    try {
-                        //ok now get all the changes
-                        List<ChangeRecord> records
-                                = replicationClient.getChangeRecords(node,
-                                        null, null, poll.getChangesAvailable());
-                        //ok now we need to persist the change records
-                        for (int i = 0; i < records.size(); i++) {
-                            PersistChangeRecord(records.get(i));
+
+        private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
+                if (changesAlreadySeen != null) {
+                        for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
+                                if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
+                                        && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) {
+                                        return true;
+                                }
                         }
-                    } catch (Exception ex) {
-                        logger.equals(ex);
-                    }
                 }
-            }
-        }
-        
-        @Override
-        public boolean cancel() {
-            timer.cancel();
-            return super.cancel();
+                return false;
         }
 
         /**
-         * someone told me there's a change available, we retrieved it and are
-         * processing the changes locally
-         *
-         * @param rec
+         * handles when a remote node tells me that there's an update(s)
+         * available
          */
-        private void PersistChangeRecord(ChangeRecord rec) {
-            if (rec == null) {
-                return;
-            }
-            EntityManager em = PersistenceManager.getEntityManager();
-            EntityTransaction tx = em.getTransaction();
-            /**
-             * In nodes that support pre-bundled replication responses, the
-             * recipient of the get_changeRecords message MAY return more change
-             * records than requested by the caller. In this scenario, the
-             * caller MUST also be prepared to deal with such redundant changes
-             * where a USN is less than the USN specified in the
-             * changesAlreadySeen highWaterMarkVector.
-             */
-            try {
-                tx.begin();
-                //the change record rec must also be persisted!!
-                em.persist(MappingApiToModel.mapChangeRecord(rec));
-                //<editor-fold defaultstate="collapsed" desc="delete a record">
-
-                if (rec.getChangeRecordDelete() != null) {
-                    if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
-                        //delete a binding template
-                        pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
-                    }
-                    if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
-                        //delete a business 
-                        pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
-                    }
-                    if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
-                        //delete a service 
-                        pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
-                    }
-                    if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
-                        //delete a tmodel 
-                        /**
-                         * The changeRecordDelete for a tModel does not
-                         * correspond to any API described in this specification
-                         * and should only appear in the replication stream as
-                         * the result of an administrative function to
-                         * permanently remove a tModel.
-                         */
-                        pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
-                    }
+        private class PullTimerTask extends TimerTask {
+
+                private Timer timer = null;
+
+                public PullTimerTask() {
+                        super();
+                        timer = new Timer(true);
+                        timer.scheduleAtFixedRate(this, startBuffer, interval);
                 }
-                if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
-                    //delete a pa template                            
-                    pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em);
+
+                @Override
+                public void run() {
+
+                        logger.info("Replication change puller thread started. Queue size: " + queue.size());
+                        //ok someone told me there's a change available
+                        while (!queue.isEmpty()) {
+                                NotifyChangeRecordsAvailable poll = queue.poll();
+                                if (poll != null) {
+                                        UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
+                                        if (replicationClient == null) {
+                                                logger.fatal("unable to obtain a replication client to node " + poll.getNotifyingNode());
+                                        }
+                                        try {
+                                                //get the high water marks for this node
+                                                //ok now get all the changes
+                                                logger.info("fetching updates...");
+
+                                                List<ChangeRecord> records
+                                                        = replicationClient.getChangeRecords(node,
+                                                                poll.getChangesAvailable(), BigInteger.valueOf(100), null);
+                                                //ok now we need to persist the change records
+                                                logger.info("Change records retrieved " + records.size());
+                                                for (int i = 0; i < records.size(); i++) {
+                                                        PersistChangeRecord(records.get(i));
+                                                }
+                                        } catch (Exception ex) {
+                                                logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex);
+                                        }
+                                } else {
+                                        logger.warn("weird, popped an object from the queue but it was null.");
+                                }
+                        }
+                }
+
+                @Override
+                public boolean cancel() {
+                        timer.cancel();
+                        return super.cancel();
                 }
 
+                /**
+                 * Applies a change record, retrieved from a remote node after it
+                 * told us changes were available, to the local data store.
+                 *
+                 * @param rec the change record to persist; ignored when null
+                 */
+                private void PersistChangeRecord(ChangeRecord rec) {
+                        if (rec == null) {
+                                return;
+                        }
+                        EntityManager em = PersistenceManager.getEntityManager();
+                        EntityTransaction tx = em.getTransaction();
+                        /**
+                         * In nodes that support pre-bundled replication
+                         * responses, the recipient of the get_changeRecords
+                         * message MAY return more change records than requested
+                         * by the caller. In this scenario, the caller MUST also
+                         * be prepared to deal with such redundant changes where
+                         * a USN is less than the USN specified in the
+                         * changesAlreadySeen highWaterMarkVector.
+                         */
+                        try {
+                                tx.begin();
+                                //the change record rec must also be persisted!!
+                                em.persist(MappingApiToModel.mapChangeRecord(rec));
+                                //<editor-fold defaultstate="collapsed" desc="delete a record">
+
+                                if (rec.getChangeRecordDelete() != null) {
+                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
+                                                //delete a binding template
+                                                pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
+                                                //delete a business 
+                                                pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
+                                                //delete a service 
+                                                pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
+                                        }
+                                        if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
+                                                //delete a tmodel 
+                                                /**
+                                                 * The changeRecordDelete for a
+                                                 * tModel does not correspond to
+                                                 * any API described in this
+                                                 * specification and should only
+                                                 * appear in the replication
+                                                 * stream as the result of an
+                                                 * administrative function to
+                                                 * permanently remove a tModel.
+                                                 */
+                                                pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
+                                        }
+                                }
+                                if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
+                                        //delete a publisher assertion
+                                        pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em);
+                                }
+
 //</editor-fold>
-                //<editor-fold defaultstate="collapsed" desc="New Data">
-                if (rec.getChangeRecordNewData() != null) {
-                    
-                    if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
-                        throw new Exception("Inbound replication data is missiong node id!");
-                    }
-
-                    //The operationalInfo element MUST contain the operational information associated with the indicated new data.
-                    if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
-                        logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
-                    } else {
-                        if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
+                                //<editor-fold defaultstate="collapsed" desc="New Data">
+                                if (rec.getChangeRecordNewData() != null) {
+
+                                        if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
+                                                throw new Exception("Inbound replication data is missiong node id!");
+                                        }
+
+                                        //The operationalInfo element MUST contain the operational information associated with the indicated new data.
+                                        if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
+                                                logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
+                                        } else {
+                                                if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
                                                         //fetch the binding template if it exists already
-                            //if it exists, 
-                            //      confirm the owning node, it shouldn't be the local node id, if it is, throw
-                            //      the owning node should be the same as it was before
-
-                            BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
-                            if (model == null) {
-                                logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
-                            } else {
-                                ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
-                                
-                                org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate();
-                                MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model);
-                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-                                em.persist(model);
-                            }
-                            
-                        } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
-                            
-                            BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
-                            if (model == null) {
-                                model = new BusinessEntity();
-                            } else {
-                                ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
-                            }
-                            MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
-                            MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-                            
-                            em.persist(model);
-                            
-                        }
-                        if (rec.getChangeRecordNewData().getBusinessService() != null) {
-                            BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
-                            if (find == null) {
-                                logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
-                            } else {
-                                org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService();
-                                MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
-                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-                                
-                                em.persist(model);
-                            }
-                            
-                        } else if (rec.getChangeRecordNewData().getTModel() != null) {
-                            Tmodel model = new Tmodel();
-                            MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
-                            
-                            MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-                            
-                            em.persist(model);
-                        }
-                        
-                    }
-                    
-                }
+                                                        //if it exists, 
+                                                        //      confirm the owning node, it shouldn't be the local node id, if it is, throw
+                                                        //      the owning node should be the same as it was before
+
+                                                        BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
+                                                        if (model == null) {
+                                                                logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
+                                                        } else {
+                                                                ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                                org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate();
+                                                                MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model);
+                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+                                                                em.persist(model);
+                                                        }
+
+                                                } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
+
+                                                        BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
+                                                        if (model == null) {
+                                                                model = new BusinessEntity();
+                                                        } else {
+                                                                ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
+                                                        }
+                                                        MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
+                                                        MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                        em.persist(model);
+
+                                                }
+                                                if (rec.getChangeRecordNewData().getBusinessService() != null) {
+                                                        BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
+                                                        if (find == null) {
+                                                                logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
+                                                        } else {
+                                                                org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService();
+                                                                MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
+                                                                MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                                em.persist(model);
+                                                        }
+
+                                                } else if (rec.getChangeRecordNewData().getTModel() != null) {
+                                                        Tmodel model = new Tmodel();
+                                                        MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+
+                                                        MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+                                                        em.persist(model);
+                                                }
+
+                                        }
+
+                                }
 //</editor-fold>
 
                                 // changeRecordNull no action needed
-                // changeRecordHide tmodel only
-                //<editor-fold defaultstate="collapsed" desc="hide tmodel">
-                if (rec.getChangeRecordHide() != null) {
-                    /*
-                     A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification.  A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
+                                // changeRecordHide tmodel only
+                                //<editor-fold defaultstate="collapsed" desc="hide tmodel">
+                                if (rec.getChangeRecordHide() != null) {
+                                        /*
+                                         A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification.  A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
                                         
-                     The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
-                     */
-                    String key = rec.getChangeRecordHide().getTModelKey();
-                    org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
-                    if (existing == null) {
-                        logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
-                    } else {
-                        existing.setDeleted(true);
-                        existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
-                        existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
-                        em.persist(existing);
-                    }
-                }
+                                         The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
+                                         */
+                                        String key = rec.getChangeRecordHide().getTModelKey();
+                                        org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
+                                        if (existing == null) {
+                                                logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
+                                        } else {
+                                                existing.setDeleted(true);
+                                                existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+                                                existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+                                                em.persist(existing);
+                                        }
+                                }
 //</editor-fold>
 
-                //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
-                if (rec.getChangeRecordPublisherAssertion() != null) {
+                                //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
+                                if (rec.getChangeRecordPublisherAssertion() != null) {
 //TODO implement
-                }
+                                }
 //</editor-fold>
 
-                tx.commit();
-                
-            } catch (Exception drfm) {
-                logger.warn(drfm);
-            } finally {
-                if (tx.isActive()) {
-                    tx.rollback();
+                                tx.commit();
+
+                        } catch (Exception drfm) {
+                                logger.warn(drfm);
+                        } finally {
+                                if (tx.isActive()) {
+                                        tx.rollback();
+                                }
+                                em.close();
+                        }
                 }
-                em.close();
-            }
-        }
-        
-    }
-    
-    private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception {
-        if (nodeId == null || operationalInfo == null) {
-            throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
-        }
-        if (!nodeId.equals(operationalInfo.getNodeID())) {
-            throw new Exception("node id mismatch!");
+
         }
-    }
-    
-    private synchronized UDDIReplicationPortType getReplicationClient(String node) {
-        if (cache.containsKey(node)) {
-            return cache.get(node);
+
+        private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception {
+                if (nodeId == null || operationalInfo == null) {
+                        throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
+                }
+                if (!nodeId.equals(operationalInfo.getNodeID())) {
+                        throw new Exception("node id mismatch!");
+                }
         }
-        UDDIService svc = new UDDIService();
-        UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
-        EntityManager em = PersistenceManager.getEntityManager();
-        EntityTransaction tx = em.getTransaction();
-        try {
-            Node find = em.find(org.apache.juddi.model.Node.class, node);
-            ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, find.getReplicationUrl());
-            cache.put(node, replicationClient);
-            return replicationClient;
-        } catch (Exception ex) {
-            logger.fatal("Node not found!" + node, ex);
-        } finally {
-            if (tx.isActive()) {
-                tx.rollback();
-            }
-            em.close();
+
+        private synchronized UDDIReplicationPortType getReplicationClient(String node) {
+                if (cache.containsKey(node)) {
+                        return cache.get(node);
+                }
+                UDDIService svc = new UDDIService();
+                UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
+
+                EntityManager em = PersistenceManager.getEntityManager();
+                EntityTransaction tx = em.getTransaction();
+                try {
+                        StringBuilder sql = new StringBuilder();
+                        sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
+                        sql.toString();
+                        Query qry = em.createQuery(sql.toString());
+                        qry.setMaxResults(1);
+
+                        org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
+                        for (Operator o : resultList.getOperator()) {
+                                if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
+                                        ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
+                                        cache.put(node, replicationClient);
+                                        return replicationClient;
+                                }
+                        }
+                        tx.rollback();
+
+                } catch (Exception ex) {
+                        logger.fatal("Node not found!" + node, ex);
+                } finally {
+                        if (tx.isActive()) {
+                                tx.rollback();
+                        }
+                        em.close();
+                }
+                //em.close();
+                return null;
+
         }
-        em.close();
-        return null;
-        
-    }
-    private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
-
-    /**
-     * @since 3.3
-     * @param body
-     * @return
-     * @throws DispositionReportFaultMessage
-     */
-    public String doPing(DoPing body) throws DispositionReportFaultMessage {
-        long startTime = System.currentTimeMillis();
-        long procTime = System.currentTimeMillis() - startTime;
-        serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
-        
-        return node;
-        
-    }
-    
-    @Override
-    public List<ChangeRecord> getChangeRecords(String requestingNode,
-            HighWaterMarkVectorType changesAlreadySeen,
-            BigInteger responseLimitCount,
-            HighWaterMarkVectorType responseLimitVector)
-            throws DispositionReportFaultMessage {
-        long startTime = System.currentTimeMillis();
-        
-        new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
-
-        //TODO should we validate that "requestingNode" is in the replication config?
-        List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
-        EntityManager em = PersistenceManager.getEntityManager();
-        EntityTransaction tx = em.getTransaction();
+        private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
 
         /**
-         * More specifically, the recipient determines the particular change
-         * records that are returned by comparing the originating USNs in the
-         * caller’s high water mark vector with the originating USNs of each of
-         * the changes the recipient has seen from others or generated by
-         * itself. The recipient SHOULD only return change records that have
-         * originating USNs that are greater than those listed in the
-         * changesAlreadySeen highWaterMarkVector and less than the limit
-         * required by either the responseLimitCount or the responseLimitVector.
-         *
-         *
-         * Part of the message is a high water mark vector that contains for
-         * each node of the registry the originating USN of the most recent
-         * change record that has been successfully processed by the invocating
-         * node
+         * @since 3.3
+         * @param body the ping request (not inspected by this implementation)
+         * @return the value of the local {@code node} field
+         * @throws DispositionReportFaultMessage
          */
-        int maxrecords = 100;
-        if (responseLimitCount != null) {
-            maxrecords = responseLimitCount.intValue();
+        public String doPing(DoPing body) throws DispositionReportFaultMessage {
+                long startTime = System.currentTimeMillis();
+                long procTime = System.currentTimeMillis() - startTime;
+                serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
+
+                return node;
+
         }
-        try {
-            tx.begin();
-            Long firstrecord = 1L;
-            Long lastrecord = null;
-            
-            if (changesAlreadySeen != null) {
-                                //this is basically a lower limit (i.e. the newest record that was processed by the requestor
-                //therefore we want the oldest record stored locally to return to the requestor for processing
-                for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
-                    if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
-                        firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1;
-                    }
+
+        @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3")
+        @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords")
+        @ResponseWrapper(localName = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords")
+
+        @Override
+        public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode,
+                @WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen,
+                @WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount,
+                @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector)
+                throws DispositionReportFaultMessage {
+                long startTime = System.currentTimeMillis();
+
+                new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
+
+                //TODO should we validate that "requestingNode" is in the replication config?
+                List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
+                EntityManager em = PersistenceManager.getEntityManager();
+                EntityTransaction tx = em.getTransaction();
+
+                /**
+                 * More specifically, the recipient determines the particular
+                 * change records that are returned by comparing the originating
+                 * USNs in the caller’s high water mark vector with the
+                 * originating USNs of each of the changes the recipient has
+                 * seen from others or generated by itself. The recipient SHOULD
+                 * only return change records that have originating USNs that
+                 * are greater than those listed in the changesAlreadySeen
+                 * highWaterMarkVector and less than the limit required by
+                 * either the responseLimitCount or the responseLimitVector.
+                 *
+                 *
+                 * Part of the message is a high water mark vector that contains
+                 * for each node of the registry the originating USN of the most
+                 * recent change record that has been successfully processed by
+                 * the invocating node
+                 */
+                int maxrecords = 100;
+                if (responseLimitCount != null) {
+                        maxrecords = responseLimitCount.intValue();
                 }
-            }
-            if (responseLimitVector != null) {
+                try {
+                        tx.begin();
+                        Long firstrecord = 1L;
+                        Long lastrecord = null;
+
+                        if (changesAlreadySeen != null) {
+                                //this is basically a lower limit (i.e. the newest record that was processed by the requestor),
+                                //therefore we want the oldest record stored locally after that point to be returned to the requestor for processing
+                                for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
+                                        if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) {
+                                                firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1;
+                                        }
+                                }
+                        }
+                        if (responseLimitVector != null) {
                                 //using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
-                //upper limit basically
-                for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
-                    if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
-                        lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
-                    }
-                }
-            }
-            
-            Query createQuery = null;
-            if (lastrecord != null) {
-                createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node and e.id < :lastrecord) OR (e.originatingUSN > :inbound and e.nodeID != :node and e.originatingUSN < :lastrecord) order by e.id ASC");
-                createQuery.setParameter("lastrecord", lastrecord);
-            } else {
-                createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node) OR (e.originatingUSN > :inbound and e.nodeID != :node) order by e.id ASC");
-            }
-            createQuery.setMaxResults(maxrecords);
-            createQuery.setParameter("inbound", firstrecord);
-            createQuery.setParameter("node", node);
-            
-            List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) (org.apache.juddi.model.ChangeRecord) createQuery.getResultList();
-            for (int i = 0; i < records.size(); i++) {
-                ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
-                if (!Excluded(changesAlreadySeen, r)) {
-                    ret.add(r);
+                                //upper limit basically
+                                for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
+                                        if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
+                                                lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
+                                        }
+                                }
+                        }
+
+                        Query createQuery = null;
+                        if (lastrecord != null) {
+                                createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node and e.id < :lastrecord) OR "
+                                        + "(e.originatingUSN > :inbound and e.nodeID <> :node and e.originatingUSN < :lastrecord)) order by e.id ASC");
+                                createQuery.setParameter("lastrecord", lastrecord);
+                        } else {
+                                createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node) OR "
+                                        + "(e.originatingUSN > :inbound and e.nodeID <> :node)) order by e.id ASC");
+                        }
+                        createQuery.setMaxResults(maxrecords);
+                        createQuery.setParameter("inbound", firstrecord);
+                        createQuery.setParameter("node", node);
+
+                        List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
+                        for (int i = 0; i < records.size(); i++) {
+                                ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
+                                if (!Excluded(changesAlreadySeen, r)) {
+                                        ret.add(r);
+                                }
+
+                        }
+
+                        tx.rollback();
+                        long procTime = System.currentTimeMillis() - startTime;
+                        serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
+                                QueryStatus.SUCCESS, procTime);
+
+                } catch (Exception ex) {
+                        logger.fatal("Error, this node is: " + node, ex);
+                        throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
+
+                } finally {
+                        if (tx.isActive()) {
+                                tx.rollback();
+                        }
+                        em.close();
                 }
-                
-            }
-            
-            tx.rollback();
-            long procTime = System.currentTimeMillis() - startTime;
-            serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
-                    QueryStatus.SUCCESS, procTime);
-            
-        } catch (Exception ex) {
-            logger.fatal("Error, this node is: " + node, ex);
-            throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
-            
-        } finally {
-            if (tx.isActive()) {
-                tx.rollback();
-            }
-            em.close();
+                logger.info("Change records returned for " + requestingNode + ": " + ret.size());
+                //JAXB.marshal(ret, System.out);
+                return ret;
         }
-        return ret;
-    }
-
-    /**
-     * This UDDI API message provides a means to obtain a list of highWaterMark
-     * element containing the highest known USN for all nodes in the replication
-     * graph. If there is no graph, we just return the local bits
-     *
-     * @return
-     * @throws DispositionReportFaultMessage
-     */
-    @Override
-    public List<ChangeRecordIDType> getHighWaterMarks()
-            throws DispositionReportFaultMessage {
-        long startTime = System.currentTimeMillis();
-        
-        List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
-
-        //fetch from database the highest known watermark
-        ReplicationConfiguration FetchEdges = FetchEdges();
-        
-        EntityManager em = PersistenceManager.getEntityManager();
-        EntityTransaction tx = em.getTransaction();
-        try {
-            tx.begin();
-            if (FetchEdges != null) {
-                Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
-                while (it.hasNext()) {
-                    String nextNode = it.next();
-                    if (!nextNode.equals(node)) {
-                        
-                        Long id = 0L;
-                        try {
-                            id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
-                        } catch (Exception ex) {
-                            logger.debug(ex);
+
+        /**
+         * This UDDI API message provides a means to obtain a list of
+         * highWaterMark element containing the highest known USN for all nodes
+         * in the replication graph. If there is no graph, we just return the
+         * local bits
+         *
+         * @return
+         * @throws DispositionReportFaultMessage
+         */
+        @Override
+        public List<ChangeRecordIDType> getHighWaterMarks()
+                throws DispositionReportFaultMessage {
+                long startTime = System.currentTimeMillis();
+
+                List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
+
+                //fetch from database the highest known watermark
+                ReplicationConfiguration FetchEdges = FetchEdges();
+
+                EntityManager em = PersistenceManager.getEntityManager();
+                EntityTransaction tx = em.getTransaction();
+                try {
+                        tx.begin();
+                        if (FetchEdges != null) {
+                                Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
+                                while (it.hasNext()) {
+                                        String nextNode = it.next();
+                                        if (!nextNode.equals(node)) {
+
+                                                Long id = 0L;
+                                                try {
+                                                        id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
+                                                } catch (Exception ex) {
+                                                        logger.debug(ex);
+                                                }
+                                                if (id == null) {
+                                                        id = 0L;
+                                                        //per the spec
+                                                }
+                                                ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id);
+
+                                                ret.add(x);
+
+                                        }
+                                }
                         }
+                        //don't forget this node
+                        Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node  order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult();
                         if (id == null) {
-                            id = 0L;
-                            //per the spec
+                                id = 0L;
                         }
-                        ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id);
-                        
+                        ChangeRecordIDType x = new ChangeRecordIDType();
+                        x.setNodeID(node);
+                        x.setOriginatingUSN(id);
                         ret.add(x);
-                        
-                    }
+
+                        tx.rollback();
+                        long procTime = System.currentTimeMillis() - startTime;
+                        serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
+
+                } catch (Exception drfm) {
+                        throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
+
+                } finally {
+                        if (tx.isActive()) {
+                                tx.rollback();
+                        }
+                        em.close();
                 }
-            }
-            //dont forget this node
-            Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node  order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult();
-            if (id == null) {
-                id = 0L;
-            }
-            ChangeRecordIDType x = new ChangeRecordIDType();
-            x.setNodeID(node);
-            x.setOriginatingUSN(id);
-            ret.add(x);
-            
-            tx.rollback();
-            long procTime = System.currentTimeMillis() - startTime;
-            serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
-            
-        } catch (Exception drfm) {
-            throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
-            
-        } finally {
-            if (tx.isActive()) {
-                tx.rollback();
-            }
-            em.close();
+
+                return ret;
         }
-        
-        
-        
-        return ret;
-    }
-
-    /**
-     * this means that another node has a change and we need to pick up the
-     * change and apply it to our local database.
-     *
-     * @param body
-     * @throws DispositionReportFaultMessage
-     */
-    @Override
-    public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
-            throws DispositionReportFaultMessage {
-        long startTime = System.currentTimeMillis();
-        long procTime = System.currentTimeMillis() - startTime;
-        serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
-                QueryStatus.SUCCESS, procTime);
+
+        /**
+         * this means that another node has a change and we need to pick up the
+         * change and apply it to our local database.
+         *
+         * @param body
+         * @throws DispositionReportFaultMessage
+         */
+        @Override
+        public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
+                throws DispositionReportFaultMessage {
+                long startTime = System.currentTimeMillis();
+                long procTime = System.currentTimeMillis() - startTime;
+                serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
+                        QueryStatus.SUCCESS, procTime);
                 //some other node just told us there's new records available, call
-        //getChangeRecords from the remote node asynch
-
-        new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
-        
-        queue.add(body);
-
-        //ValidateReplication.unsupportedAPICall();
-    }
-    private static Queue<NotifyChangeRecordsAvailable> queue = null;
-
-    /**
-     * transfers custody of an entity from node1/user1 to node2/user2
-     *
-     * @param body
-     * @throws DispositionReportFaultMessage
-     */
-    @Override
-    public void transferCustody(TransferCustody body)
-            throws DispositionReportFaultMessage {
-        long startTime = System.currentTimeMillis();
+                //getChangeRecords from the remote node asynchronously
 
-                //*this node is transfering data to another node
-        //body.getTransferOperationalInfo().
-        ValidateReplication.unsupportedAPICall();
-        
-        EntityManager em = PersistenceManager.getEntityManager();
-        //EntityTransaction tx = em.getTransaction();
+                new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
+
+                logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...");
+                queue.add(body);
+
+                //ValidateReplication.unsupportedAPICall();
+        }
+        private static Queue<NotifyChangeRecordsAvailable> queue = null;
 
         /**
-         * The custodial node must verify that it has granted permission to
-         * transfer the entities identified and that this permission is still
-         * valid. This operation is comprised of two steps:
-         *
-         * 1. Verification that the transferToken was issued by it, that it has
-         * not expired, that it represents the authority to transfer no more and
-         * no less than those entities identified by the businessKey and
-         * tModelKey elements and that all these entities are still valid and
-         * not yet transferred. The transferToken is invalidated if any of these
-         * conditions are not met.
-         *
-         * 2. If the conditions above are met, the custodial node will prevent
-         * any further changes to the entities identified by the businessKey and
-         * tModelKey elements identified. The entity will remain in this state
-         * until the replication stream indicates it has been successfully
-         * processed via the replication stream. Upon successful verification of
-         * the custody transfer request by the custodial node, an empty message
-         * is returned by it indicating the success of the request and
-         * acknowledging the custody transfer. Following the issue of the empty
-         * message, the custodial node will submit into the replication stream a
-         * changeRecordNewData providing in the operationalInfo, the nodeID
-         * accepting custody of the datum and the authorizedName of the
-         * publisher accepting ownership. The acknowledgmentRequested attribute
-         * of this change record MUST be set to "true".
+         * transfers custody of an entity from node1/user1 to node2/user2
          *
-         * TODO enqueue Replication message
-         *
-         * Finally, the custodial node invalidates the transferToken in order to
-         * prevent additional calls of the transfer_entities API.
+         * @param body
+         * @throws DispositionReportFaultMessage
          */
-        DiscardTransferToken dtt = new DiscardTransferToken();
-        dtt.setKeyBag(body.getKeyBag());
-        dtt.setTransferToken(body.getTransferToken());
-        new UDDICustodyTransferImpl().discardTransferToken(dtt);
-    }
-    
+        @Override
+        public void transferCustody(TransferCustody body)
+                throws DispositionReportFaultMessage {
+                long startTime = System.currentTimeMillis();
+
+                //*this node is transferring data to another node
+                //body.getTransferOperationalInfo().
+                ValidateReplication.unsupportedAPICall();
+
+                EntityManager em = PersistenceManager.getEntityManager();
+                //EntityTransaction tx = em.getTransaction();
+
+                /**
+                 * The custodial node must verify that it has granted permission
+                 * to transfer the entities identified and that this permission
+                 * is still valid. This operation is comprised of two steps:
+                 *
+                 * 1. Verification that the transferToken was issued by it, that
+                 * it has not expired, that it represents the authority to
+                 * transfer no more and no less than those entities identified
+                 * by the businessKey and tModelKey elements and that all these
+                 * entities are still valid and not yet transferred. The
+                 * transferToken is invalidated if any of these conditions are
+                 * not met.
+                 *
+                 * 2. If the conditions above are met, the custodial node will
+                 * prevent any further changes to the entities identified by the
+                 * businessKey and tModelKey elements identified. The entity
+                 * will remain in this state until the replication stream
+                 * indicates it has been successfully processed via the
+                 * replication stream. Upon successful verification of the
+                 * custody transfer request by the custodial node, an empty
+                 * message is returned by it indicating the success of the
+                 * request and acknowledging the custody transfer. Following the
+                 * issue of the empty message, the custodial node will submit
+                 * into the replication stream a changeRecordNewData providing
+                 * in the operationalInfo, the nodeID accepting custody of the
+                 * datum and the authorizedName of the publisher accepting
+                 * ownership. The acknowledgmentRequested attribute of this
+                 * change record MUST be set to "true".
+                 *
+                 * TODO enqueue Replication message
+                 *
+                 * Finally, the custodial node invalidates the transferToken in
+                 * order to prevent additional calls of the transfer_entities
+                 * API.
+                 */
+                DiscardTransferToken dtt = new DiscardTransferToken();
+                dtt.setKeyBag(body.getKeyBag());
+                dtt.setTransferToken(body.getTransferToken());
+                new UDDICustodyTransferImpl().discardTransferToken(dtt);
+        }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@juddi.apache.org
For additional commands, e-mail: commits-help@juddi.apache.org