You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/06/23 22:26:11 UTC
[06/11] usergrid git commit: Better handle different combinations of
possible duplicate entities and their respective versions.
Better handle different combinations of possible duplicate entities and their respective versions.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/70f50c22
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/70f50c22
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/70f50c22
Branch: refs/heads/release-2.1.1
Commit: 70f50c22dfae76471713a6a01eae772b1b7bd9b6
Parents: 5c81a02
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Jun 22 22:35:59 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Jun 22 22:35:59 2016 -0700
----------------------------------------------------------------------
.../UniqueValueSerializationStrategyImpl.java | 116 +++++++++++++------
...niqueValueSerializationStrategyImplTest.java | 115 ++++++++++++++++++
2 files changed, 197 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70f50c22/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
index a323746..2a53194 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImpl.java
@@ -71,6 +71,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
public static final int COL_VALUE = 0x0;
+ private final Comparator<UniqueValue> uniqueValueComparator = new UniqueValueComparator();
+
private final SerializationFig serializationFig;
protected final Keyspace keyspace;
@@ -266,7 +268,7 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
Iterator<Row<ScopedRowKey<FieldKey>, EntityVersion>> results =
keyspace.prepareQuery( CF_UNIQUE_VALUES ).setConsistencyLevel( consistencyLevel ).getKeySlice( keys )
- .withColumnRange(new RangeBuilder().setLimit(serializationFig.getBufferSize()).build())
+ .withColumnRange(new RangeBuilder().setLimit(serializationFig.getMaxLoadSize()).build())
.execute().getResult().iterator();
@@ -285,10 +287,8 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
continue;
}
- // these used duplicate tracking and cleanup
- UUID oldestEntityUUID = null;
- UniqueValueImpl firstUniqueValue = null;
+ List<UniqueValue> candidates = new ArrayList<>();
/**
* While iterating the columns, a rule is being enforced to only EVER return the oldest UUID. This means
@@ -304,55 +304,77 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
final EntityVersion entityVersion = columnList.next().getName();
- final UUID currentEntityUUID = entityVersion.getEntityId().getUuid();
- final UniqueValueImpl uniqueValue =
+ final UniqueValue uniqueValue =
new UniqueValueImpl(field, entityVersion.getEntityId(), entityVersion.getEntityVersion());
- // keep track of the first and oldest entity (not version),
- // knowing the the first one may not be the 'oldest'
- if(oldestEntityUUID == null){
+ // set the initial candidate and move on
+ if(candidates.size() == 0){
+ candidates.add(uniqueValue);
+ continue;
+ }
- oldestEntityUUID = currentEntityUUID;
- firstUniqueValue = uniqueValue;
+ final int result = uniqueValueComparator.compare(uniqueValue, candidates.get(candidates.size() -1));
- }else if(currentEntityUUID.timestamp() < oldestEntityUUID.timestamp()){
+ if(result == 0){
- oldestEntityUUID = currentEntityUUID;
- }
+ // same entity as the current candidate, so there is no duplicate to delete; only the version differs
+ if(logger.isTraceEnabled()){
+ logger.trace("Candidate unique value is equal to the current unique value");
+ }
+ // update candidate w/ latest version
+ candidates.add(uniqueValue);
- // only return the oldest (original) unique value entries
- if(currentEntityUUID.timestamp() <= oldestEntityUUID.timestamp() ){
+ }else if(result < 0){
- if (logger.isTraceEnabled()) {
- logger.trace("Putting unique value [{}={}] into result set with entity id [{}] and entity version [{}]",
- field.getName(), field.getValue().toString(),
- entityVersion.getEntityId(),
- entityVersion.getEntityVersion());
- }
+ // delete the duplicate from the unique value index
+ candidates.forEach(candidate -> {
- uniqueValueSet.addValue(uniqueValue);
+ try {
- }
- // remove the newer duplicated unique value entries
- else{
+ logger.warn("Duplicate unique value [{}={}] found, removing older entry " +
+ "with entity id [{}] and entity version [{}]", field.getName(), field.getValue().toString(),
+ candidate.getEntityId().getUuid(), candidate.getEntityVersion() );
- delete(appScope, uniqueValue ).execute();
+ delete(appScope, candidate ).execute();
+ } catch (ConnectionException e) {
+ // do nothing for now
+ }
- }
+ });
- }
+ candidates.clear();
+
+ if(logger.isTraceEnabled()) {
+ logger.info("Updating candidate to entity id [{}] and entity version [{}]",
+ uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion());
+
+ }
- // check to see if the very first unique value entry was overwritten because it was not the oldest
- // if so, clean up its unique index
- if( !uniqueValueSet.getValue( firstUniqueValue.getField().getName() )
- .getEntityId().getUuid().equals(firstUniqueValue.getEntityId().getUuid()) ){
+ // add our new candidate to the list
+ candidates.add(uniqueValue);
+
+ // add our new candidate to the result set
+ //uniqueValueSet.addValue(candidate);
+
+ }else{
+
+ logger.warn("Duplicate unique value [{}={}] found, removing newer entry " +
+ "with entity id [{}] and entity version [{}]", field.getName(), field.getValue().toString(),
+ uniqueValue.getEntityId().getUuid(), uniqueValue.getEntityVersion() );
+
+ // delete the duplicate from the unique value index
+ delete(appScope, uniqueValue ).execute();
+
+
+ }
- delete(appScope, firstUniqueValue).execute();
}
+ // take the last candidate ( should be the latest version) and add to the result set
+ uniqueValueSet.addValue(candidates.get(candidates.size() -1));
}
@@ -472,4 +494,30 @@ public abstract class UniqueValueSerializationStrategyImpl<FieldKey, EntityKey>
* @param uniqueValueId The uniqueValue
*/
protected abstract EntityKey createEntityUniqueLogKey(final Id applicationId, final Id uniqueValueId );
+
+
+
+    /**
+     * Orders unique values by the age of their owning entity: same entity UUID compares equal
+     * (entity versions are ignored), otherwise the entity UUID with the lower timestamp sorts first.
+     */
+    private static class UniqueValueComparator implements Comparator<UniqueValue> {
+
+        @Override
+        public int compare(UniqueValue o1, UniqueValue o2) {
+
+            final UUID first = o1.getEntityId().getUuid();
+            final UUID second = o2.getEntityId().getUuid();
+
+            if( first.equals(second) ){
+                return 0;
+            }
+
+            // distinct UUIDs can share a timestamp; break the tie on the UUID itself so that
+            // compare(a, b) and compare(b, a) never both report "greater"
+            final int byAge = Long.compare(first.timestamp(), second.timestamp());
+            return byAge != 0 ? byAge : first.compareTo(second);
+        }
+    }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/70f50c22/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
index 6c4e72b..ed3e42b 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/UniqueValueSerializationStrategyImplTest.java
@@ -442,4 +442,119 @@ public abstract class UniqueValueSerializationStrategyImplTest {
}
+    /**
+     * Writes the same unique field for three entities, pairing the last-created entity id with the
+     * first-created version, and verifies that loading returns the entry of the first-created entity.
+     */
+    @Test
+    public void testDuplicateEntitiesDescending() throws ConnectionException, InterruptedException {
+
+        final ApplicationScope appScope = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final IntegerField countField = new IntegerField( "count", 5 );
+
+        // ids are created in this order; the first one is expected to be the oldest entity
+        final Id oldestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id middleEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id newestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+        final UUID firstVersion = UUIDGenerator.newTimeUUID();
+        final UUID secondVersion = UUIDGenerator.newTimeUUID();
+        final UUID thirdVersion = UUIDGenerator.newTimeUUID();
+
+        // entity order is the reverse of version order
+        final UniqueValue newestEntityValue = new UniqueValueImpl( countField, newestEntity, firstVersion );
+        final UniqueValue middleEntityValue = new UniqueValueImpl( countField, middleEntity, secondVersion );
+        final UniqueValue oldestEntityValue = new UniqueValueImpl( countField, oldestEntity, thirdVersion );
+
+        for ( UniqueValue value : new UniqueValue[] { newestEntityValue, middleEntityValue, oldestEntityValue } ) {
+            strategy.write( appScope, value ).execute();
+        }
+
+        final UniqueValueSet loaded =
+            strategy.load( appScope, oldestEntity.getType(), Collections.<Field>singleton( countField ) );
+
+        // the entry owned by the first-created entity must be the one returned
+        assertEquals( oldestEntityValue, loaded.getValue( countField.getName() ) );
+
+    }
+
+    /**
+     * Writes the same unique field for three entities, pairing entity ids and versions in the same
+     * creation order, and verifies that loading returns the entry of the first-created entity.
+     */
+    @Test
+    public void testDuplicateEntitiesAscending() throws ConnectionException, InterruptedException {
+
+        final ApplicationScope appScope = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final IntegerField countField = new IntegerField( "count", 5 );
+
+        // ids are created in this order; the first one is expected to be the oldest entity
+        final Id oldestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id middleEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id newestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+        final UUID firstVersion = UUIDGenerator.newTimeUUID();
+        final UUID secondVersion = UUIDGenerator.newTimeUUID();
+        final UUID thirdVersion = UUIDGenerator.newTimeUUID();
+
+        // entity order matches version order
+        final UniqueValue oldestEntityValue = new UniqueValueImpl( countField, oldestEntity, firstVersion );
+        final UniqueValue middleEntityValue = new UniqueValueImpl( countField, middleEntity, secondVersion );
+        final UniqueValue newestEntityValue = new UniqueValueImpl( countField, newestEntity, thirdVersion );
+
+        for ( UniqueValue value : new UniqueValue[] { oldestEntityValue, middleEntityValue, newestEntityValue } ) {
+            strategy.write( appScope, value ).execute();
+        }
+
+        final UniqueValueSet loaded =
+            strategy.load( appScope, oldestEntity.getType(), Collections.<Field>singleton( countField ) );
+
+        // the entry owned by the first-created entity must be the one returned
+        assertEquals( oldestEntityValue, loaded.getValue( countField.getName() ) );
+
+    }
+
+    /**
+     * Writes a mix of duplicate entities and several versions per entity for one unique field, and
+     * verifies that loading returns the first-created entity's entry with its last-created version.
+     */
+    @Test
+    public void testMixedDuplicates() throws ConnectionException, InterruptedException {
+
+        final ApplicationScope appScope = new ApplicationScopeImpl( new SimpleId( "organization" ) );
+        final IntegerField countField = new IntegerField( "count", 5 );
+
+        // ids are created in this order; the first one is expected to be the oldest entity
+        final Id oldestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id middleEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+        final Id newestEntity = new SimpleId( UUIDGenerator.newTimeUUID(), "entity" );
+
+        // five versions, created in sequence
+        final UUID versionA = UUIDGenerator.newTimeUUID();
+        final UUID versionB = UUIDGenerator.newTimeUUID();
+        final UUID versionC = UUIDGenerator.newTimeUUID();
+        final UUID versionD = UUIDGenerator.newTimeUUID();
+        final UUID versionE = UUIDGenerator.newTimeUUID();
+
+        // two versions for the first and last entity, one for the middle entity
+        final UniqueValue oldestEntityLatest = new UniqueValueImpl( countField, oldestEntity, versionE );
+        final UniqueValue middleEntityOnly = new UniqueValueImpl( countField, middleEntity, versionD );
+        final UniqueValue oldestEntityEarlier = new UniqueValueImpl( countField, oldestEntity, versionC );
+        final UniqueValue newestEntityLatest = new UniqueValueImpl( countField, newestEntity, versionB );
+        final UniqueValue newestEntityEarlier = new UniqueValueImpl( countField, newestEntity, versionA );
+
+        final UniqueValue[] writeOrder = { oldestEntityLatest, middleEntityOnly, oldestEntityEarlier,
+            newestEntityLatest, newestEntityEarlier };
+        for ( UniqueValue value : writeOrder ) {
+            strategy.write( appScope, value ).execute();
+        }
+
+        final UniqueValueSet loaded =
+            strategy.load( appScope, oldestEntity.getType(), Collections.<Field>singleton( countField ) );
+
+        // expect the first-created entity, carrying the last-created of its versions
+        assertEquals( oldestEntityLatest, loaded.getValue( countField.getName() ) );
+
+    }
+
}