You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@juddi.apache.org by al...@apache.org on 2015/01/18 22:12:49 UTC
juddi git commit: JUDDI-241 revising the validation logic for inbound
replication data. This prevents accidental and malicious custody transfers
while still allowing valid transfers to occur; it also prevents malicious updates.
Repository: juddi
Updated Branches:
refs/heads/master 53d6f2d2c -> ba85baa1c
JUDDI-241 revising the validation logic for inbound replication data. This prevents accidental and malicious custody transfers while still allowing valid transfers to occur; it also prevents malicious updates.
Project: http://git-wip-us.apache.org/repos/asf/juddi/repo
Commit: http://git-wip-us.apache.org/repos/asf/juddi/commit/ba85baa1
Tree: http://git-wip-us.apache.org/repos/asf/juddi/tree/ba85baa1
Diff: http://git-wip-us.apache.org/repos/asf/juddi/diff/ba85baa1
Branch: refs/heads/master
Commit: ba85baa1ced45e28f13c5060a72a8bac30ba8c73
Parents: 53d6f2d
Author: Alex <al...@apache.org>
Authored: Sun Jan 18 16:12:27 2015 -0500
Committer: Alex <al...@apache.org>
Committed: Sun Jan 18 16:12:27 2015 -0500
----------------------------------------------------------------------
.../juddi/api/impl/UDDICustodyTransferImpl.java | 4 +-
.../juddi/api/impl/UDDIPublicationImpl.java | 47 ++-
.../juddi/api/impl/UDDIReplicationImpl.java | 317 ++++++++++++-------
.../java/org/apache/juddi/model/UddiEntity.java | 10 +
4 files changed, 267 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java
index 352123c..3421d81 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDICustodyTransferImpl.java
@@ -261,8 +261,10 @@ public class UDDICustodyTransferImpl extends AuthenticatedService implements UDD
UddiEntity uddiEntity = em.find(UddiEntity.class, key);
if (uddiEntity!=null) {
+ uddiEntity.setIsTransferInProgress(true);
sourceNode = uddiEntity.getNodeId();
- break; //we only need one source node
+ em.merge(uddiEntity);
+ //save the fact we are expecting a transfer
}
else
{
http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java
index 2417db9..a62e1c6 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIPublicationImpl.java
@@ -80,12 +80,14 @@ import org.uddi.api_v3.SaveBusiness;
import org.uddi.api_v3.SaveService;
import org.uddi.api_v3.SaveTModel;
import org.uddi.api_v3.ServiceDetail;
+import org.uddi.api_v3.TModel;
import org.uddi.api_v3.TModelDetail;
import org.uddi.repl_v3.ChangeRecordDelete;
import org.uddi.repl_v3.ChangeRecordDeleteAssertion;
import org.uddi.repl_v3.ChangeRecordHide;
import org.uddi.repl_v3.ChangeRecordIDType;
import org.uddi.repl_v3.ChangeRecordNewData;
+import org.uddi.repl_v3.ChangeRecordNewDataConditional;
import org.uddi.repl_v3.ChangeRecordPublisherAssertion;
import org.uddi.v3_service.DispositionReportFaultMessage;
import org.uddi.v3_service.UDDIPublicationPortType;
@@ -157,7 +159,7 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
em.merge(existingPubAssertion);
persistNewAssertion = false;
- changes.add(getChangeRecord_deletePublisherAssertion(apiPubAssertion, node, existingPubAssertion.getToCheck().equalsIgnoreCase("false"), existingPubAssertion.getFromCheck().equalsIgnoreCase("false"), System.currentTimeMillis()));
+ changes.add(getChangeRecord_deletePublisherAssertion(apiPubAssertion, node, existingPubAssertion.getToCheck().equalsIgnoreCase("false"), existingPubAssertion.getFromCheck().equalsIgnoreCase("false"), System.currentTimeMillis()));
} else {
// Otherwise, it is a new relationship between these entities. Remove the old one so the new one can be added.
// TODO: the model only seems to allow one assertion per two business (primary key is fromKey and toKey). Spec seems to imply as
@@ -343,8 +345,9 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
org.apache.juddi.model.PublisherAssertion existingPubAssertion = em.find(org.apache.juddi.model.PublisherAssertion.class,
modelPubAssertion.getId());
- if (existingPubAssertion==null)
+ if (existingPubAssertion == null) {
throw new InvalidValueException(new ErrorMessage("E_assertionNotFound"));
+ }
boolean fromkey = publisher.isOwner(em.find(BusinessEntity.class, entity.getFromKey()));
boolean tokey = publisher.isOwner(em.find(BusinessEntity.class, entity.getToKey()));
@@ -362,7 +365,7 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
em.persist(existingPubAssertion);
}
- changes.add(getChangeRecord_deletePublisherAssertion(entity, node, tokey,fromkey, existingPubAssertion.getModified().getTime()));
+ changes.add(getChangeRecord_deletePublisherAssertion(entity, node, tokey, fromkey, existingPubAssertion.getModified().getTime()));
}
tx.commit();
@@ -924,6 +927,8 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
List<ChangeRecord> changes = new ArrayList<ChangeRecord>();
for (org.uddi.api_v3.TModel apiTModel : apiTModelList) {
+ // Object obj=em.find( org.apache.juddi.model.Tmodel.class, apiTModel.getTModelKey());
+ //just making changes to an existing tModel, no worries
org.apache.juddi.model.Tmodel modelTModel = new org.apache.juddi.model.Tmodel();
MappingApiToModel.mapTModel(apiTModel, modelTModel);
@@ -934,6 +939,15 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
result.getTModel().add(apiTModel);
changes.add(getChangeRecord(modelTModel, apiTModel, node));
+ /*
+ //TODO JUDDI-915
+ if (obj != null) {
+
+ changes.add(getChangeRecord(modelTModel, apiTModel, node));
+ } else {
+ //special case for replication, must setup a new data conditional change record
+ changes.add(getChangeRecordConditional(modelTModel, apiTModel, node));
+ }*/
}
@@ -1533,4 +1547,31 @@ public class UDDIPublicationImpl extends AuthenticatedService implements UDDIPub
return ret;
}
+ private static ChangeRecord getChangeRecordConditional(Tmodel modelTModel, TModel apiTModel, String node) throws DispositionReportFaultMessage {
+ ChangeRecord cr = new ChangeRecord();
+ if (!apiTModel.getTModelKey().equals(modelTModel.getEntityKey())) {
+ throw new FatalErrorException(new ErrorMessage("E_fatalError", "the model and api keys do not match when saving a tmodel!"));
+ }
+ cr.setEntityKey(modelTModel.getEntityKey());
+ cr.setNodeID(node);
+
+ cr.setRecordType(ChangeRecord.RecordType.ChangeRecordNewDataConditional);
+ org.uddi.repl_v3.ChangeRecord crapi = new org.uddi.repl_v3.ChangeRecord();
+ crapi.setChangeID(new ChangeRecordIDType(node, -1L));
+ crapi.setChangeRecordNewDataConditional(new ChangeRecordNewDataConditional());
+ crapi.getChangeRecordNewDataConditional().setChangeRecordNewData(new ChangeRecordNewData());
+ crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().setTModel(apiTModel);
+ crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().setTModelKey(modelTModel.getEntityKey());
+ crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().setOperationalInfo(new OperationalInfo());
+ MappingModelToApi.mapOperationalInfo(modelTModel, crapi.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
+ StringWriter sw = new StringWriter();
+ JAXB.marshal(crapi, sw);
+ try {
+ cr.setContents(sw.toString().getBytes("UTF8"));
+ } catch (UnsupportedEncodingException ex) {
+ logger.error(ex);
+ }
+ return cr;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
index 8f20593..703f58c 100644
--- a/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
+++ b/juddi-core/src/main/java/org/apache/juddi/api/impl/UDDIReplicationImpl.java
@@ -58,6 +58,7 @@ import org.apache.juddi.model.Operator;
import org.apache.juddi.model.PublisherAssertion;
import org.apache.juddi.model.PublisherAssertionId;
import org.apache.juddi.model.Tmodel;
+import org.apache.juddi.model.UddiEntity;
import org.apache.juddi.replication.ReplicationNotifier;
import static org.apache.juddi.replication.ReplicationNotifier.FetchEdges;
import org.apache.juddi.v3.client.UDDIService;
@@ -109,13 +110,13 @@ import org.uddi.v3_service.UDDIReplicationPortType;
org.uddi.policy_v3_instanceparms.ObjectFactory.class
})
public class UDDIReplicationImpl extends AuthenticatedService implements UDDIReplicationPortType {
-
+
static void notifyConfigurationChange(ReplicationConfiguration oldConfig, ReplicationConfiguration newConfig) {
//if the config is different
Set<String> oldnodes = getNodes(oldConfig);
Set<String> newNodes = getNodes(newConfig);
-
+
Set<String> addedNodes = diffNodeList(oldnodes, newNodes);
if (queue == null) {
queue = new ConcurrentLinkedQueue<String>();
@@ -128,9 +129,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
queue.add(s);
}
}
-
+
}
-
+
private static Set<String> getNodes(ReplicationConfiguration oldConfig) {
Set<String> ret = new HashSet<String>();
if (oldConfig == null) {
@@ -169,19 +170,19 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (!found) {
diff.add(lhs);
}
-
+
}
return diff;
}
-
+
private UDDIServiceCounter serviceCounter;
-
+
private static PullTimerTask timer = null;
private long startBuffer = 5000l;//AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_START_BUFFER, 20000l); // 20s startup delay default
private long interval = 5000l;// AppConfig.getConfiguration().getLong(Property.JUDDI_NOTIFICATION_INTERVAL, 300000l); //5 min default
private static UDDIPublicationImpl pub = null;
-
+
public UDDIReplicationImpl() {
super();
if (pub == null) {
@@ -195,17 +196,17 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
} catch (ConfigurationException ex) {
logger.fatal(ex);
}
-
+
}
-
+
private synchronized void Init() {
if (queue == null) {
queue = new ConcurrentLinkedQueue<String>();
}
timer = new PullTimerTask();
-
+
}
-
+
private boolean Excluded(HighWaterMarkVectorType changesAlreadySeen, ChangeRecord r) {
if (changesAlreadySeen != null) {
for (int i = 0; i < changesAlreadySeen.getHighWaterMark().size(); i++) {
@@ -223,18 +224,18 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
* available
*/
private class PullTimerTask extends TimerTask {
-
+
private Timer timer = null;
-
+
public PullTimerTask() {
super();
timer = new Timer(true);
timer.scheduleAtFixedRate(this, startBuffer, interval);
}
-
+
@Override
public void run() {
-
+
if (!queue.isEmpty()) {
logger.info("Replication change puller thread started. Queue size: " + queue.size());
}
@@ -260,10 +261,10 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
GetChangeRecords body = new GetChangeRecords();
body.setRequestingNode(node);
body.setResponseLimitCount(BigInteger.valueOf(20));
-
+
body.setChangesAlreadySeen(getLastChangeRecordFrom(poll));
logger.info("fetching updates from " + poll + " since " + body.getChangesAlreadySeen().getHighWaterMark().get(0).getOriginatingUSN() + " items still in the queue: " + queue.size());
-
+
List<ChangeRecord> records
= replicationClient.getChangeRecords(body).getChangeRecord();
//ok now we need to persist the change records
@@ -283,7 +284,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
}
}
-
+
@Override
public boolean cancel() {
timer.cancel();
@@ -301,7 +302,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
return;
}
logger.debug("_______________________Remote change request " + rec.getChangeID().getNodeID() + ":" + rec.getChangeID().getOriginatingUSN());
-
+
if (rec.getChangeID().getNodeID().equalsIgnoreCase(node)) {
logger.info("Just received a change record that i created, ignoring....");
return;
@@ -317,7 +318,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
* a USN is less than the USN specified in the
* changesAlreadySeen highWaterMarkVector.
*/
-
+
try {
tx.begin();
//the change record rec must also be persisted!!
@@ -337,13 +338,19 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (rec.getChangeRecordDelete() != null) {
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBindingKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBindingKey())) {
//delete a binding template
+ UddiEntity ue = em.find(BindingTemplate.class, rec.getChangeRecordDelete().getBindingKey());
+ ValidateNodeIdMisMatches(ue, node);
pub.deleteBinding(rec.getChangeRecordDelete().getBindingKey(), em);
}
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getBusinessKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getBusinessKey())) {
//delete a business
+ UddiEntity ue = em.find(BusinessEntity.class, rec.getChangeRecordDelete().getBindingKey());
+ ValidateNodeIdMisMatches(ue, node);
pub.deleteBusiness(rec.getChangeRecordDelete().getBusinessKey(), em);
}
if (rec.getChangeRecordDelete() != null && rec.getChangeRecordDelete().getServiceKey() != null && !"".equalsIgnoreCase(rec.getChangeRecordDelete().getServiceKey())) {
+ UddiEntity ue = em.find(BusinessService.class, rec.getChangeRecordDelete().getBindingKey());
+ ValidateNodeIdMisMatches(ue, node);
//delete a service
pub.deleteService(rec.getChangeRecordDelete().getServiceKey(), em);
}
@@ -359,11 +366,13 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
* administrative function to
* permanently remove a tModel.
*/
- Object tm=em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
- if (tm!=null)
+ UddiEntity tm = em.find(Tmodel.class, rec.getChangeRecordDelete().getTModelKey());
+ if (tm != null) {
+ ValidateNodeIdMisMatches(tm, node);
em.remove(tm);
- else
+ } else {
logger.error("failed to adminstratively delete tmodel because it doesn't exist. " + rec.getChangeRecordDelete().getTModelKey());
+ }
//pub.deleteTModel(rec.getChangeRecordDelete().getTModelKey(), em);
}
}
@@ -384,7 +393,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
throw new Exception("Inbound replication data is missiong node id! Change will not be applied");
}
if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equalsIgnoreCase(node)) {
- logger.warn("Inbound replication data is modifying locally owned data. This is not allowed");
+ logger.warn("Inbound replication data is modifying locally owned data. This is not allowed, except for custody transfer");
}
if (rec.getChangeRecordNewData().getBindingTemplate() != null) {
//fetch the binding template if it exists already
@@ -397,7 +406,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
} else {
ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
-
+
org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewData().getBindingTemplate().getBindingKey());
if (bt != null) {
//ValidateNodeIdMatches(node, bt.getNodeId());
@@ -409,16 +418,44 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
em.persist(bt);
}
-
+
} else if (rec.getChangeRecordNewData().getBusinessEntity() != null) {
-
+
BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewData().getBusinessEntity().getBusinessKey());
if (model != null) {
- ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
- //em.remove(model);
- MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
- MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
- em.merge(model);
+ if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && !model.getNodeId().equals(node)) {
+ if (model.getIsTransferInProgress()) {
+ //allow the transfer
+ MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+ MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
+ model.setIsTransferInProgress(false);
+ em.merge(model);
+ } else {
+ //block it, unexpected transfer
+ throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
+ }
+
+ } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && model.getNodeId().equals(node)) {
+ //if destination is here and it's staying here, then this is strange also
+ //someone else updated one of my records
+ throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
+ } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && model.getNodeId().equals(node)) {
+ //this is also strange, destination is elsewhere however it's owned by me.
+ throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
+
+ } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && !model.getNodeId().equals(node)) {
+ //changes on a remote node, for an existing item
+ MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
+ MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
+ em.merge(model);
+
+ }
+
} else {
model = new BusinessEntity();
MappingApiToModel.mapBusinessEntity(rec.getChangeRecordNewData().getBusinessEntity(), model);
@@ -431,39 +468,78 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (find == null) {
logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
} else {
-
+
org.apache.juddi.model.BusinessService model = null;
model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewData().getBusinessService().getServiceKey());
if (model != null) {
ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
em.remove(model);
}
-
+
model = new org.apache.juddi.model.BusinessService();
MappingApiToModel.mapBusinessService(rec.getChangeRecordNewData().getBusinessService(), model, find);
MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewData().getOperationalInfo());
-
+
em.persist(model);
}
-
+
} else if (rec.getChangeRecordNewData().getTModel() != null) {
-
+
Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewData().getTModel().getTModelKey());
if (model != null) {
- ValidateNodeIdMatches(rec.getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
- em.remove(model);
+ //in the case of a transfer
+ //if the new entity is being transfer to ME, accept and i didn't previously own it, but only if the local record is flagged as transferable
+ //meaning, only accept if i'm expecting a transfer
+ if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && !model.getNodeId().equals(node)) {
+ if (model.getIsTransferInProgress()) {
+ //allow the transfer
+ em.remove(model);
+ model = new Tmodel();
+ MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+ model.setIsTransferInProgress(false);
+ em.persist(model);
+ } else {
+ //block it, unexpected transfer
+ throw new Exception("Unexpected entity transfer to this node from " + rec.getChangeID().getNodeID());
+ }
+
+ } else if (rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && model.getNodeId().equals(node)) {
+ //if destination is here and it's staying here, then this is strange also
+ //someone else updated one of my records
+ throw new Exception("unexpected modification of records that this server owns, " + model.getEntityKey());
+ } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && model.getNodeId().equals(node)) {
+ //this is also strange, destination is elsewhere however it's owned by me.
+ throw new Exception("unexpected transfer from this node to elsewhere, possible that the key in question exists at two places prior to replication sync, " + model.getEntityKey());
+
+ } else if (!rec.getChangeRecordNewData().getOperationalInfo().getNodeID().equals(node)
+ && !model.getNodeId().equals(node)) {
+ //changes on a remote node, for an existing item
+ em.remove(model);
+ model = new Tmodel();
+ MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+ em.persist(model);
+
+ }
+ } else {
+ model = new Tmodel();
+ MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
+
+ MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
+
+ em.persist(model);
}
- model = new Tmodel();
- MappingApiToModel.mapTModel(rec.getChangeRecordNewData().getTModel(), model);
-
- MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewData().getOperationalInfo());
-
- em.persist(model);
}
-
+
}
-
+
}
//</editor-fold>
@@ -481,6 +557,8 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (existing == null) {
logger.error("Unexpected delete/hide tmodel message received for non existing key " + key);
} else {
+ //no one else can delete/hide my tmodel
+ ValidateNodeIdMisMatches(existing, node);
existing.setDeleted(true);
existing.setModified(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
existing.setModifiedIncludingChildren(rec.getChangeRecordHide().getModified().toGregorianCalendar().getTime());
@@ -491,26 +569,26 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
//<editor-fold defaultstate="collapsed" desc="changeRecordPublisherAssertion">
if (rec.getChangeRecordPublisherAssertion() != null) {
-
+
logger.info("Repl CR Publisher Assertion");
//TODO are publisher assertions owned by a given node?
PublisherAssertionId paid = new PublisherAssertionId(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey(), rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey());
org.apache.juddi.model.PublisherAssertion model = em.find(org.apache.juddi.model.PublisherAssertion.class, paid);
if (model != null) {
logger.info("Repl CR Publisher Assertion - Existing");
-
+
if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
model.setFromCheck("true");
} else {
model.setFromCheck("false");
}
-
+
if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
model.setToCheck("true");
} else {
model.setToCheck("false");
}
-
+
model.setKeyName(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyName());
model.setKeyValue(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getKeyValue());
model.setTmodelKey(rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getKeyedReference().getTModelKey());
@@ -524,20 +602,20 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
em.merge(model);
} else {
logger.info("Repl CR Publisher Assertion - new PA");
-
+
model = new PublisherAssertion();
MappingApiToModel.mapPublisherAssertion(rec.getChangeRecordPublisherAssertion().getPublisherAssertion(), model);
model.setBusinessEntityByFromKey(null);
model.setBusinessEntityByToKey(null);
model.setBusinessEntityByFromKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getFromKey()));
model.setBusinessEntityByToKey(em.find(BusinessEntity.class, rec.getChangeRecordPublisherAssertion().getPublisherAssertion().getToKey()));
-
+
if (rec.getChangeRecordPublisherAssertion().isFromBusinessCheck()) {
model.setFromCheck("true");
} else {
model.setFromCheck("false");
}
-
+
if (rec.getChangeRecordPublisherAssertion().isToBusinessCheck()) {
model.setToCheck("true");
} else {
@@ -557,7 +635,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
ReplicationNotifier.Enqueue(MappingApiToModel.mapChangeRecord(posack));
}
if (rec.getChangeRecordNewDataConditional() != null) {
-
+
if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID() == null) {
throw new Exception("Inbound replication data is missiong node id!");
}
@@ -576,11 +654,11 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (model == null) {
logger.error("Replication error, attempting to insert a binding where the service doesn't exist yet");
} else {
-
+
org.apache.juddi.model.BindingTemplate bt = em.find(org.apache.juddi.model.BindingTemplate.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBindingTemplate().getBindingKey());
if (bt != null) {
ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), bt.getNodeId());
-
+
em.remove(bt);
}
bt = new BindingTemplate();
@@ -589,9 +667,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
// MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
em.persist(bt);
}
-
+
} else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity() != null) {
-
+
BusinessEntity model = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessEntity().getBusinessKey());
if (model != null) {
ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
@@ -605,31 +683,31 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
logger.warn("Name size on save is " + model.getBusinessNames().size());
em.persist(model);
-
+
}
if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService() != null) {
BusinessEntity find = em.find(org.apache.juddi.model.BusinessEntity.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getBusinessKey());
if (find == null) {
logger.error("Replication error, attempting to insert a service where the business doesn't exist yet");
} else {
-
+
org.apache.juddi.model.BusinessService model = null;
model = em.find(org.apache.juddi.model.BusinessService.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService().getServiceKey());
if (model != null) {
ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
em.remove(model);
}
-
+
model = new org.apache.juddi.model.BusinessService();
MappingApiToModel.mapBusinessService(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getBusinessService(), model, find);
MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
MappingApiToModel.mapOperationalInfoIncludingChildren(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
-
+
em.persist(model);
}
-
+
} else if (rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel() != null) {
-
+
Tmodel model = em.find(org.apache.juddi.model.Tmodel.class, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel().getTModelKey());
if (model != null) {
ValidateNodeIdMatches(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo().getNodeID(), model.getNodeId());
@@ -637,14 +715,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
model = new Tmodel();
MappingApiToModel.mapTModel(rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getTModel(), model);
-
+
MappingApiToModel.mapOperationalInfo(model, rec.getChangeRecordNewDataConditional().getChangeRecordNewData().getOperationalInfo());
-
+
em.persist(model);
}
-
+
}
-
+
}
if (rec.getChangeRecordNull() != null) {
//No action required
@@ -659,7 +737,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
tx.commit();
-
+
} catch (Exception drfm) {
logger.warn("Error persisting change record!", drfm);
StringWriter sw = new StringWriter();
@@ -672,7 +750,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
em.close();
}
}
-
+
private HighWaterMarkVectorType getLastChangeRecordFrom(String notifyingNode) {
HighWaterMarkVectorType ret = new HighWaterMarkVectorType();
ChangeRecordIDType cid = new ChangeRecordIDType();
@@ -688,9 +766,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
} catch (Exception ex) {
logger.info(ex);
}
-
+
tx.rollback();
-
+
} catch (Exception drfm) {
logger.warn("error caught fetching newest record from node " + notifyingNode, drfm);
} finally {
@@ -699,22 +777,47 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
em.close();
}
-
+
ret.getHighWaterMark().add(cid);
-
+
return ret;
}
+
}
- // private void ValidateDontChangeMyRecordsAtAnotherNode(String )
- private void ValidateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
+ /**
+ * used to check for alterations on *this node's data from another node,
+ * which isn't allowed
+ *
+ * @param ue
+ * @param node
+ * @throws Exception
+ */
+ private static void ValidateNodeIdMisMatches(UddiEntity ue, String node) throws Exception {
+ if (ue == null) {
+ return;//object doesn't exist
+ }
+ if (ue.getNodeId().equals(node)) {
+ throw new Exception("Alert! attempt to alter locally owned entity " + ue.getEntityKey() + " owned by " + ue.getAuthorizedName() + "@" + ue.getNodeId());
+ }
+ }
+
+ /**
+ * use to validate that changed data maintained ownership, except for
+ * business entities and tmodels since they allow transfer
+ *
+ * @param newNodeId
+ * @param currentOwningNode
+ * @throws Exception
+ */
+ private static void ValidateNodeIdMatches(String newNodeId, String currentOwningNode) throws Exception {
if (newNodeId == null || currentOwningNode == null) {
throw new Exception("either the local node ID is null or the inbound replication data's node id is null");
}
//only time this is allowed is custody transfer
if (!newNodeId.equals(currentOwningNode)) {
+ logger.info("AUDIT, custody transfer from node, " + currentOwningNode + " to " + newNodeId + " current node is " + node);
//throw new Exception("node id mismatch!");
- logger.info("AUDIT, custory transfer from node, " + currentOwningNode + " to " + newNodeId);
}
//if i already have a record and "own it" and the remote node has a record with the same key, reject the update
@@ -737,14 +840,14 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
//throw new Exception("node id mismatch! this node already has a record for key " + newDataOperationalInfo.getEntityKey() + " and I'm the authority for it.");
}
}
-
+
private synchronized UDDIReplicationPortType getReplicationClient(String node) {
if (cache.containsKey(node)) {
return cache.get(node);
}
UDDIService svc = new UDDIService();
UDDIReplicationPortType replicationClient = svc.getUDDIReplicationPort();
-
+
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
try {
@@ -753,7 +856,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
sql.toString();
Query qry = em.createQuery(sql.toString());
qry.setMaxResults(1);
-
+
org.apache.juddi.model.ReplicationConfiguration resultList = (org.apache.juddi.model.ReplicationConfiguration) qry.getSingleResult();
for (Operator o : resultList.getOperator()) {
if (o.getOperatorNodeID().equalsIgnoreCase(node)) {
@@ -763,7 +866,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
}
tx.rollback();
-
+
} catch (Exception ex) {
logger.fatal("Node not found!" + node, ex);
} finally {
@@ -774,7 +877,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
//em.close();
return null;
-
+
}
private Map<String, UDDIReplicationPortType> cache = new HashMap<String, UDDIReplicationPortType>();
@@ -788,11 +891,11 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
long startTime = System.currentTimeMillis();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.DO_PING, QueryStatus.SUCCESS, procTime);
-
+
return node;
-
+
}
-
+
@SOAPBinding(parameterStyle = SOAPBinding.ParameterStyle.BARE)
@WebResult(name = "changeRecords", targetNamespace = "urn:uddi-org:repl_v3", partName = "body")
// @WebMethod(operationName = "get_changeRecords", action = "get_changeRecords")
@@ -805,7 +908,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
HighWaterMarkVectorType changesAlreadySeen = body.getChangesAlreadySeen();
BigInteger responseLimitCount = body.getResponseLimitCount();
HighWaterMarkVectorType responseLimitVector = body.getResponseLimitVector();
-
+
new ValidateReplication(null).validateGetChangeRecords(requestingNode, changesAlreadySeen, responseLimitCount, responseLimitVector, FetchEdges(), ctx);
//TODO should we validate that "requestingNode" is in the replication config?
@@ -838,7 +941,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
tx.begin();
Long firstrecord = 0L;
Long lastrecord = null;
-
+
if (changesAlreadySeen != null) {
//this is basically a lower limit (i.e. the newest record that was processed by the requestor
//therefore we want the oldest record stored locally to return to the requestor for processing
@@ -857,7 +960,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
}
}
-
+
logger.info("Query db for replication changes, lower index is " + (firstrecord) + " last index " + lastrecord + " record limit " + maxrecords);
Query createQuery = null;
/*
@@ -887,7 +990,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
createQuery.setMaxResults(maxrecords);
createQuery.setParameter("inbound", firstrecord);
createQuery.setParameter("node", node);
-
+
List<org.apache.juddi.model.ChangeRecord> records = (List<org.apache.juddi.model.ChangeRecord>) createQuery.getResultList();
logger.info(records.size() + " CR records returned from query");
for (int i = 0; i < records.size(); i++) {
@@ -895,18 +998,18 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (!Excluded(changesAlreadySeen, r)) {
ret.add(r);
}
-
+
}
-
+
tx.rollback();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.GET_CHANGERECORDS,
QueryStatus.SUCCESS, procTime);
-
+
} catch (Exception ex) {
logger.fatal("Error, this node is: " + node, ex);
throw new FatalErrorException(new ErrorMessage("E_fatalError", ex.getMessage()));
-
+
} finally {
if (tx.isActive()) {
tx.rollback();
@@ -934,12 +1037,12 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
public List<ChangeRecordIDType> getHighWaterMarks()
throws DispositionReportFaultMessage {
long startTime = System.currentTimeMillis();
-
+
List<ChangeRecordIDType> ret = new ArrayList<ChangeRecordIDType>();
//fetch from database the highest known watermark
ReplicationConfiguration FetchEdges = FetchEdges();
-
+
EntityManager em = PersistenceManager.getEntityManager();
EntityTransaction tx = em.getTransaction();
HashMap<String, Long> map = new HashMap<String, Long>();
@@ -962,7 +1065,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
//per the spec
}
map.put(nextNode, id);
-
+
}
}
}
@@ -977,21 +1080,21 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
x.setNodeID(node);
x.setOriginatingUSN(id);
ret.add(x);
-
+
tx.rollback();
long procTime = System.currentTimeMillis() - startTime;
serviceCounter.update(ReplicationQuery.GET_HIGHWATERMARKS, QueryStatus.SUCCESS, procTime);
-
+
} catch (Exception drfm) {
throw new FatalErrorException(new ErrorMessage("E_fatalError", drfm.getMessage()));
-
+
} finally {
if (tx.isActive()) {
tx.rollback();
}
em.close();
}
-
+
Iterator<Map.Entry<String, Long>> iterator = map.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> next = iterator.next();
@@ -1015,7 +1118,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
//some other node just told us there's new records available, call
//getChangeRecords from the remote node asynch
new ValidateReplication(null).validateNotifyChangeRecordsAvailable(body, ctx);
-
+
logger.info(body.getNotifyingNode() + " just told me that there are change records available, enqueuing...size is " + queue.size());
if (!queue.contains(body.getNotifyingNode())) {
queue.add(body.getNotifyingNode());
@@ -1069,9 +1172,9 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
if (!ok) {
throw new TransferNotAllowedException(new ErrorMessage("E_transferNotAllowedUnknownNode"));
}
-
+
new ValidateReplication(null).validateTransfer(em, body);
-
+
TransferEntities te = new TransferEntities();
te.setKeyBag(body.getKeyBag());
te.setTransferToken(body.getTransferToken());
@@ -1081,7 +1184,7 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
//discard the token
logger.debug("request validated, processing transfer");
List<ChangeRecord> executeTransfer = new UDDICustodyTransferImpl().executeTransfer(te, em, body.getTransferOperationalInfo().getAuthorizedName(), body.getTransferOperationalInfo().getNodeID());
-
+
for (ChangeRecord c : executeTransfer) {
try {
c.setChangeID(new ChangeRecordIDType());
@@ -1146,5 +1249,5 @@ public class UDDIReplicationImpl extends AuthenticatedService implements UDDIRep
}
}
}
-
+
}
http://git-wip-us.apache.org/repos/asf/juddi/blob/ba85baa1/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java
----------------------------------------------------------------------
diff --git a/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java b/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java
index 7e1b35b..a91171d 100644
--- a/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java
+++ b/juddi-core/src/main/java/org/apache/juddi/model/UddiEntity.java
@@ -40,6 +40,7 @@ public abstract class UddiEntity implements Comparable<UddiEntity>{
protected Date modifiedIncludingChildren;
protected String nodeId;
protected String authorizedName;
+ protected boolean xfer = false;
@Id
@Column(name = "entity_key", nullable = false, length = 255)
@@ -116,4 +117,13 @@ public abstract class UddiEntity implements Comparable<UddiEntity>{
else return 0;
}
+ public void setIsTransferInProgress(boolean b) { // setter for the transfer-in-progress flag held in xfer
+ xfer = b;
+ }
+ @Column(name="xfer", nullable=false) // mapped to the non-nullable "xfer" column
+ public boolean getIsTransferInProgress()
+ { // returns the current value of the xfer flag (field default is false)
+ return xfer;
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@juddi.apache.org
For additional commands, e-mail: commits-help@juddi.apache.org