You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/11/19 00:48:11 UTC
usergrid git commit: Fixes issue that caused shards to get removed
prematurely.
Repository: usergrid
Updated Branches:
refs/heads/release 784fe51ca -> 9bc22410d
Fixes issue that caused shards to get removed prematurely.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9bc22410
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9bc22410
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9bc22410
Branch: refs/heads/release
Commit: 9bc22410dd785482f715faec4cbfeae0e712a502
Parents: 784fe51
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Nov 18 15:48:07 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Nov 18 15:48:07 2015 -0800
----------------------------------------------------------------------
.../impl/EdgeSerializationImpl.java | 61 ++++++++++++++++++--
.../shard/impl/EdgeShardSerializationImpl.java | 3 +-
.../shard/impl/ShardGroupColumnIterator.java | 4 +-
.../shard/impl/ShardGroupCompactionImpl.java | 2 +-
.../impl/shard/EdgeShardSerializationTest.java | 44 ++++++++++++++
5 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 9e25946..0f4d722 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -26,15 +26,15 @@ import java.util.UUID;
import javax.inject.Inject;
+import com.google.common.base.Optional;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
@@ -303,6 +303,15 @@ public class EdgeSerializationImpl implements EdgeSerialization {
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
}
+
+ @Override
+ protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+ final SearchByEdge searchFullRange = new SimpleSearchByEdge(
+ search.sourceNode(), search.getType(),search.targetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent());
+
+ return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, searchFullRange, readShards );
+ }
};
}
@@ -329,6 +338,15 @@ public class EdgeSerializationImpl implements EdgeSerialization {
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
}
+
+ @Override
+ protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+ final SearchByEdgeType searchFullRange = new SimpleSearchByEdgeType(
+ edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent(), false );
+
+ return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, searchFullRange, readShards );
+ }
};
}
@@ -357,6 +375,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
return shardedEdgeSerialization
.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
}
+
+ @Override
+ protected Iterator<MarkedEdge> getIteratorFullRange (final Collection<Shard> readShards) {
+
+ final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType(
+ edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ edgeType.getIdType(), Optional.absent(), false );
+
+ return shardedEdgeSerialization
+ .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards);
+ }
};
}
@@ -382,6 +411,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
}
+
+ @Override
+ protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+ final SearchByEdgeType edgeTypeFullRange = new SimpleSearchByEdgeType(
+ edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.absent(), false );
+
+ return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeTypeFullRange, readShards );
+ }
+
};
}
@@ -411,6 +451,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
return shardedEdgeSerialization
.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
}
+
+ @Override
+ protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+ final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType(
+ edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ edgeType.getIdType(), Optional.absent(), false );
+
+ return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards);
+
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index f107307..2f83d6f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -156,7 +156,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
+ batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey )
+ .deleteColumn( shard.getShardIndex() );
return batch;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index 72b617f..9604e63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -117,6 +117,8 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
*/
protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards );
+ protected abstract Iterator<MarkedEdge> getIteratorFullRange( Collection<Shard> readShards );
+
public boolean advance() {
@@ -141,7 +143,7 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
logger.trace( "Our shard is empty, we need to perform an audit on shard group {}", group );
//fire and forget if we miss it here, we'll get it next read
- shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIterator( group.getReadShards() ) );
+ shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIteratorFullRange( group.getReadShards() ) );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 7dd0521..e663d5a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -150,7 +150,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
.checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet. Ignoring compaction." );
if(LOG.isDebugEnabled()) {
- LOG.debug("Compacting shard group. count is {} ", countAudits.get());
+ LOG.debug("Compacting shard group. Audit count is {} ", countAudits.get());
}
final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 2641ed7..1f8bfa9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -40,6 +40,8 @@ import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals;
@@ -52,6 +54,8 @@ import static org.mockito.Mockito.when;
@UseModules({ TestGraphModule.class })
public class EdgeShardSerializationTest {
+ private static final Logger log = LoggerFactory.getLogger( EdgeShardSerializationTest.class );
+
@Inject
@Rule
@@ -203,4 +207,44 @@ public class EdgeShardSerializationTest {
assertFalse( results.hasNext() );
}
+
+ @Test
+ public void sameShardIndexRemoval() throws ConnectionException {
+
+ final Id now = IdGenerator.createId( "test" );
+
+ final long timestamp = 2000L;
+
+ final Shard shard1 = new Shard( 1000L, timestamp, false );
+ final Shard shard2 = new Shard( shard1.getShardIndex(), timestamp * 2, true );
+
+
+ final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" );
+ MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta );
+ batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) );
+ batch.execute();
+
+
+ Iterator<Shard> results =
+ edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+
+ // Latest timestamp comes first
+ assertEquals( shard2, results.next() );
+
+ // This should now not remove anything
+ edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute();
+
+
+ // Get iterator again
+ results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+
+ // We should still have shard2 stored
+ assertEquals( shard2, results.next() );
+
+
+
+ }
+
+
+
}