You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/28 18:21:03 UTC

[2/5] usergrid git commit: Move to shard deletion being a mark plus read filtering strategy vs. deleting. This is so Usergrid can keep proper shard end times and 're-activate' a shard by flipping the deleted flag when past data is re-written to a collec

Move to shard deletion being a mark plus read filtering strategy vs. deleting.  This is so Usergrid can keep proper shard end times and 're-activate' a shard by flipping the deleted flag when past data is re-written to a collection.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/42c5c4b8
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/42c5c4b8
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/42c5c4b8

Branch: refs/heads/hotfix-20160819
Commit: 42c5c4b8760b78ea1e8ef9b8d583a92f2a45e1d4
Parents: dbdd243
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Aug 27 23:24:01 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Aug 27 23:24:01 2016 -0700

----------------------------------------------------------------------
 .../astyanax/MultiRowShardColumnIterator.java   |  27 ++
 .../persistence/core/shard/SmartShard.java      |   9 +-
 .../impl/shard/NodeShardAllocation.java         |   5 +-
 .../graph/serialization/impl/shard/Shard.java   |  17 +-
 .../impl/shard/impl/EdgeSearcher.java           |  17 +-
 .../shard/impl/NodeShardAllocationImpl.java     |   9 +-
 .../impl/shard/impl/NodeShardCacheImpl.java     |  24 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |   2 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java |  24 +-
 .../impl/ShardedEdgeSerializationImpl.java      | 106 +++--
 .../shard/impl/serialize/ShardSerializer.java   |  11 +-
 .../graph/GraphManagerShardConsistencyIT.java   | 396 +++++++++++++++++--
 .../impl/shard/NodeShardAllocationTest.java     |   4 +-
 .../impl/shard/NodeShardCacheTest.java          |   4 +-
 14 files changed, 559 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index f3e8d4c..04266e1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -127,6 +127,23 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
             // advance to the next shard
             currentShard = currentShardIterator.next();
 
+            // handle marked deleted shards
+            if( currentShard.isDeleted() && currentShardIterator.hasNext()){
+
+                if(logger.isTraceEnabled()) {
+                    logger.trace("Shard is marked deleted, advancing to next - {}", currentShard);
+                }
+
+                currentShard = currentShardIterator.next();
+            }else if ( currentShard.isDeleted() && !currentShardIterator.hasNext()){
+
+                if(logger.isTraceEnabled()) {
+                    logger.trace("Shard is marked deleted, and there is no more - {}", currentShard);
+                }
+
+                return false;
+            }
+
             if(logger.isTraceEnabled()){
                 logger.trace("Shard after advance: {}", currentShard);
 
@@ -221,6 +238,16 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
                 }
             }
 
