You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2015/11/19 00:48:11 UTC

usergrid git commit: Fixes issue that caused shards to get removed prematurely.

Repository: usergrid
Updated Branches:
  refs/heads/release 784fe51ca -> 9bc22410d


Fixes issue that caused shards to get removed prematurely.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9bc22410
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9bc22410
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9bc22410

Branch: refs/heads/release
Commit: 9bc22410dd785482f715faec4cbfeae0e712a502
Parents: 784fe51
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Nov 18 15:48:07 2015 -0800
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Nov 18 15:48:07 2015 -0800

----------------------------------------------------------------------
 .../impl/EdgeSerializationImpl.java             | 61 ++++++++++++++++++--
 .../shard/impl/EdgeShardSerializationImpl.java  |  3 +-
 .../shard/impl/ShardGroupColumnIterator.java    |  4 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  2 +-
 .../impl/shard/EdgeShardSerializationTest.java  | 44 ++++++++++++++
 5 files changed, 106 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 9e25946..0f4d722 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -26,15 +26,15 @@ import java.util.UUID;
 
 import javax.inject.Inject;
 
+import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.*;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
@@ -303,6 +303,15 @@ public class EdgeSerializationImpl implements EdgeSerialization {
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
             }
+
+            @Override
+            protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+                final SearchByEdge searchFullRange = new SimpleSearchByEdge(
+                    search.sourceNode(), search.getType(),search.targetNode(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent());
+
+                return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, searchFullRange, readShards );
+            }
         };
     }
 
@@ -329,6 +338,15 @@ public class EdgeSerializationImpl implements EdgeSerialization {
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
             }
+
+            @Override
+            protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+                final SearchByEdgeType searchFullRange = new SimpleSearchByEdgeType(
+                    edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING, Optional.absent(), false );
+
+                return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, searchFullRange, readShards );
+            }
         };
     }
 
@@ -357,6 +375,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
                 return shardedEdgeSerialization
                         .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
             }
+
+            @Override
+            protected Iterator<MarkedEdge> getIteratorFullRange (final Collection<Shard> readShards) {
+
+                final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType(
+                    edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    edgeType.getIdType(), Optional.absent(), false );
+
+                return shardedEdgeSerialization
+                    .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards);
+            }
         };
     }
 
@@ -382,6 +411,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
             }
+
+            @Override
+            protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+                final SearchByEdgeType edgeTypeFullRange = new SimpleSearchByEdgeType(
+                    edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    Optional.absent(), false );
+
+                return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeTypeFullRange, readShards );
+            }
+
         };
     }
 
@@ -411,6 +451,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
                 return shardedEdgeSerialization
                         .getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
             }
+
+            @Override
+            protected Iterator<MarkedEdge> getIteratorFullRange( final Collection<Shard> readShards ) {
+
+                final SearchByIdType edgeTypeFullRange = new SimpleSearchByIdType(
+                    edgeType.getNode(), edgeType.getType(), Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    edgeType.getIdType(), Optional.absent(), false );
+
+                return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeTypeFullRange, readShards);
+
+            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index f107307..2f83d6f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -156,7 +156,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
+        batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey )
+            .deleteColumn( shard.getShardIndex() );
 
         return batch;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index 72b617f..9604e63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -117,6 +117,8 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
      */
     protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards );
 
+    protected abstract Iterator<MarkedEdge> getIteratorFullRange( Collection<Shard> readShards );
+
 
     public boolean advance() {
 
@@ -141,7 +143,7 @@ public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
                 logger.trace( "Our shard is empty, we need to perform an audit on shard group {}", group );
 
                 //fire and forget if we miss it here, we'll get it next read
-                shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIterator( group.getReadShards() ) );
+                shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIteratorFullRange( group.getReadShards() ) );
 
 
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 7dd0521..e663d5a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -150,7 +150,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             .checkArgument( group.shouldCompact( startTime ), "Compaction cannot be run yet.  Ignoring compaction." );
 
         if(LOG.isDebugEnabled()) {
-            LOG.debug("Compacting shard group. count is {} ", countAudits.get());
+            LOG.debug("Compacting shard group. Audit count is {} ", countAudits.get());
         }
         final CompactionResult.CompactionBuilder resultBuilder = CompactionResult.builder();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9bc22410/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 2641ed7..1f8bfa9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -40,6 +40,8 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
@@ -52,6 +54,8 @@ import static org.mockito.Mockito.when;
 @UseModules({ TestGraphModule.class })
 public class EdgeShardSerializationTest {
 
+    private static final Logger log = LoggerFactory.getLogger( EdgeShardSerializationTest.class );
+
 
     @Inject
     @Rule
@@ -203,4 +207,44 @@ public class EdgeShardSerializationTest {
 
         assertFalse( results.hasNext() );
     }
+
+    @Test
+    public void sameShardIndexRemoval() throws ConnectionException {
+
+        final Id now = IdGenerator.createId( "test" );
+
+        final long timestamp = 2000L;
+
+        final Shard shard1 = new Shard( 1000L, timestamp, false );
+        final Shard shard2 = new Shard( shard1.getShardIndex(), timestamp * 2, true );
+
+
+        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" );
+        MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta );
+        batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) );
+        batch.execute();
+
+
+        Iterator<Shard> results =
+            edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+
+        // The shard with the latest timestamp comes first
+        assertEquals( shard2, results.next() );
+
+        // Removing shard1 should not remove shard2, which has the same shard index but a later timestamp
+        edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute();
+
+
+        // Get iterator again
+        results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
+
+        // We should still have shard2 stored
+        assertEquals( shard2, results.next() );
+
+
+
+    }
+
+
+
 }