You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@juddi.apache.org by al...@apache.org on 2014/12/09 04:13:38 UTC
[17/28] juddi git commit: JUDDI-241 get/set replication cfg now works for all sub elements. notify changes works, however getChangeRecords fails when called from within the tomcat container.
JUDDI-241 get/set replication cfg now works for all sub elements. notify changes works, however getChangeRecords fails when called from within the tomcat container.
Project: http://git-wip-us.apache.org/repos/asf/juddi/repo
Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/22a846dd
Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/22a846dd
Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/22a846dd
Branch: refs/heads/master
Commit: 22a846ddb5fe2c79b23967a81899c214312fc2be
Parents: 03dce36
Author: Alex <al...@apache.org>
Authored: Sun Nov 23 19:52:30 2014 -0500
Committer: Alex <al...@apache.org>
Committed: Sun Nov 23 19:52:30 2014 -0500
----------------------------------------------------------------------
.../src/test/resources/META-INF/persistence.xml | 3 +-
.../juddi/api/impl/AuthenticatedService.java | 5 +
.../org/apache/juddi/api/impl/JUDDIApiImpl.java | 83 +-
.../juddi/api/impl/UDDIReplicationImpl.java | 1088 ++++++++++--------
.../apache/juddi/mapping/MappingApiToModel.java | 333 +++---
.../apache/juddi/mapping/MappingModelToApi.java | 27 +-
.../java/org/apache/juddi/model/Contact.java | 11 +
.../org/apache/juddi/model/ControlMessage.java | 30 +
.../main/java/org/apache/juddi/model/Edge.java | 41 +-
.../juddi/model/EdgeReceiverAlternate.java | 68 ++
.../main/java/org/apache/juddi/model/Node.java | 328 +++---
.../java/org/apache/juddi/model/Operator.java | 31 +-
.../juddi/model/ReplicationConfiguration.java | 46 +-
.../model/ReplicationConfigurationNode.java | 72 ++
.../juddi/replication/ReplicationNotifier.java | 84 +-
.../apache/juddi/validation/ValidateNode.java | 5 +-
.../juddi/validation/ValidateReplication.java | 49 +-
.../src/main/resources/messages.properties | 4 +-
.../juddi/api/impl/API_160_ReplicationTest.java | 138 ++-
.../apache/juddi/api/runtime/CLIServerTest.java | 119 ++
.../apache/juddi/api/runtime/juddiTestimpl.java | 188 +++
.../apache/juddi/api/runtime/replicantImpl.java | 70 ++
.../replication/ReplicationNotifierTest.java | 5 -
.../src/test/resources/META-INF/persistence.xml | 3 +
juddi-core/src/test/resources/META-INF/uddi.xml | 24 +
.../src/main/resources/META-INF/persistence.xml | 3 +-
.../org/apache/juddi/samples/EntryPoint.java | 89 +-
.../apache/juddi/samples/JuddiAdminService.java | 237 +++-
.../apache/juddi/samples/UddiReplication.java | 4 +-
.../resources/META-INF/simple-publish-uddi.xml | 27 +-
.../src/test/resources/META-INF/persistence.xml | 3 +-
.../WEB-INF/classes/META-INF/persistence.xml | 2 +
.../WEB-INF/classes/META-INF/persistence.xml | 2 +
.../WEB-INF/classes/META-INF/persistence.xml | 2 +
.../WEB-INF/classes/META-INF/persistence.xml | 3 +
pom.xml | 2 +-
uddi-ws/pom.xml | 73 +-
.../java/org/apache/juddi/repl_v3/EdgeExt.java | 36 -
.../main/java/org/uddi/api_v3/GetAuthToken.java | 128 ++-
.../org/uddi/repl_v3/CommunicationGraph.java | 1 +
.../main/java/org/uddi/repl_v3/Operator.java | 300 ++---
.../uddi/repl_v3/ReplicationConfiguration.java | 17 +-
.../v3_service/UDDIReplicationPortType.java | 38 +-
.../juddi/api_v3/GetPublisherDetailTest.java | 33 +
44 files changed, 2516 insertions(+), 1339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
index 7bf55b0..f42c65f 100644
--- a/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
+++ b/juddi-core-openjpa/src/test/resources/META-INF/persistence.xml
@@ -78,7 +78,8 @@
<class>org.apache.juddi.model.ReplicationConfiguration</class>
<class>org.apache.juddi.model.Edge</class>
<class>org.apache.juddi.model.ControlMessage</class>
- <class>org.apache.juddi.model.ReplicationMessage</class>
+ <class>org.apache.juddi.model.ReplicationConfigurationNode</class>
+ <class>org.apache.juddi.model.EdgeReceiverAlternate</class>
<properties>
<property name="openjpa.jdbc.SynchronizeMappings" value="buildSchema(SchemaAction='dropDB,add')"/>
http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
index d69afbf..767cbee 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/AuthenticatedService.java
@@ -50,10 +50,15 @@ public abstract class AuthenticatedService {
public static final int AUTHTOKEN_RETIRED = 0;
static final Log logger = LogFactory.getLog(AuthenticatedService.class);
protected String node = "UNDEFINED_NODE_NAME";
+ protected String baseUrlSSL="UNDEFINED";
+ protected String baseUrl="UNDEFINED";
public AuthenticatedService(){
try {
node = AppConfig.getConfiguration().getString(Property.JUDDI_NODE_ID, "UNDEFINED_NODE_NAME");
+ node=node.trim();
+ baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL_SECURE, Property.DEFAULT_BASE_URL_SECURE);
+ baseUrlSSL=AppConfig.getConfiguration().getString(Property.JUDDI_BASE_URL, Property.DEFAULT_BASE_URL);
} catch (ConfigurationException ex) {
logger.fatal(null, ex);
}
http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
index 268616e..dd201fe 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/JUDDIApiImpl.java
@@ -19,6 +19,7 @@ package org.apache.juddi.api.impl;
import java.io.StringWriter;
import java.math.BigInteger;
import java.rmi.RemoteException;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -30,6 +31,7 @@ import javax.jws.WebService;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
+import javax.xml.bind.JAXB;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;
import javax.xml.ws.Holder;
@@ -73,7 +75,6 @@ import org.apache.juddi.model.ClientSubscriptionInfo;
import org.apache.juddi.model.Node;
import org.apache.juddi.model.Publisher;
import org.apache.juddi.model.ReplicationConfiguration;
-import org.apache.juddi.model.SubscriptionMatch;
import org.apache.juddi.model.Tmodel;
import org.apache.juddi.model.UddiEntityPublisher;
import org.apache.juddi.subscription.NotificationList;
@@ -89,19 +90,24 @@ import org.apache.juddi.validation.ValidateNode;
import org.apache.juddi.validation.ValidatePublish;
import org.apache.juddi.validation.ValidatePublisher;
import org.apache.juddi.validation.ValidateReplication;
-import org.apache.juddi.validation.ValidateSubscription;
import org.uddi.api_v3.AuthToken;
import org.uddi.api_v3.BusinessInfo;
import org.uddi.api_v3.BusinessInfos;
+import org.uddi.api_v3.Contact;
import org.uddi.api_v3.DeleteTModel;
import org.uddi.api_v3.DispositionReport;
import org.uddi.api_v3.GetRegisteredInfo;
import org.uddi.api_v3.InfoSelection;
+import org.uddi.api_v3.KeyType;
+import org.uddi.api_v3.PersonName;
import org.uddi.api_v3.RegisteredInfo;
import org.uddi.api_v3.Result;
import org.uddi.api_v3.SaveTModel;
import org.uddi.api_v3.TModelInfo;
import org.uddi.api_v3.TModelInfos;
+import org.uddi.repl_v3.CommunicationGraph;
+import org.uddi.repl_v3.Operator;
+import org.uddi.repl_v3.OperatorStatusType;
import org.uddi.sub_v3.GetSubscriptionResults;
import org.uddi.sub_v3.Subscription;
import org.uddi.sub_v3.SubscriptionResultsList;
@@ -119,7 +125,7 @@ import org.uddi.v3_service.UDDISubscriptionPortType;
*/
@WebService(serviceName = "JUDDIApiService",
endpointInterface = "org.apache.juddi.v3_service.JUDDIApiPortType",
- targetNamespace = "urn:juddi-apache-org:v3_service")
+ targetNamespace = "urn:juddi-apache-org:v3_service", wsdlLocation = "classpath:/juddi_api_v1.wsdl")
public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortType {
private Log log = LogFactory.getLog(this.getClass());
@@ -786,6 +792,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
* @throws RemoteException
*/
@SuppressWarnings("unchecked")
+ @Override
public SyncSubscriptionDetail invokeSyncSubscription(
SyncSubscription body) throws DispositionReportFaultMessage,
RemoteException {
@@ -1273,30 +1280,42 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
if (!((Publisher) publisher).isAdmin()) {
throw new UserMismatchException(new ErrorMessage("errors.AdminReqd"));
}
- new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration,em);
+ new ValidateReplication(publisher).validateSetReplicationNodes(replicationConfiguration, em, node);
org.apache.juddi.model.ReplicationConfiguration model = null;
try {
model = (ReplicationConfiguration) em.createQuery("select c FROM ReplicationConfiguration c order by c.serialNumber desc").getSingleResult();
} catch (Exception ex) {
}
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ");
if (model == null) {
+ //this is a brand new configuration
model = new ReplicationConfiguration();
- MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em);
+ MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model,em);
model.setSerialNumber(System.currentTimeMillis());
+ model.setTimeOfConfigurationUpdate(sdf.format(new Date()));
em.persist(model);
+ //if (newReplicationNode(model)){
+ //tell the replication notifier to start transfering with
+ //the first change record
+ //}
} else {
- //long oldid = model.getSerialNumber();
- em.remove(model);
+ //a config exists, remove it, add the new one
+ //spec doesn't appear to mention if recording a change history on the config is required
+ //assuming not.
+ //em.remove(model);
model = new ReplicationConfiguration();
MappingApiToModel.mapReplicationConfiguration(replicationConfiguration, model, em);
model.setSerialNumber(System.currentTimeMillis());
- em.persist(model);
+
+ model.setTimeOfConfigurationUpdate(sdf.format(new Date()));
+ em.merge(model);
}
tx.commit();
+ //UDDIReplicationImpl.notifyConfigurationChange(replicationConfiguration);
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES,
QueryStatus.SUCCESS, procTime);
@@ -1305,14 +1324,22 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
serviceCounter.update(JUDDIQuery.SET_REPLICATION_NODES,
QueryStatus.FAILED, procTime);
throw drfm;
- } finally {
+ } catch (Exception ex){
+ logger.error(ex,ex);
+ JAXB.marshal(replicationConfiguration, System.out);
+ throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
+ }
+ finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
+ DispositionReport d = new DispositionReport();
+ Result res = new Result();
- return new DispositionReport();
+ d.getResult().add(res);
+ return d;
}
@Override
@@ -1335,7 +1362,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
sql.toString();
Query qry = em.createQuery(sql.toString());
qry.setMaxResults(1);
-
+
org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
MappingModelToApi.mapReplicationConfiguration(resultList, r);
tx.commit();
@@ -1347,15 +1374,36 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
serviceCounter.update(JUDDIQuery.GET_ALL_NODES,
QueryStatus.FAILED, procTime);
throw drfm;
- } catch (Exception ex){
- //possible that there is no config to return
- r.setCommunicationGraph(null);
- logger.warn("Error caught, is there a replication config is avaiable?", ex);
+ } catch (Exception ex) {
+ //possible that there is no config to return
+ r.setCommunicationGraph(new CommunicationGraph());
+ Operator op = new Operator();
+ op.setOperatorNodeID(node);
+ op.setSoapReplicationURL(baseUrlSSL + "/services/replication");
+ //TODO lookup from the root business
+
+ op.getContact().add(new Contact());
+ op.getContact().get(0).getPersonName().add(new PersonName("Unknown", null));
+ op.setOperatorStatus(OperatorStatusType.NORMAL);
+
+ r.getOperator().add(op);
+ r.getCommunicationGraph().getNode().add(node);
+ r.getCommunicationGraph().getControlledMessage().add("*");
+ logger.warn("Error caught, is there a replication config is avaiable? Returning a default config (no replication): " + ex.getMessage());
+ logger.debug("Error caught, is there a replication config is avaiable? Returning a default config (no replication): ", ex);
long procTime = System.currentTimeMillis() - startTime;
+ r.setSerialNumber(0);
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddkkmmZ");
+ r.setTimeOfConfigurationUpdate(sdf.format(new Date()));
+ r.setRegistryContact(new org.uddi.repl_v3.ReplicationConfiguration.RegistryContact());
+ //TODO pull from root business
+ r.getRegistryContact().setContact(new Contact());
+ r.getRegistryContact().getContact().getPersonName().add(new PersonName("Unknown", null));
+
serviceCounter.update(JUDDIQuery.GET_REPLICATION_NODES,
QueryStatus.FAILED, procTime);
-
- }finally {
+
+ } finally {
if (tx.isActive()) {
tx.rollback();
}
@@ -1364,6 +1412,7 @@ public class JUDDIApiImpl extends AuthenticatedService implements JUDDIApiPortTy
r.setMaximumTimeToGetChanges(BigInteger.ONE);
r.setMaximumTimeToSyncRegistry(BigInteger.ONE);
+ JAXB.marshal(r, System.out);
return r;
}
http://git-wip-us.apache.org/repos/asf/juddi/blob/22a846dd/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
index 15ed7a8..3971e6b 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
@@ -28,11 +28,19 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.logging.Level;
import java.util.logging.Logger;
+import javax.jws.WebMethod;
+import javax.jws.WebParam;
+import javax.jws.WebResult;
+import javax.jws.WebService;
import javax.persistence.EntityManager;
import javax.persistence.EntityTransaction;
import javax.persistence.Query;
+import javax.xml.bind.JAXB;
+import javax.xml.bind.annotation.XmlSeeAlso;
import javax.xml.datatype.DatatypeConfigurationException;
import javax.xml.ws.BindingProvider;
+import javax.xml.ws.RequestWrapper;
+import javax.xml.ws.ResponseWrapper;
import org.apache.commons.configuration.ConfigurationException;
import static org.apache.juddi.api.impl.AuthenticatedService.logger;
import org.apache.juddi.api.util.QueryStatus;
@@ -45,6 +53,7 @@ import org.apache.juddi.mapping.MappingModelToApi;
import org.apache.juddi.model.BusinessEntity;
import org.apache.juddi.model.BusinessService;
import org.apache.juddi.model.Node;
+import org.apache.juddi.model.Operator;
import org.apache.juddi.model.Tmodel;
import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
import org.apache.juddi.v3.client.UDDIService;
@@ -75,562 +84,625 @@ import org.uddi.v3_service.UDDIReplicationPortType;
* <li>do_ping</li>
* <li>get_highWaterMarks</li>
*
- * @author <a href="mailto:alexoree@apache.org">Alex O'Ree<a/>
+ * @author <a href="mailto:alexoree@apache.org">Alex O'Ree</a>
*/
+@WebService(serviceName = "UDDI_Replication_PortType", targetNamespace = "urn:uddi-org:repl_v3_portType",
+ endpointInterface = "org.uddi.v3_service.UDDIReplicationPortType")
+@XmlSeeAlso({
+ org.uddi.custody_v3.ObjectFactory.class,
+ org.uddi.repl_v3.ObjectFactory.class,
+ org.uddi.subr_v3.ObjectFactory.class,
+ org.uddi.api_v3.ObjectFactory.class,
+ org.uddi.vscache_v3.ObjectFactory.class,
+ org.uddi.vs_v3.ObjectFactory.class,
+ org.uddi.sub_v3.ObjectFactory.class,
+ org.w3._2000._09.xmldsig_.ObjectFactory.class,
+ org.uddi.policy_v3.ObjectFactory.class,
+ org.uddi.policy_v3_instanceparms.ObjectFactory.class
+})
public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
-
- private UDDIServiceCounter serviceCounter;
-
- private static PullTimerTask timer = null;
- private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
- private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
-
- private static UDDIPublicationImpl pub = null;
-
- public UDDIReplicationImpl() {
- super();
- if (pub == null) {
- pub = new UDDIPublicationImpl();
- }
- serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
- Init();
- try {
- startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
- interval = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
- } catch (ConfigurationException ex) {
- logger.fatal(ex);
- }
-
- }
-
- private synchronized void Init() {
- if (queue == null) {
- queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
+
+ static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) {
+
+ //if the config is different
}
- timer = new PullTimerTask();
-
- }
-
- private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
- if (changesAlreadySeen != null) {
- for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
- if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
- && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) {
- return true;
+
+ private UDDIServiceCounter serviceCounter;
+
+ private static PullTimerTask timer = null;
+ private long startBuffer = 20000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
+ private long interval = 300000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
+
+ private static UDDIPublicationImpl pub = null;
+
+ public UDDIReplicationImpl() {
+ super();
+ if (pub == null) {
+ pub = new UDDIPublicationImpl();
+ }
+ serviceCounter = ServiceCounterLifecycleResource.getServiceCounter(UDDIReplicationImpl.class);
+ Init();
+ try {
+ startBuffer = AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
+ interval = 5000;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
+ } catch (ConfigurationException ex) {
+ logger.fatal(ex);
}
- }
+
}
- return false;
- }
-
- private class PullTimerTask extends TimerTask {
-
- private Timer timer = null;
-
- public PullTimerTask() {
- super();
- timer = new Timer(true);
- timer.scheduleAtFixedRate(this, startBuffer, interval);
+
+ private synchronized void Init() {
+ if (queue == null) {
+ queue = new ConcurrentLinkedDeque<NotifyChangeRecordsAvailable>();
+ }
+ timer = new PullTimerTask();
+
}
-
- @Override
- public void run() {
-
- //ok someone told me there's a change available
- while (!queue.isEmpty()) {
- NotifyChangeRecordsAvailable poll = queue.poll();
- if (poll != null) {
- UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
- try {
- //ok now get all the changes
- List<ChangeRecord> records
- = replicationClient.getChangeRecords(node,
- null, null, poll.getChangesAvailable());
- //ok now we need to persist the change records
- for (int i = 0; i < records.size(); i++) {
- PersistChangeRecord(records.get(i));
+
+ private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
+ if (changesAlreadySeen != null) {
+ for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
+ if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(r.getChangeID().getNodeID())
+ && changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN().equals(r.getChangeID().getOriginatingUSN())) {
+ return true;
+ }
}
- } catch (Exception ex) {
- logger.equals(ex);
- }
}
- }
- }
-
- @Override
- public boolean cancel() {
- timer.cancel();
- return super.cancel();
+ return false;
}
/**
- * someone told me there's a change available, we retrieved it and are
- * processing the changes locally
- *
- * @param rec
+ * handles when a remote node tells me that there's an update(s)
+ * available
*/
- private void PersistChangeRecord(ChangeRecord rec) {
- if (rec == null) {
- return;
- }
- EntityManager em = PersistenceManager.getEntityManager();
- EntityTransaction tx = em.getTransaction();
- /**
- * In nodes that support pre-bundled replication responses, the
- * recipient of the get_changeRecords message MAY return more change
- * records than requested by the caller. In this scenario, the
- * caller MUST also be prepared to deal with such redundant changes
- * where a USN is less than the USN specified in the
- * changesAlreadySeen highWaterMarkVector.
- */
- try {
- tx.begin();
- //the change record rec must also be persisted!!
- em.persist(MappingApiToModel.mapChangeRecord(rec));
- //<editor-fold defaultstate="collapsed" desc="delete a record">
-
- if (rec.getChangeRecordDelete() != null) {
- if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
- //delete a binding template
- pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
- }
- if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
- //delete a business
- pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
- }
- if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
- //delete a service
- pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
- }
- if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
- //delete a tmodel
- /**
- * The changeRecordDelete for a tModel does not
- * correspond to any API described in this specification
- * and should only appear in the replication stream as
- * the result of an administrative function to
- * permanently remove a tModel.
- */
- pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
- }
+ private class PullTimerTask extends TimerTask {
+
+ private Timer timer = null;
+
+ public PullTimerTask() {
+ super();
+ timer = new Timer(true);
+ timer.scheduleAtFixedRate(this, startBuffer, interval);
}
- if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
- //delete a pa template
- pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em);
+
+ @Override
+ public void run() {
+
+ logger.info("Replication change puller thread started. Queue size: " + queue.size());
+ //ok someone told me there's a change available
+ while (!queue.isEmpty()) {
+ NotifyChangeRecordsAvailable poll = queue.poll();
+ if (poll != null) {
+ UDDIReplicationPortType replicationClient = getReplicationClient(poll.getNotifyingNode());
+ if (replicationClient == null) {
+ logger.fatal("unable to obtain a replication client to node " + poll.getNotifyingNode());
+ }
+ try {
+ //get the high water marks for this node
+ //ok now get all the changes
+ logger.info("fetching updates...");
+
+ List<ChangeRecord> records
+ = replicationClient.getChangeRecords(node,
+ poll.getChangesAvailable(), BigInteger.valueOf(100), null);
+ //ok now we need to persist the change records
+ logger.info("Change records retrieved " + records.size());
+ for (int i = 0; i < records.size(); i++) {
+ PersistChangeRecord(records.get(i));
+ }
+ } catch (Exception ex) {
+ logger.error("Error caught fetching replication changes from " + poll.getNotifyingNode(), ex);
+ }
+ } else {
+ logger.warn("weird, popped an object from the queue but it was null.");
+ }
+ }
+ }
+
+ @Override
+ public boolean cancel() {
+ timer.cancel();
+ return super.cancel();
}
+ /**
+ * someone told me there's a change available, we retrieved it
+ * and are processing the changes locally
+ *
+ * @param rec
+ */
+ private void PersistChangeRecord(ChangeRecord rec) {
+ if (rec == null) {
+ return;
+ }
+ EntityManager em = PersistenceManager.getEntityManager();
+ EntityTransaction tx = em.getTransaction();
+ /**
+ * In nodes that support pre-bundled replication
+ * responses, the recipient of the get_changeRecords
+ * message MAY return more change records than requested
+ * by the caller. In this scenario, the caller MUST also
+ * be prepared to deal with such redundant changes where
+ * a USN is less than the USN specified in the
+ * changesAlreadySeen highWaterMarkVector.
+ */
+ try {
+ tx.begin();
+ //the change record rec must also be persisted!!
+ em.persist(MappingApiToModel.mapChangeRecord(rec));
+ //<editor-fold defaultstate="collapsed" desc="delete a record">
+
+ if (rec.getChangeRecordDelete() != null) {
+ if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
+ //delete a binding template
+ pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
+ }
+ if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
+ //delete a business
+ pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
+ }
+ if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
+ //delete a service
+ pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
+ }
+ if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getTModelKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getTModelKey())) {
+ //delete a tmodel
+ /**
+ * The changeRecordDelete for a
+ * tModel does not correspond to
+ * any API described in this
+ * specification and should only
+ * appear in the replication
+ * stream as the result of an
+ * administrative function to
+ * permanently remove a tModel.
+ */
+ pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
+ }
+ }
+ if (rec.getChangeRecordDeleteAssertion() != null && rec.getChangeRecordDeleteAssertion().getPublisherAssertion() != null) {
+ //delete a pa template
+ pub.deletePublisherAssertion(rec.getChangeRecordDeleteAssertion().getPublisherAssertion(), em);
+ }
+
//</editor-fold>
- //<editor-fold defaultstate="collapsed" desc="New Data">
- if (rec.getChangeRecordNewData() != null) {
-
- if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
- throw new Exception("Inbound replication data is missiong node id!");
- }
-
- //The operationalInfo element MUST contain the operational information associated with the indicated new data.
- if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
- logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
- } else {
- if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
+ //<editor-fold defaultstate="collapsed" desc="New Data">
+ if (rec.getChangeRecordNewData() != null) {
+
+ if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
+ throw new Exception("Inbound replication data is missiong node id!");
+ }
+
+ //The operationalInfo element MUST contain the operational information associated with the indicated new data.
+ if (rec.getChangeRecordNewData().getOperationalInfo() == null) {
+ logger.warn("Inbound replication data does not have the required OperationalInfo element and is NOT spec compliant. Data will be ignored");
+ } else {
+ if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
//fetch the binding template if it exists already
- //if it exists,
- // confirm the owning node, it shouldn't be the local node id, if it is, throw
- // the owning node should be the same as it was before
-
- BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
- if (model == null) {
- logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
- } else {
- ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
-
- org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate();
- MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model);
- MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
- em.persist(model);
- }
-
- } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
-
- BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
- if (model == null) {
- model = new BusinessEntity();
- } else {
- ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
- }
- MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
- MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-
- em.persist(model);
-
- }
- if (rec.getChangeRecordNewData().getBusinessService() != null) {
- BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
- if (find == null) {
- logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
- } else {
- org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService();
- MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
- MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-
- em.persist(model);
- }
-
- } else if (rec.getChangeRecordNewData().getTModel() != null) {
- Tmodel model = new Tmodel();
- MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
-
- MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-
- em.persist(model);
- }
-
- }
-
- }
+ //if it exists,
+ // confirm the owning node, it shouldn't be the local node id, if it is, throw
+ // the owning node should be the same as it was before
+
+ BusinessService model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBindingTemplate().getServiceKey());
+ if (model == null) {
+ logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
+ } else {
+ ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
+
+ org.apache.juddi.model.BindingTemplate modelT = new org.apache.juddi.model.BindingTemplate();
+ MappingApiToModel.mapBindingTemplate(rec.getChangeRecordNewData().getBindingTemplate(), modelT, model);
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+ em.persist(model);
+ }
+
+ } else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
+
+ BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
+ if (model == null) {
+ model = new BusinessEntity();
+ } else {
+ ValidateNodeIdMatches(model.getNodeId(), rec.getChangeRecordNewData().getOperationalInfo());
+ }
+ MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+ em.persist(model);
+
+ }
+ if (rec.getChangeRecordNewData().getBusinessService() != null) {
+ BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessService().getBusinessKey());
+ if (find == null) {
+ logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
+ } else {
+ org.apache.juddi.model.BusinessService model = new org.apache.juddi.model.BusinessService();
+ MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+ em.persist(model);
+ }
+
+ } else if (rec.getChangeRecordNewData().getTModel() != null) {
+ Tmodel model = new Tmodel();
+ MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+ em.persist(model);
+ }
+
+ }
+
+ }
//</editor-fold>
// changeRecordNull no action needed
- // changeRecordHide tmodel only
- //<editor-fold defaultstate="collapsed" desc="hide tmodel">
- if (rec.getChangeRecordHide() != null) {
- /*
- A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
+ // changeRecordHide tmodel only
+ //<editor-fold defaultstate="collapsed" desc="hide tmodel">
+ if (rec.getChangeRecordHide() != null) {
+ /*
+ A changeRecordHide element corresponds to the behavior of hiding a tModel described in the delete_tModel in the Publish API section of this Specification. A tModel listed in a changeRecordHide should be marked as hidden, so that it is not returned in response to a find_tModel API call.
- The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
- */
- String key = rec.getChangeRecordHide().getTModelKey();
- org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
- if (existing == null) {
- logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
- } else {
- existing.setDeleted(true);
- existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
- existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
- em.persist(existing);
- }
- }
+ The changeRecordHide MUST contain a modified timestamp to allow multi-node registries to calculate consistent modifiedIncludingChildren timestamps as described in Section 3.8 operationalInfo Structure.
+ */
+ String key = rec.getChangeRecordHide().getTModelKey();
+ org.apache.juddi.model.Tmodel existing = em.find(org.apache.juddi.model.Tmodel.class, key);
+ if (existing == null) {
+ logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
+ } else {
+ existing.setDeleted(true);
+ existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+ existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
+ em.persist(existing);
+ }
+ }
//</editor-fold>
- //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
- if (rec.getChangeRecordPublisherAssertion() != null) {
+ //<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
+ if (rec.getChangeRecordPublisherAssertion() != null) {
//TODO implement
- }
+ }
//</editor-fold>
- tx.commit();
-
- } catch (Exception drfm) {
- logger.warn(drfm);
- } finally {
- if (tx.isActive()) {
- tx.rollback();
+ tx.commit();
+
+ } catch (Exception drfm) {
+ logger.warn(drfm);
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ em.close();
+ }
}
- em.close();
- }
- }
-
- }
-
- private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception {
- if (nodeId == null || operationalInfo == null) {
- throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
- }
- if (!nodeId.equals(operationalInfo.getNodeID())) {
- throw new Exception("node id mismatch!");
+
}
- }
-
- private synchronized UDDIReplicationPortType getReplicationClient(String node) {
- if (cache.containsKey(node)) {
- return cache.get(node);
+
+ private static void ValidateNodeIdMatches(String nodeId, OperationalInfo operationalInfo) throws Exception {
+ if (nodeId == null || operationalInfo == null) {
+ throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
+ }
+ if (!nodeId.equals(operationalInfo.getNodeID())) {
+ throw new Exception("node id mismatch!");
+ }
}
- UDDIService svc = new UDDIService();
- UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
- EntityManager em = PersistenceManager.getEntityManager();
- EntityTransaction tx = em.getTransaction();
- try {
- Node find = em.find(org.apache.juddi.model.Node.class, node);
- ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, find.getReplicationUrl());
- cache.put(node, replicationClient);
- return replicationClient;
- } catch (Exception ex) {
- logger.fatal("Node not found!" + node, ex);
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- em.close();
+
+ private synchronized UDDIReplicationPortType getReplicationClient(String node) {
+ if (cache.containsKey(node)) {
+ return cache.get(node);
+ }
+ UDDIService svc = new UDDIService();
+ UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
+
+ EntityManager em = PersistenceManager.getEntityManager();
+ EntityTransaction tx = em.getTransaction();
+ try {
+ StringBuilder sql = new StringBuilder();
+ sql.append("select c from ReplicationConfiguration c order by c.serialNumber desc");
+ sql.toString();
+ Query qry = em.createQuery(sql.toString());
+ qry.setMaxResults(1);
+
+ org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
+ for (Operator o : resultList.getOperator()) {
+ if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
+ ((BindingProvider) replicationClient).getRequestContext().put(BindingProvider.ENDPOINT_ADDRESS_PROPERTY, o.getSoapReplicationURL());
+ cache.put(node, replicationClient);
+ return replicationClient;
+ }
+ }
+ tx.rollback();
+
+ } catch (Exception ex) {
+ logger.fatal("Node not found!" + node, ex);
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ em.close();
+ }
+ //em.close();
+ return null;
+
}
- em.close();
- return null;
-
- }
- private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
-
- /**
- * @since 3.3
- * @param body
- * @return
- * @throws DispositionReportFaultMessage
- */
- public String doPing(DoPing body) throws DispositionReportFaultMessage {
- long startTime = System.currentTimeMillis();
- long procTime = System.currentTimeMillis() - startTime;
- serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
-
- return node;
-
- }
-
- @Override
- public List<ChangeRecord> getChangeRecords(String requestingNode,
- HighWaterMarkVectorType changesAlreadySeen,
- BigInteger responseLimitCount,
- HighWaterMarkVectorType responseLimitVector)
- throws DispositionReportFaultMessage {
- long startTime = System.currentTimeMillis();
-
- new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
-
- //TODO should we validate that "requestingNode" is in the replication config?
- List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
- EntityManager em = PersistenceManager.getEntityManager();
- EntityTransaction tx = em.getTransaction();
+ private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
/**
- * More specifically, the recipient determines the particular change
- * records that are returned by comparing the originating USNs in the
- * caller’s high water mark vector with the originating USNs of each of
- * the changes the recipient has seen from others or generated by
- * itself. The recipient SHOULD only return change records that have
- * originating USNs that are greater than those listed in the
- * changesAlreadySeen highWaterMarkVector and less than the limit
- * required by either the responseLimitCount or the responseLimitVector.
- *
- *
- * Part of the message is a high water mark vector that contains for
- * each node of the registry the originating USN of the most recent
- * change record that has been successfully processed by the invocating
- * node
+ * @since 3.3
+ * @param body
+ * @return
+ * @throws DispositionReportFaultMessage
*/
- int maxrecords = 100;
- if (responseLimitCount != null) {
- maxrecords = responseLimitCount.intValue();
+ public String doPing(DoPing body) throws DispositionReportFaultMessage {
+ long startTime = System.currentTimeMillis();
+ long procTime = System.currentTimeMillis() - startTime;
+ serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
+
+ return node;
+
}
- try {
- tx.begin();
- Long firstrecord = 1L;
- Long lastrecord = null;
-
- if (changesAlreadySeen != null) {
- //this is basically a lower limit (i.e. the newest record that was processed by the requestor
- //therefore we want the oldest record stored locally to return to the requestor for processing
- for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
- if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
- firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1;
- }
+
+ @WebResult(name = "changeRecord", targetNamespace = "urn:uddi-org:repl_v3")
+ @RequestWrapper(localName = "get_changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.GetChangeRecords")
+ @ResponseWrapper(localName = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", className = "org.uddi.repl_v3.ChangeRecords")
+
+ @Override
+ public List<ChangeRecord> getChangeRecords(@WebParam(name = "requestingNode", targetNamespace = "urn:uddi-org:repl_v3") String requestingNode,
+ @WebParam(name = "changesAlreadySeen", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType changesAlreadySeen,
+ @WebParam(name = "responseLimitCount", targetNamespace = "urn:uddi-org:repl_v3") BigInteger responseLimitCount,
+ @WebParam(name = "responseLimitVector", targetNamespace = "urn:uddi-org:repl_v3") HighWaterMarkVectorType responseLimitVector)
+ throws DispositionReportFaultMessage {
+ long startTime = System.currentTimeMillis();
+
+ new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
+
+ //TODO should we validate that "requestingNode" is in the replication config?
+ List<ChangeRecord> ret = new ArrayList<ChangeRecord>();
+ EntityManager em = PersistenceManager.getEntityManager();
+ EntityTransaction tx = em.getTransaction();
+
+ /**
+ * More specifically, the recipient determines the particular
+ * change records that are returned by comparing the originating
+ * USNs in the caller’s high water mark vector with the
+ * originating USNs of each of the changes the recipient has
+ * seen from others or generated by itself. The recipient SHOULD
+ * only return change records that have originating USNs that
+ * are greater than those listed in the changesAlreadySeen
+ * highWaterMarkVector and less than the limit required by
+ * either the responseLimitCount or the responseLimitVector.
+ *
+ *
+ * Part of the message is a high water mark vector that contains
+ * for each node of the registry the originating USN of the most
+ * recent change record that has been successfully processed by
+ * the invocating node
+ */
+ int maxrecords = 100;
+ if (responseLimitCount != null) {
+ maxrecords = responseLimitCount.intValue();
}
- }
- if (responseLimitVector != null) {
+ try {
+ tx.begin();
+ Long firstrecord = 1L;
+ Long lastrecord = null;
+
+ if (changesAlreadySeen != null) {
+ //this is basically a lower limit (i.e. the newest record that was processed by the requestor)
+ //therefore we want the oldest record stored locally to return to the requestor for processing
+ for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
+ if (changesAlreadySeen.getHighWaterMark().get(i).getNodeID().equals(node)) {
+ firstrecord = changesAlreadySeen.getHighWaterMark().get(i).getOriginatingUSN() + 1;
+ }
+ }
+ }
+ if (responseLimitVector != null) {
//using responseLimitVector, indicating for each node in the graph the first change originating there that he does not wish to be returned.
- //upper limit basically
- for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
- if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
- lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
- }
- }
- }
-
- Query createQuery = null;
- if (lastrecord != null) {
- createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node and e.id < :lastrecord) OR (e.originatingUSN > :inbound and e.nodeID != :node and e.originatingUSN < :lastrecord) order by e.id ASC");
- createQuery.setParameter("lastrecord", lastrecord);
- } else {
- createQuery = em.createQuery("select e from ChangeRecord e where (e.id > :inbound and e.nodeID = :node) OR (e.originatingUSN > :inbound and e.nodeID != :node) order by e.id ASC");
- }
- createQuery.setMaxResults(maxrecords);
- createQuery.setParameter("inbound", firstrecord);
- createQuery.setParameter("node", node);
-
- List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) (org.apache.juddi.model.ChangeRecord) createQuery.getResultList();
- for (int i = 0; i < records.size(); i++) {
- ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
- if (!Excluded(changesAlreadySeen, r)) {
- ret.add(r);
+ //upper limit basically
+ for (int i = 0; i < responseLimitVector.getHighWaterMark().size(); i++) {
+ if (responseLimitVector.getHighWaterMark().get(i).getNodeID().equals(node)) {
+ lastrecord = responseLimitVector.getHighWaterMark().get(i).getOriginatingUSN();
+ }
+ }
+ }
+
+ Query createQuery = null;
+ if (lastrecord != null) {
+ createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node and e.id < :lastrecord) OR "
+ + "(e.originatingUSN > :inbound and e.nodeID <> :node and e.originatingUSN < :lastrecord)) order by e.id ASC");
+ createQuery.setParameter("lastrecord", lastrecord);
+ } else {
+ createQuery = em.createQuery("select e from ChangeRecord e where ((e.id >= :inbound and e.nodeID = :node) OR "
+ + "(e.originatingUSN > :inbound and e.nodeID <> :node)) order by e.id ASC");
+ }
+ createQuery.setMaxResults(maxrecords);
+ createQuery.setParameter("inbound", firstrecord);
+ createQuery.setParameter("node", node);
+
+ List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
+ for (int i = 0; i < records.size(); i++) {
+ ChangeRecord r = MappingModelToApi.mapChangeRecord(records.get(i));
+ if (!Excluded(changesAlreadySeen, r)) {
+ ret.add(r);
+ }
+
+ }
+
+ tx.rollback();
+ long procTime = System.currentTimeMillis() - startTime;
+ serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
+ QueryStatus.SUCCESS, procTime);
+
+ } catch (Exception ex) {
+ logger.fatal("Error, this node is: " + node, ex);
+ throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
+
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ em.close();
}
-
- }
-
- tx.rollback();
- long procTime = System.currentTimeMillis() - startTime;
- serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
- QueryStatus.SUCCESS, procTime);
-
- } catch (Exception ex) {
- logger.fatal("Error, this node is: " + node, ex);
- throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
-
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- em.close();
+ logger.info("Change records returned for " + requestingNode + ": " + ret.size());
+ //JAXB.marshal(ret, System.out);
+ return ret;
}
- return ret;
- }
-
- /**
- * This UDDI API message provides a means to obtain a list of highWaterMark
- * element containing the highest known USN for all nodes in the replication
- * graph. If there is no graph, we just return the local bits
- *
- * @return
- * @throws DispositionReportFaultMessage
- */
- @Override
- public List<ChangeRecordIDType> getHighWaterMarks()
- throws DispositionReportFaultMessage {
- long startTime = System.currentTimeMillis();
-
- List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
-
- //fetch from database the highest known watermark
- ReplicationConfiguration FetchEdges = FetchEdges();
-
- EntityManager em = PersistenceManager.getEntityManager();
- EntityTransaction tx = em.getTransaction();
- try {
- tx.begin();
- if (FetchEdges != null) {
- Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
- while (it.hasNext()) {
- String nextNode = it.next();
- if (!nextNode.equals(node)) {
-
- Long id = 0L;
- try {
- id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
- } catch (Exception ex) {
- logger.debug(ex);
+
+ /**
+ * This UDDI API message provides a means to obtain a list of
+ * highWaterMark element containing the highest known USN for all nodes
+ * in the replication graph. If there is no graph, we just return the
+ * local bits
+ *
+ * @return
+ * @throws DispositionReportFaultMessage
+ */
+ @Override
+ public List<ChangeRecordIDType> getHighWaterMarks()
+ throws DispositionReportFaultMessage {
+ long startTime = System.currentTimeMillis();
+
+ List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
+
+ //fetch from database the highest known watermark
+ ReplicationConfiguration FetchEdges = FetchEdges();
+
+ EntityManager em = PersistenceManager.getEntityManager();
+ EntityTransaction tx = em.getTransaction();
+ try {
+ tx.begin();
+ if (FetchEdges != null) {
+ Iterator<String> it = FetchEdges.getCommunicationGraph().getNode().iterator();
+ while (it.hasNext()) {
+ String nextNode = it.next();
+ if (!nextNode.equals(node)) {
+
+ Long id = 0L;
+ try {
+ id = (Long) em.createQuery("select e.originatingUSN from ChangeRecord e where e.nodeID = :node order by e.originatingUSN desc").setParameter("node", nextNode).setMaxResults(1).getSingleResult();
+ } catch (Exception ex) {
+ logger.debug(ex);
+ }
+ if (id == null) {
+ id = 0L;
+ //per the spec
+ }
+ ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id);
+
+ ret.add(x);
+
+ }
+ }
}
+ //don't forget this node
+ Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult();
if (id == null) {
- id = 0L;
- //per the spec
+ id = 0L;
}
- ChangeRecordIDType x = new ChangeRecordIDType(nextNode, id);
-
+ ChangeRecordIDType x = new ChangeRecordIDType();
+ x.setNodeID(node);
+ x.setOriginatingUSN(id);
ret.add(x);
-
- }
+
+ tx.rollback();
+ long procTime = System.currentTimeMillis() - startTime;
+ serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
+
+ } catch (Exception drfm) {
+ throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
+
+ } finally {
+ if (tx.isActive()) {
+ tx.rollback();
+ }
+ em.close();
}
- }
- //dont forget this node
- Long id = (Long) em.createQuery("select (e.id) from ChangeRecord e where e.nodeID = :node order by e.id desc").setParameter("node", node).setMaxResults(1).getSingleResult();
- if (id == null) {
- id = 0L;
- }
- ChangeRecordIDType x = new ChangeRecordIDType();
- x.setNodeID(node);
- x.setOriginatingUSN(id);
- ret.add(x);
-
- tx.rollback();
- long procTime = System.currentTimeMillis() - startTime;
- serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
-
- } catch (Exception drfm) {
- throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
-
- } finally {
- if (tx.isActive()) {
- tx.rollback();
- }
- em.close();
+
+ return ret;
}
-
-
-
- return ret;
- }
-
- /**
- * this means that another node has a change and we need to pick up the
- * change and apply it to our local database.
- *
- * @param body
- * @throws DispositionReportFaultMessage
- */
- @Override
- public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
- throws DispositionReportFaultMessage {
- long startTime = System.currentTimeMillis();
- long procTime = System.currentTimeMillis() - startTime;
- serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
- QueryStatus.SUCCESS, procTime);
+
+ /**
+ * this means that another node has a change and we need to pick up the
+ * change and apply it to our local database.
+ *
+ * @param body
+ * @throws DispositionReportFaultMessage
+ */
+ @Override
+ public void notifyChangeRecordsAvailable(NotifyChangeRecordsAvailable body)
+ throws DispositionReportFaultMessage {
+ long startTime = System.currentTimeMillis();
+ long procTime = System.currentTimeMillis() - startTime;
+ serviceCounter.update(ReplicationQuery.NOTIFY_CHANGERECORDSAVAILABLE,
+ QueryStatus.SUCCESS, procTime);
//some other node just told us there's new records available, call
- //getChangeRecords from the remote node asynch
-
- new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
-
- queue.add(body);
-
- //ValidateReplication.unsupportedAPICall();
- }
- private static Queue<NotifyChangeRecordsAvailable> queue = null;
-
- /**
- * transfers custody of an entity from node1/user1 to node2/user2
- *
- * @param body
- * @throws DispositionReportFaultMessage
- */
- @Override
- public void transferCustody(TransferCustody body)
- throws DispositionReportFaultMessage {
- long startTime = System.currentTimeMillis();
+ //getChangeRecords from the remote node asynch
- //*this node is transfering data to another node
- //body.getTransferOperationalInfo().
- ValidateReplication.unsupportedAPICall();
-
- EntityManager em = PersistenceManager.getEntityManager();
- //EntityTransaction tx = em.getTransaction();
+ new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
+
+ logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...");
+ queue.add(body);
+
+ //ValidateReplication.unsupportedAPICall();
+ }
+ private static Queue<NotifyChangeRecordsAvailable> queue = null;
/**
- * The custodial node must verify that it has granted permission to
- * transfer the entities identified and that this permission is still
- * valid. This operation is comprised of two steps:
- *
- * 1. Verification that the transferToken was issued by it, that it has
- * not expired, that it represents the authority to transfer no more and
- * no less than those entities identified by the businessKey and
- * tModelKey elements and that all these entities are still valid and
- * not yet transferred. The transferToken is invalidated if any of these
- * conditions are not met.
- *
- * 2. If the conditions above are met, the custodial node will prevent
- * any further changes to the entities identified by the businessKey and
- * tModelKey elements identified. The entity will remain in this state
- * until the replication stream indicates it has been successfully
- * processed via the replication stream. Upon successful verification of
- * the custody transfer request by the custodial node, an empty message
- * is returned by it indicating the success of the request and
- * acknowledging the custody transfer. Following the issue of the empty
- * message, the custodial node will submit into the replication stream a
- * changeRecordNewData providing in the operationalInfo, the nodeID
- * accepting custody of the datum and the authorizedName of the
- * publisher accepting ownership. The acknowledgmentRequested attribute
- * of this change record MUST be set to "true".
+ * transfers custody of an entity from node1/user1 to node2/user2
*
- * TODO enqueue Replication message
- *
- * Finally, the custodial node invalidates the transferToken in order to
- * prevent additional calls of the transfer_entities API.
+ * @param body
+ * @throws DispositionReportFaultMessage
*/
- DiscardTransferToken dtt = new DiscardTransferToken();
- dtt.setKeyBag(body.getKeyBag());
- dtt.setTransferToken(body.getTransferToken());
- new UDDICustodyTransferImpl().discardTransferToken(dtt);
- }
-
+ @Override
+ public void transferCustody(TransferCustody body)
+ throws DispositionReportFaultMessage {
+ long startTime = System.currentTimeMillis();
+
+ //*this node is transferring data to another node
+ //body.getTransferOperationalInfo().
+ ValidateReplication.unsupportedAPICall();
+
+ EntityManager em = PersistenceManager.getEntityManager();
+ //EntityTransaction tx = em.getTransaction();
+
+ /**
+ * The custodial node must verify that it has granted permission
+ * to transfer the entities identified and that this permission
+ * is still valid. This operation is comprised of two steps:
+ *
+ * 1. Verification that the transferToken was issued by it, that
+ * it has not expired, that it represents the authority to
+ * transfer no more and no less than those entities identified
+ * by the businessKey and tModelKey elements and that all these
+ * entities are still valid and not yet transferred. The
+ * transferToken is invalidated if any of these conditions are
+ * not met.
+ *
+ * 2. If the conditions above are met, the custodial node will
+ * prevent any further changes to the entities identified by the
+ * businessKey and tModelKey elements identified. The entity
+ * will remain in this state until the replication stream
+ * indicates it has been successfully processed via the
+ * replication stream. Upon successful verification of the
+ * custody transfer request by the custodial node, an empty
+ * message is returned by it indicating the success of the
+ * request and acknowledging the custody transfer. Following the
+ * issue of the empty message, the custodial node will submit
+ * into the replication stream a changeRecordNewData providing
+ * in the operationalInfo, the nodeID accepting custody of the
+ * datum and the authorizedName of the publisher accepting
+ * ownership. The acknowledgmentRequested attribute of this
+ * change record MUST be set to "true".
+ *
+ * TODO enqueue Replication message
+ *
+ * Finally, the custodial node invalidates the transferToken in
+ * order to prevent additional calls of the transfer_entities
+ * API.
+ */
+ DiscardTransferToken dtt = new DiscardTransferToken();
+ dtt.setKeyBag(body.getKeyBag());
+ dtt.setTransferToken(body.getTransferToken());
+ new UDDICustodyTransferImpl().discardTransferToken(dtt);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@juddi.apache.org
For additional commands, e-mail: commits-help@juddi.apache.org