+            // skip over shards that are marked deleted
+            if( currentShard.isDeleted() && currentShardIterator.hasNext() ){
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("Shard is marked deleted - {}", currentShard);
+                }
+
+                currentShard = currentShardIterator.next();
+            }
+
 
             if(logger.isTraceEnabled()){
                 logger.trace("all shards when starting: {}", rowKeysWithShardEnd);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
index 39ddc35..dd6df34 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -25,13 +25,15 @@ public class SmartShard<R, T> {
     final ScopedRowKey<R> rowKey;
     final T shardEnd;
     final long shardIndex;
+    final boolean isDeleted;
 
 
-    public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd){
+    public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd, final boolean isDeleted){
 
         this.rowKey = rowKey;
         this.shardIndex = shardIndex;
         this.shardEnd = shardEnd;
+        this.isDeleted = isDeleted;
     }
 
 
@@ -48,12 +50,15 @@ public class SmartShard<R, T> {
         return shardIndex;
     }
 
+    public boolean isDeleted(){
+        return isDeleted;
+    }
 
 
     @Override
     public String toString(){
 
-        return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }";
+        return "Shard { rowKey="+rowKey + ", shardIndex="+shardIndex+", shardEnd="+shardEnd+", isDeleted="+isDeleted+" }";
 
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index fcc0fc3..49fde9b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
-import com.google.common.base.Optional;
-
 
 /**
  * Interface used to create and retrieve shards
@@ -38,11 +36,10 @@ public interface NodeShardAllocation {
      * Get all shards for the given info.  If none exist, a default shard should be allocated.  The nodeId is the source node
      *
      * @param scope The application scope
-     * @param maxShardId The max value to start seeking from.  Values <= this will be returned if specified
      * @param directedEdgeMeta The directed edge metadata to use
      * @return A list of all shards <= the current shard.  This will always return 0l if no shards are allocated
      */
-    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta );
+    public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta);
 
 
     /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 6394703..f92c37a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -32,8 +32,9 @@ public class Shard implements Comparable<Shard> {
 
     private final long shardIndex;
     private final long createdTime;
-    private final boolean compacted;
+    private boolean compacted;
     private Optional<DirectedEdge> shardEnd;
+    private boolean deleted;
 
 
     public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
@@ -41,6 +42,7 @@ public class Shard implements Comparable<Shard> {
         this.createdTime = createdTime;
         this.compacted = compacted;
         this.shardEnd = Optional.absent();
+        this.deleted = false;
     }
 
 
@@ -67,6 +69,10 @@ public class Shard implements Comparable<Shard> {
         return compacted;
     }
 
+    public void setCompacted(final boolean compacted){
+        this.compacted = compacted;
+    }
+
 
     /**
      * Returns true if this is the minimum shard
@@ -84,6 +90,14 @@ public class Shard implements Comparable<Shard> {
         return shardEnd;
     }
 
+    public boolean isDeleted(){
+        return deleted;
+    }
+
+    public void setDeleted( final boolean deleted){
+        this.deleted = deleted;
+    }
+
 
     /**
      * Compare the shards based on the timestamp first, then the created time second
@@ -174,6 +188,7 @@ public class Shard implements Comparable<Shard> {
         }else{
             string.append("null");
         }
+        string.append(", isDeleted=").append(deleted);
         string.append(" }");
 
         return string.toString();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 917e943..eb90866 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -80,6 +80,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
 
     public List<ScopedRowKey<R>> getRowKeys() {
 
+        if(logger.isTraceEnabled()) {
+            logger.trace("Shards: {}", shards);
+        }
+
         List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
 
         for(Shard shard : shards){
@@ -89,12 +93,18 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
             rowKeys.add( rowKey );
         }
 
+        if(logger.isTraceEnabled()) {
+            logger.trace("Resulting Shards: {}", rowKeys);
+        }
 
         return rowKeys;
     }
 
     public List<SmartShard> getRowKeysWithShardEnd(){
 
+        if(logger.isTraceEnabled()) {
+            logger.trace("Shards: {}", shards);
+        }
 
         final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size());
 
@@ -111,9 +121,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
                 shardEnd = null;
             }
 
-            rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd));
+            rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd, shard.isDeleted()));
         }
 
+        if(logger.isTraceEnabled()) {
+            logger.trace("Resulting Smart Shards: {}", rowKeysWithShardEnd);
+        }
+
+
         return rowKeysWithShardEnd;
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 47b630f..cf9a51c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.util.Collections;
 import java.util.Iterator;
 
+import com.google.common.base.Optional;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
@@ -78,11 +78,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     @Override
-    public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId,
-                                                final DirectedEdgeMeta directedEdgeMeta ) {
+    public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope,
+                                               final DirectedEdgeMeta directedEdgeMeta) {
 
         ValidationUtils.validateApplicationScope( scope );
-        Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
         GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
 
         Iterator<Shard> existingShards;
@@ -93,7 +92,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         }
 
         else {
-            existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+            existingShards = edgeShardSerialization.getShardMetaData( scope, Optional.absent(), directedEdgeMeta );
 
             /**
              * We didn't get anything out of cassandra, so we need to create the minimum shard

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 0a259f0..8e3be4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -21,42 +21,26 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 import java.beans.PropertyChangeEvent;
 import java.beans.PropertyChangeListener;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.inject.Singleton;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -164,7 +148,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
         } else {
 
-            entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ));
+            entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta ));
 
         }
 
@@ -330,7 +314,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
             final Iterator<ShardEntryGroup> edges =
-                nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+                nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta );
 
             final CacheEntry cacheEntry = new CacheEntry( edges );
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 0853adb..8db491f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -346,7 +346,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
             //Overwrite our shard index with a newly created one that has been marked as compacted
             Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
-            compactedShard.setShardEnd(targetShard.getShardEnd());
+            compactedShard.setShardEnd(Optional.absent());
 
             if(logger.isTraceEnabled()) {
                 logger.trace("Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", compactedShard);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 2fc0d50..7d91eda 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -191,21 +191,35 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
                 return DeleteResult.NO_OP;
             }
 
+            if(shard.isDeleted()){
+                if(logger.isTraceEnabled()){
+                    logger.trace("Shard {} already deleted.  Short circuiting.", shard);
+                }
+                return DeleteResult.NO_OP;
+            }
+
+            shard.setDeleted(true);
+
+            final MutationBatch setShardDeletedFlagMutation =
+                edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta );
+
+            /* Previously the below was used for actually deleting the shard vs.a marking strategy with read filtering
 
-            final MutationBatch shardRemovalMutation =
-                edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+                 final MutationBatch shardRemovalMutation =
+                    edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+            */
 
             if ( rollup == null ) {
-                rollup = shardRemovalMutation;
+                rollup = setShardDeletedFlagMutation;
             }
 
             else {
-                rollup.mergeShallow( shardRemovalMutation );
+                rollup.mergeShallow( setShardDeletedFlagMutation );
             }
 
             result = DeleteResult.DELETED;
 
-            logger.info( "Removing shard {} in group {}", shard, shardEntryGroup );
+            logger.info( "{} - Marking shard {} as deleted in group {}", Thread.currentThread().getName()shard, shardEntryGroup );
         }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 65a6f40..55eb172 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -43,15 +43,7 @@ import org.apache.usergrid.persistence.graph.SearchByEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
@@ -88,12 +80,15 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
     protected final GraphFig graphFig;
     protected final EdgeShardStrategy writeEdgeShardStrategy;
     protected final TimeService timeService;
+    protected final EdgeShardSerialization edgeShardSerialization;
+
 
 
     @Inject
     public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
                                          final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
-                                         final TimeService timeService ) {
+                                         final TimeService timeService,
+                                         final EdgeShardSerialization edgeShardSerialization ) {
 
 
         checkNotNull( "keyspace required", keyspace );
@@ -101,6 +96,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         checkNotNull( "consistencyFig required", graphFig );
         checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
         checkNotNull( "timeService required", timeService );
+        checkNotNull( "edgeShardSerialization required", edgeShardSerialization );
+
 
 
         this.keyspace = keyspace;
@@ -108,6 +105,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         this.graphFig = graphFig;
         this.writeEdgeShardStrategy = writeEdgeShardStrategy;
         this.timeService = timeService;
+        this.edgeShardSerialization = edgeShardSerialization;
     }
 
 
@@ -119,7 +117,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         GraphValidation.validateEdge( markedEdge );
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
-        return new SourceWriteOp( columnFamilies, markedEdge ) {
+        return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -144,7 +142,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
 
-        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -168,7 +166,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
 
-        return new TargetWriteOp( columnFamilies, markedEdge ) {
+        return new TargetWriteOp( columnFamilies, markedEdge, targetEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -194,7 +192,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
 
-        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -219,7 +217,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
 
 
-        return new EdgeVersions( columnFamilies, markedEdge ) {
+        return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -238,7 +236,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                                                final MarkedEdge markedEdge, final Collection<Shard> shards,
                                                final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
-        return new SourceWriteOp( columnFamilies, markedEdge ) {
+        return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -258,7 +256,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                                                              final Collection<Shard> shards,
                                                              final DirectedEdgeMeta directedEdgeMeta,
                                                              final UUID timestamp ) {
-        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+        return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -279,7 +277,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                                              final MarkedEdge markedEdge, final Collection<Shard> shards,
                                              final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
-        return new TargetWriteOp( columnFamilies, markedEdge ) {
+        return new TargetWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -300,7 +298,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                                                            final DirectedEdgeMeta directedEdgeMeta,
                                                            final UUID timestamp ) {
 
-        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+        return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -320,7 +318,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                                              final MarkedEdge markedEdge, final Collection<Shard> shards,
                                              final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
 
-        return new EdgeVersions( columnFamilies, markedEdge ) {
+        return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) {
 
             @Override
             void writeEdge( final MutationBatch batch,
@@ -696,6 +694,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
          */
         protected abstract boolean isDeleted();
 
+        /**
+         * Get the directed edge meta for the op
+         */
+        protected abstract DirectedEdgeMeta getDirectedEdgeMeta();
+
 
         /**
          * Write the edge with the given data
@@ -725,6 +728,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             for ( Shard shard : shards ) {
                 final R rowKey = getRowKey( shard );
                 writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
+
+                // if an edge is being written to this shard, un-delete it in case it was previously marked
+                // don't un-delete if the edge write is to actually remove an edge
+                // Usergrid allows entities to be written with a UUID generated from the past (time)
+                if(shard.isDeleted() && !isDeleted) {
+                    logger.info("Shard is deleted. Un-deleting as new data is being written to the shard - {}", shard);
+                    shard.setDeleted(false);
+                    batch.mergeShallow(edgeShardSerialization.writeShardMeta(scope, shard, getDirectedEdgeMeta()));
+                }
+
             }
 
 
@@ -745,12 +758,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         private final String type;
         private final boolean isDeleted;
         private final DirectedEdge directedEdge;
+        private final DirectedEdgeMeta directedEdgeMeta;
 
 
         /**
          * Write the source write operation
          */
-        private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+        private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+                               final DirectedEdgeMeta directedEdgeMeta ) {
             this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
 
             this.sourceNodeId = markedEdge.getSourceNode();
@@ -759,6 +774,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             this.isDeleted = markedEdge.isDeleted();
 
             this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -784,6 +800,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         protected boolean isDeleted() {
             return isDeleted;
         }
+
+        @Override
+        protected DirectedEdgeMeta getDirectedEdgeMeta() {
+            return directedEdgeMeta;
+        }
     }
 
 
@@ -798,12 +819,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         private Id targetId;
         private final boolean isDeleted;
         private final DirectedEdge directedEdge;
+        private final DirectedEdgeMeta directedEdgeMeta;
 
 
         /**
          * Write the source write operation
          */
-        private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+        private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+                                         final DirectedEdgeMeta directedEdgeMeta ) {
             this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
 
             this.sourceNodeId = markedEdge.getSourceNode();
@@ -813,6 +836,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             this.isDeleted = markedEdge.isDeleted();
 
             this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -838,6 +862,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         protected boolean isDeleted() {
             return isDeleted;
         }
+
+        @Override
+        protected DirectedEdgeMeta getDirectedEdgeMeta() {
+            return directedEdgeMeta;
+        }
     }
 
 
@@ -853,12 +882,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         private final String type;
         private final boolean isDeleted;
         private final DirectedEdge directedEdge;
+        private final DirectedEdgeMeta directedEdgeMeta;
 
 
         /**
          * Write the source write operation
          */
-        private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+        private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+                               final DirectedEdgeMeta directedEdgeMeta ) {
             this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
 
             this.targetNode = markedEdge.getTargetNode();
@@ -867,6 +898,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             this.isDeleted = markedEdge.isDeleted();
 
             this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -892,6 +925,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         protected boolean isDeleted() {
             return isDeleted;
         }
+
+        @Override
+        protected DirectedEdgeMeta getDirectedEdgeMeta() {
+            return directedEdgeMeta;
+        }
     }
 
 
@@ -909,12 +947,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
         final boolean isDeleted;
         final DirectedEdge directedEdge;
+        final DirectedEdgeMeta directedEdgeMeta;
 
 
         /**
          * Write the source write operation
          */
-        private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+        private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+                                         final DirectedEdgeMeta directedEdgeMeta ) {
             this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
 
             this.targetNode = markedEdge.getTargetNode();
@@ -924,6 +964,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             this.isDeleted = markedEdge.isDeleted();
 
             this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -949,6 +991,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         protected boolean isDeleted() {
             return isDeleted;
         }
+
+        @Override
+        protected DirectedEdgeMeta getDirectedEdgeMeta() {
+            return directedEdgeMeta;
+        }
     }
 
 
@@ -967,11 +1014,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         final boolean isDeleted;
         final Long edgeVersion;
 
+        final DirectedEdgeMeta directedEdgeMeta;
+
 
         /**
          * Write the source write operation
          */
-        private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+        private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+                              final DirectedEdgeMeta directedEdgeMeta ) {
             this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
 
             this.targetNode = markedEdge.getTargetNode();
@@ -981,6 +1031,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
             this.isDeleted = markedEdge.isDeleted();
 
             this.edgeVersion = markedEdge.getTimestamp();
+            this.directedEdgeMeta = directedEdgeMeta;
         }
 
 
@@ -1006,6 +1057,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         protected boolean isDeleted() {
             return isDeleted;
         }
+
+        @Override
+        protected DirectedEdgeMeta getDirectedEdgeMeta() {
+            return directedEdgeMeta;
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
index 8ab6288..b47f7cc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
@@ -58,6 +58,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
         }
 
         composite.addComponent( shard.isCompacted(), BOOLEAN_SERIALIZER);
+        composite.addComponent( shard.isDeleted(), BOOLEAN_SERIALIZER);
 
         return composite.serialize();
     }
@@ -67,7 +68,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
     public Shard fromByteBuffer( final ByteBuffer byteBuffer ) {
 
         DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
-        Preconditions.checkArgument( composite.size() == 5, "Composite should 5 elements" );
+        Preconditions.checkArgument( composite.size() == 5 || composite.size() == 6, "Composite should 5 elements" );
 
 
         final byte version = composite.get(0, BYTE_SERIALIZER);
@@ -76,9 +77,15 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
         final DirectedEdge shardEnd = composite.get( 3, EDGE_SERIALIZER);
         final boolean isCompacted = composite.get( 4, BOOLEAN_SERIALIZER);
 
-
         final Shard shard = new Shard(shardIndex, shardCreated, isCompacted);
         shard.setShardEnd(Optional.fromNullable(shardEnd));
+
+        if( composite.size() == 6){
+            final boolean isDeleted = composite.get( 5, BOOLEAN_SERIALIZER);
+            shard.setDeleted(isDeleted);
+        }
+
+
         return shard;
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 0d6a27e..6e6abd8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -22,12 +22,7 @@ package org.apache.usergrid.persistence.graph;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -136,7 +131,7 @@ public class GraphManagerShardConsistencyIT {
         // get the system property of the UUID to use.  If one is not set, use the defualt
         String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
 
-        scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
+        scope = new ApplicationScopeImpl( IdGenerator.createId(UUIDGenerator.newTimeUUID(), "test" ) );
 
 
         reporter =
@@ -149,19 +144,24 @@ public class GraphManagerShardConsistencyIT {
 
 
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
         reporter.stop();
         reporter.report();
 
-        if(writeExecutor != null){
+        if(writeExecutor != null && !writeExecutor.isShutdown()){
             writeExecutor.shutdownNow();
+            writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
 
         }
-        if(deleteExecutor != null){
+        if(deleteExecutor != null && !deleteExecutor.isShutdown()){
             deleteExecutor.shutdownNow();
+            deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+
 
         }
 
+        Thread.sleep(3000);
+
     }
 
 
@@ -251,7 +251,7 @@ public class GraphManagerShardConsistencyIT {
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
                 Future<Boolean> future =
-                    writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                    writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
 
                 futures.add( future );
             }
@@ -299,7 +299,9 @@ public class GraphManagerShardConsistencyIT {
                 @Override
                 public void onSuccess( @Nullable final Long result ) {
                     logger.info( "Successfully ran the read, re-running" );
-                    writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                    if( !writeExecutor.isShutdown() ) {
+                        writeExecutor.submit(new ReadWorker(gmf, generator, writeCount, readMeter));
+                    }
                 }
 
 
@@ -383,11 +385,12 @@ public class GraphManagerShardConsistencyIT {
         }
 
 
-        //now continue reading everything for 30 seconds
+        //now continue reading everything for 30 seconds to make sure things are OK
 
         Thread.sleep(30000);
 
-        writeExecutor.shutdownNow();
+        //writeExecutor.shutdownNow();
+        //writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
     }
 
 
@@ -494,7 +497,7 @@ public class GraphManagerShardConsistencyIT {
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
                 Future<Boolean> future =
-                    deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                    deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
 
                 futures.add( future );
             }
@@ -563,14 +566,18 @@ public class GraphManagerShardConsistencyIT {
                 .filter(markedEdge -> {
 
                     // if it's already been marked let's filter, move on as async deleteEdge()
-                    logger.trace("edge already marked, may indicated a problem with gm.deleteEdge(): {}", markedEdge);
-                    return !markedEdge.isDeleted();
+                    if(markedEdge.isDeleted()) {
+                        logger.info("Edge already marked, but gm.deleteEdge() is async, Edge: {}", markedEdge);
+                    }
+                    //return !markedEdge.isDeleted();
+                    return true;
                 })
                 .flatMap( edge -> manager.markEdge( edge ))
                      .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
 
             totalDeleted += count;
-            Thread.sleep( 500 );
+            logger.info("Sleeping 250ms second because deleteEdge() is async.");
+            Thread.sleep( 250 );
         }
 
 
@@ -591,7 +598,9 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public void onSuccess( @Nullable final Long result ) {
                 logger.info( "Successfully ran the read, re-running" );
-                deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+                if( !deleteExecutor.isShutdown() ) {
+                    deleteExecutor.submit(new ReadWorker(gmf, generator, 0, readMeter));
+                }
             }
 
 
@@ -641,7 +650,16 @@ public class GraphManagerShardConsistencyIT {
 
                 logger.info( "Shard size for group is {}", group.getReadShards() );
 
-                shardCount += group.getReadShards().size();
+                Collection<Shard> shards = group.getReadShards();
+
+                for(Shard shard: shards){
+
+                    if(!shard.isDeleted()){
+                        shardCount++;
+                    }
+                }
+
+                //shardCount += group.getReadShards().size();
             }
 
 
@@ -657,12 +675,337 @@ public class GraphManagerShardConsistencyIT {
         }
 
         //now that we have finished deleting and shards are removed, shutdown
-        deleteExecutor.shutdownNow();
+        //deleteExecutor.shutdownNow();
+        //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
 
-        Thread.sleep( 3000 ); // sleep before the next test
     }
 
 
+
+    @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async
+    @Category(StressTest.class)
+    public void writeThousandsDeleteWriteAgain()
+        throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 2000 );
+
+
+        final Id sourceId = IdGenerator.createId( "sourceDeleteRewrite" );
+        final String deleteEdgeType = "testDeleteRewrite";
+
+        final List<Edge> edges = new ArrayList<>();
+
+        final EdgeGenerator generator = new EdgeGenerator() {
+
+
+            @Override
+            public Edge newEdge() {
+                Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDeleteRewrite" ) );
+
+                edges.add(edge);
+
+                return edge;
+            }
+
+
+            @Override
+            public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
+                return manager.loadEdgesFromSource(
+                    new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.<Edge>absent(), false ) );
+            }
+        };
+
+
+        final int numInjectors = 3;
+
+        /**
+         * create injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes if there are multiple injectors
+         */
+        final List<Injector> injectors = createInjectors( numInjectors );
+
+
+        final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+        final long shardSize = graphFig.getShardSize();
+
+
+        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+        // power for writes
+        final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
+
+        final int numWorkersPerInjector = numProcessors / numInjectors;
+
+        final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
+
+
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
+
+        createDeleteExecutor( numWorkersPerInjector );
+
+
+        final AtomicLong writeCounter = new AtomicLong();
+
+
+        //min stop time the min delta + 1 cache cycle timeout
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+
+
+        logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+            numInjectors );
+
+
+        final List<Future<Boolean>> futures = new ArrayList<>();
+
+
+        for ( Injector injector : injectors ) {
+            final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+
+            for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+                Future<Boolean> future =
+                    deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
+
+                futures.add( future );
+            }
+        }
+
+        /**
+         * Wait for all writes to complete
+         */
+        for ( Future<Boolean> future : futures ) {
+            future.get();
+        }
+
+        // now get all our shards
+        final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
+
+        final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
+
+
+        final long writeCount = writeCounter.get();
+        final Meter readMeter = registry.meter( "readThroughput-deleteTestRewrite" );
+
+
+        //check our shard state
+
+
+        final Iterator<ShardEntryGroup> existingShardGroups =
+            cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+        int shardCount = 0;
+
+        while ( existingShardGroups.hasNext() ) {
+            final ShardEntryGroup group = existingShardGroups.next();
+
+            shardCount++;
+
+            logger.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+        }
+
+
+        logger.info( "Found {} shard groups", shardCount );
+
+
+        //now mark and delete all the edges
+
+
+        final GraphManager manager = gmf.createEdgeManager( scope );
+
+        //sleep occasionally to stop pushing cassandra over
+
+        long count = Long.MAX_VALUE;
+
+        Thread.sleep(3000); // let's make sure everything is written
+
+        long totalDeleted = 0;
+
+        // now do the deletes
+        while(count != 0) {
+
+            logger.info("total deleted: {}", totalDeleted);
+            if(count != Long.MAX_VALUE) { // count starts with Long.MAX
+                logger.info("deleted {} entities, continuing until count is 0", count);
+            }
+            //take 1000 then sleep
+            count = generator.doSearch( manager ).take( 1000 )
+                .filter(markedEdge -> {
+
+                    // if it's already been marked let's filter, move on as async deleteEdge()
+                    if(markedEdge.isDeleted()){
+                        logger.info("Edge already marked, gm.deleteEdge() is Async, Edge: {}", markedEdge);
+                    }
+                    //return !markedEdge.isDeleted();
+                    return true;
+                })
+                .flatMap( edge -> manager.markEdge( edge ))
+                .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+
+            totalDeleted += count;
+            logger.info("Sleeping 250ms second because deleteEdge() is an async process");
+            Thread.sleep( 250 );
+        }
+
+
+        logger.info("Sleeping before starting the read");
+        Thread.sleep(6000); //  let the edge readers start
+
+        // loop with a reader until our shards are gone
+
+
+        GraphManager gm = gmf.createEdgeManager( scope );
+
+
+
+
+        //do a read to eventually trigger our group compaction. Take 2 pages of columns
+        long returnedEdgeCount = generator.doSearch( gm )
+
+            .doOnNext( edge -> readMeter.mark() )
+
+            .countLong().toBlocking().last();
+
+        logger.info( "Completed reading {} edges", returnedEdgeCount );
+
+        if ( 0 != returnedEdgeCount ) {
+
+            //logger.warn( "Unexpected edge count returned!!!  Expected {} but was {}", 0,
+             //   returnedEdgeCount );
+
+            fail("Unexpected edge count returned!!!  Expected 0 but was "+ returnedEdgeCount );
+        }
+
+        logger.info("Got expected read count of 0");
+
+
+        //we have to get it from the cache, because this will trigger the compaction process
+        final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+        ShardEntryGroup group = null;
+
+        while ( groups.hasNext() ) {
+
+            group = groups.next();
+            logger.info( "Shard size for group is {}", group.getReadShards() );
+            Collection<Shard> shards = group.getReadShards();
+
+            for(Shard shard: shards){
+                if(!shard.isDeleted()){
+                    shardCount++;
+                }
+            }
+        }
+
+
+        // we're done, 1 shard remains, we have a group, and it's our default shard
+        if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex()  ) {
+            logger.info( "All compactions complete," );
+
+        }
+
+
+        Thread.sleep( 2000 );
+
+
+        logger.info( "Re-Writing same edges", workerWriteLimit, numWorkersPerInjector,
+            numInjectors );
+
+
+        int edgeCount = 0;
+        if ( edges.size() > 0) {
+
+            for (Edge edge : edges) {
+
+                Edge returned = gm.writeEdge(edge).toBlocking().last();
+
+                assertNotNull("Returned has a version", returned.getTimestamp());
+
+                edgeCount++;
+
+                writeMeter.mark();
+
+                writeCounter.incrementAndGet();
+
+
+                if (edgeCount % 100 == 0) {
+                    logger.info("wrote: " + edgeCount);
+                }
+
+
+            }
+            logger.info("Re-wrote total: {}", edgeCount);
+        }
+
+        int retries = 2;
+        while ( retries > 0 ) {
+
+
+            //do a read to eventually trigger our group compaction. Take 2 pages of columns
+            returnedEdgeCount = generator.doSearch( gm )
+
+                .doOnNext( edge -> readMeter.mark() )
+
+                .countLong().toBlocking().last();
+
+            logger.info( "Completed reading {} edges", returnedEdgeCount );
+
+            if ( edgeCount != returnedEdgeCount ) {
+                logger.warn( "Unexpected edge count returned!!!  Expected {} but was {}", edgeCount,
+                    returnedEdgeCount );
+            }
+
+            retries--;
+
+            if( returnedEdgeCount == edgeCount ){
+                logger.info("Got expected read count of {}", edgeCount);
+                break;
+
+            }
+        }
+
+        //we have to get it from the cache, because this will trigger the compaction process
+        final Iterator<ShardEntryGroup> finalgroups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+        ShardEntryGroup finalgroup = null;
+
+        // reset the shard count
+        shardCount = 0;
+        while ( finalgroups.hasNext() ) {
+
+            finalgroup = finalgroups.next();
+
+            logger.info( "Shard size for group is {}", finalgroup.getReadShards() );
+
+            Collection<Shard> shards = finalgroup.getReadShards();
+
+            for(Shard shard: shards){
+
+                if(!shard.isDeleted()){
+                    shardCount++;
+                }
+            }
+
+        }
+
+
+        // we're done, 1 shard remains, we have a group, and it's our default shard
+        if ( shardCount > 1 ) {
+            logger.info( "All done. Final shard count: {}", shardCount );
+            assertEquals(1,1);
+
+        }else{
+            fail("There should be more than 1 shard");
+        }
+
+        //deleteExecutor.shutdownNow();
+        //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+    }
+
+
+
     private class Worker implements Callable<Boolean> {
         private final GraphManagerFactory factory;
         private final EdgeGenerator generator;
@@ -671,8 +1014,8 @@ public class GraphManagerShardConsistencyIT {
         private final AtomicLong writeCounter;
 
 
-        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
-                        final long minExecutionTime, final AtomicLong writeCounter ) {
+        private Worker(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
+                       final long minExecutionTime, final AtomicLong writeCounter) {
             this.factory = factory;
             this.generator = generator;
             this.writeLimit = writeLimit;
@@ -699,7 +1042,6 @@ public class GraphManagerShardConsistencyIT {
                 assertNotNull( "Returned has a version", returned.getTimestamp() );
 
 
-                writeMeter.mark();
 
                 writeCounter.incrementAndGet();
 
@@ -738,7 +1080,7 @@ public class GraphManagerShardConsistencyIT {
             GraphManager gm = factory.createEdgeManager( scope );
 
 
-            while ( true ) {
+            while ( !Thread.currentThread().isInterrupted() ) {
 
 
                 //do a read to eventually trigger our group compaction. Take 2 pages of columns
@@ -757,6 +1099,8 @@ public class GraphManagerShardConsistencyIT {
 
                 assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
             }
+
+            return 0L;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 00406c0..8a9dcfe 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -564,7 +564,7 @@ public class NodeShardAllocationTest {
 
 
         final Iterator<ShardEntryGroup> result =
-                approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+                approximation.getShards( scope, directedEdgeMeta );
 
 
         assertTrue( "Shards present", result.hasNext() );
@@ -663,7 +663,7 @@ public class NodeShardAllocationTest {
 
 
         final Iterator<ShardEntryGroup> result =
-                approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+                approximation.getShards( scope, directedEdgeMeta );
 
 
         ShardEntryGroup shardEntryGroup = result.next();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 3b717a4..ecf0091 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -102,7 +102,7 @@ public class NodeShardCacheTest {
         /**
          * Simulate returning no shards at all.
          */
-        when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) )
+        when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) )
 
                 //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
                 .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
@@ -191,7 +191,7 @@ public class NodeShardCacheTest {
         /**
          * Simulate returning no shards at all.
          */
-        when( allocation.getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
+        when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) )
 
                 //use "thenAnswer" so we always return the value, even if  it's invoked more than 1 time.
                 .thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {