You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2015/10/22 20:29:30 UTC

[01/15] usergrid git commit: Adds trace logging for debugging purposes to diagnose issue

Repository: usergrid
Updated Branches:
  refs/heads/2.1-release 1fe1d1a34 -> 0604247f6


Adds trace logging for debugging purposes to diagnose issue


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c417099d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c417099d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c417099d

Branch: refs/heads/2.1-release
Commit: c417099d6260163ad391c76162b673efb861bf3a
Parents: a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 10:19:28 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 10:19:28 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |  3 +-
 .../core/astyanax/MultiRowColumnIterator.java   | 46 +++++++-------------
 .../shard/impl/ShardGroupColumnIterator.java    | 11 +++++
 .../impl/shard/impl/ShardsColumnIterator.java   | 10 +++++
 4 files changed, 38 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d160aac..ccb6221 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -204,8 +204,9 @@ public class IndexServiceImpl implements IndexService {
 
         //If we get no search results, its possible that something was already deleted or
         //that it wasn't indexed yet. In either case we can't delete anything and return an empty observable..
-        if(crs.isEmpty())
+        if(crs.isEmpty()) {
             return Observable.empty();
+        }
 
         UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
         //not actually sure about the timestamp but ah well. works.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 667992c..c6c1a12 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -50,7 +50,7 @@ import com.netflix.astyanax.util.RangeBuilder;
  */
 public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
-    private static final Logger LOG = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+    private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class );
 
     private final int pageSize;
 
@@ -148,6 +148,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     public void advance() {
 
+
+        logger.trace( "Advancing multi row column iterator" );
+
         /**
          * If the edge is present, we need to being seeking from this
          */
@@ -156,7 +159,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
 
 
-        //TODO, finalize why this isn't working as expected
         final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
 
         final RangeBuilder rangeBuilder = new RangeBuilder();
@@ -174,6 +176,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         rangeBuilder.setLimit( selectSize );
 
+        logger.trace( "Executing cassandra query" );
 
         /**
          * Get our list of slices
@@ -242,7 +245,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         currentColumnIterator = mergedResults.iterator();
 
-        LOG.trace( "Finished parsing {} rows for results", rowKeys.size() );
+        logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
     }
 
 
@@ -275,6 +278,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
      */
     private List<T> singleRowResult( final Rows<R, C> result ) {
 
+        logger.trace( "Only a single row has columns.  Parsing directly" );
 
         for ( R key : result.getKeys() ) {
             final ColumnList<C> columnList = result.getRow( key ).getColumns();
@@ -307,6 +311,8 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
      */
     private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
 
+        logger.trace( "Multiple rows have columns.  Merging" );
+
 
         final List<T> mergedResults = new ArrayList<>(maxSize);
 
@@ -354,50 +360,28 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
                     continue;
                 }
 
+                logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+
                 mergedResults.add( insertIndex, returnedValue );
 
 
                 //prune the mergedResults
                 while ( mergedResults.size() > maxSize ) {
+
+                    logger.trace( "Trimming results to size {}", maxSize );
+
                     //just remove from our tail until the size falls to the correct value
                     mergedResults.remove(mergedResults.size()-1);
                 }
             }
 
-            LOG.trace( "Candidate result set size is {}", mergedResults.size() );
+            logger.trace( "Candidate result set size is {}", mergedResults.size() );
 
         }
         return mergedResults;
     }
 
 
-    /**
-     * Iterator wrapper that parses as it iterates for single row cases
-     */
-    private class SingleRowIterator implements Iterator<T> {
-
-        private Iterator<Column<C>> columnIterator;
-
-        private SingleRowIterator (final ColumnList<C> columns){
-            this.columnIterator = columns.iterator();
-        }
-        @Override
-        public boolean hasNext() {
-            return columnIterator.hasNext();
-        }
-
-
-        @Override
-        public T next() {
-            return columnParser.parseColumn( columnIterator.next() );
-        }
-
-
-        @Override
-        public void remove() {
-          throw new UnsupportedOperationException( "Unable to remove single row" );
-        }
-    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index 8779b96..a794a16 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -28,6 +28,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
@@ -51,6 +54,8 @@ import com.netflix.astyanax.util.RangeBuilder;
 public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
 
 
+    private static final Logger logger = LoggerFactory.getLogger( ShardGroupColumnIterator.class );
+
     private final Iterator<ShardEntryGroup> entryGroupIterator;
     private Iterator<T> elements;
 
@@ -108,21 +113,27 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
 
     public boolean advance(){
 
+        logger.trace( "Advancing from shard entry group iterator" );
+
         while(entryGroupIterator.hasNext()){
 
             final ShardEntryGroup group = entryGroupIterator.next();
 
+            logger.trace( "Shard entry group is {}.  Searching for edges in the shard", group );
+
             elements = getIterator( group.getReadShards() );
 
             /**
              * We're done, we have some columns to return
              */
             if(elements.hasNext()){
+                logger.trace( "Found edges in shard entry group {}", group );
                 return true;
             }
 
         }
 
+        logger.trace( "Completed iterating shard group iterator" );
 
         return false;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index e35107c..4411c25 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -6,6 +6,9 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
 import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
@@ -28,6 +31,9 @@ import com.netflix.astyanax.util.RangeBuilder;
  */
 public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
 
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardsColumnIterator.class );
+
     private final EdgeSearcher<R, C, T> searcher;
 
     private final MultiTennantColumnFamily<ScopedRowKey<R>, C> cf;
@@ -87,6 +93,8 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
      */
     private void startIterator() {
 
+        logger.trace( "Starting shards column iterator" );
+
 
         /**
          * If the edge is present, we need to being seeking from this
@@ -105,6 +113,8 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
          */
         final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys();
 
+        logger.trace( "Searching with row keys {}", rowKeys );
+
         currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
 
 


[06/15] usergrid git commit: Added tests for mocking

Posted by md...@apache.org.
Added tests for mocking


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0286dcc6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0286dcc6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0286dcc6

Branch: refs/heads/2.1-release
Commit: 0286dcc65c322c0aa606b760efc3cfe9c59f5ed6
Parents: 9fddd75
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 16:15:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 16:15:51 2015 -0600

----------------------------------------------------------------------
 .../impl/shard/ShardEntryGroup.java             |  11 +-
 .../impl/shard/ShardGroupDeletion.java          |   5 +
 .../impl/shard/impl/ShardGroupDeletionImpl.java |  19 +-
 .../shard/impl/ShardGroupDeletionImplTest.java  | 300 +++++++++++++++++++
 4 files changed, 325 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 11bf7a4..555d467 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -291,11 +291,20 @@ public class ShardEntryGroup {
                  * cache refresh, we can't compact yet.
                  */
 
-                && currentTime - delta > maxCreatedTime;
+                && !isNew( currentTime );
     }
 
 
     /**
+     * Return true if our current time - delta is newer than our maxCreatedtime
+     * @param currentTime
+     * @return
+     */
+    public boolean isNew(final long currentTime){
+        return currentTime - delta < maxCreatedTime;
+    }
+
+    /**
      * Return true if this shard can be deleted AFTER all of the data in it has been moved
      */
     public boolean canBeDeleted( final Shard shard ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 3d5a1ef..640e8bd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -56,6 +56,11 @@ public interface ShardGroupDeletion {
         CONTAINS_EDGES,
 
         /**
+         * Compaction pending, we can't delete it
+         */
+        COMPACTION_PENDING,
+
+        /**
          * The shard is too new, and may not have been fully replicated, we can't delete it safely
          */
         TOO_NEW,

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index c51a021..9c97d77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
@@ -57,12 +58,15 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
 
     private final ListeningExecutorService asyncTaskExecutor;
     private final EdgeShardSerialization edgeShardSerialization;
+    private final TimeService timeService;
 
 
     @Inject
     public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
-                                   final EdgeShardSerialization edgeShardSerialization ) {
+                                   final EdgeShardSerialization edgeShardSerialization,
+                                   final TimeService timeService ) {
         this.edgeShardSerialization = edgeShardSerialization;
+        this.timeService = timeService;
         this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
     }
 
@@ -125,17 +129,14 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
          * Compaction is pending, we cannot check it
          */
         if ( shardEntryGroup.isCompactionPending() ) {
-            return DeleteResult.NOT_CHECKED;
+            return DeleteResult.COMPACTION_PENDING;
         }
 
 
-        /**
-         * If any of the shards can't be deleted, then we can't delete it
-         */
-        for ( final Shard shard : shardEntryGroup.getReadShards() ) {
-            if ( !shardEntryGroup.canBeDeleted( shard ) ) {
-                return DeleteResult.TOO_NEW;
-            }
+        final long currentTime = timeService.getCurrentTime();
+
+        if ( shardEntryGroup.isNew( currentTime ) ) {
+            return DeleteResult.TOO_NEW;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
new file mode 100644
index 0000000..70c9b01
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.netflix.astyanax.MutationBatch;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ShardGroupDeletionImplTest {
+
+
+    protected AsyncTaskExecutor asyncTaskExecutor;
+    protected ListeningExecutorService listeningExecutorService;
+    private ApplicationScopeImpl scope;
+
+
+    @Before
+    public void setup() {
+
+
+        this.scope = new ApplicationScopeImpl( createId( "application" ) );
+    }
+
+
+    @After
+    public void shutDown() {
+        listeningExecutorService.shutdownNow();
+    }
+
+
+    @Test
+    public void shardCannotBeCompacted() throws ExecutionException, InterruptedException {
+
+        final long createTime = 10000;
+
+        final long currentTime = createTime;
+
+        final Shard shard0 = new Shard( 0, createTime, true );
+
+        final Shard shard1 = new Shard( 1000, createTime, false );
+
+        //set a 1 delta for testing
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard1 );
+        group.addShard( shard0 );
+
+        assertTrue( "this should return true for our test to succeed", group.isCompactionPending() );
+
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should not delete with pending compaction", ShardGroupDeletion.DeleteResult.COMPACTION_PENDING,
+            result );
+    }
+
+
+    @Test
+    public void shardTooNew() throws ExecutionException, InterruptedException {
+
+        final long createTime = 10000;
+
+        final long currentTime = createTime;
+
+        final Shard shard0 = new Shard( 0, createTime, true );
+
+
+        ////set a delta for way in the future
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard0 );
+
+        assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+        assertTrue( "this should return true for our test to succeed", group.isNew( currentTime ) );
+
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should not delete within timeout period", ShardGroupDeletion.DeleteResult.TOO_NEW, result );
+    }
+
+
+    @Test
+    public void hasEdges() throws ExecutionException, InterruptedException {
+
+        final long createTime = 10000;
+
+        final long currentTime = createTime * 2;
+
+        final Shard shard0 = new Shard( 0, createTime, true );
+
+
+        ////set a delta for way in the future
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard0 );
+
+        assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+        assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+        final Iterator<MarkedEdge> notMarkedIterator = Collections.singleton(
+            ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+                                                                  .iterator();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, notMarkedIterator );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, result );
+
+        //now check when marked we also retain them
+
+        final Iterator<MarkedEdge> markedEdgeIterator = Collections.singleton(
+            ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+                                                                   .iterator();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> markedFuture =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, markedEdgeIterator );
+
+        final ShardGroupDeletion.DeleteResult markedResult = future.get();
+
+        assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, markedResult );
+    }
+
+
+    @Test
+    public void testDeletion() throws ExecutionException, InterruptedException {
+
+        final long createTime = 10000;
+
+        final long currentTime = createTime * 2;
+
+        final Shard shard0 = new Shard( 0, createTime, true );
+
+
+        ////set a delta for way in the future
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard0 );
+
+        assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+        assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+        //mock up returning a mutation
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+
+        when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
+            MutationBatch.class) );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should  delete", ShardGroupDeletion.DeleteResult.DELETED, result );
+    }
+
+
+
+    private DirectedEdgeMeta getDirectedEdgeMeta() {
+
+        final Id sourceId = createId( "source" );
+        final String edgeType = "test";
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+        return directedEdgeMeta;
+    }
+
+
+    private void initExecutor( final int numberThreads, final int queueLength ) {
+        listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+            .createTaskExecutor( "GraphTaskExecutor", numberThreads, queueLength,
+                TaskExecutorFactory.RejectionAction.ABORT ) );
+
+        asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+
+        when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService );
+    }
+}


[09/15] usergrid git commit: Fixes reporter issue and marks test as ignored. When run regularly it kills Cassandra. I will unignore once merged with the new sharding logic.

Posted by md...@apache.org.
Fixes reporter issue and marks test as ignored. When run regularly it kills Cassandra.  I will unignore once merged with the new sharding logic.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f3693c85
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f3693c85
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f3693c85

Branch: refs/heads/2.1-release
Commit: f3693c8570e018d924add2a4851a2db7f805ccb0
Parents: 10d0a07
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:48:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:48:16 2015 -0600

----------------------------------------------------------------------
 .../graph/GraphManagerShardConsistencyIT.java          | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f3693c85/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index b82c28a..9ee7b95 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,9 +87,7 @@ public class GraphManagerShardConsistencyIT {
 
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
 
-    private static final Slf4jReporter reporter =
-        Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
-                     .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+    private Slf4jReporter reporter;
 
 
     protected ApplicationScope scope;
@@ -133,6 +132,11 @@ public class GraphManagerShardConsistencyIT {
         scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
 
 
+        reporter =
+                Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+                             .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+
         reporter.start( 10, TimeUnit.SECONDS );
     }
 
@@ -383,7 +387,8 @@ public class GraphManagerShardConsistencyIT {
     }
 
 
-    @Test
+    @Test(timeout=120000)
+    @Ignore("This works, but is occasionally causing cassandra to fall over.  Unignore when merged with new shard strategy")
     public void writeThousandsDelete()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 


[10/15] usergrid git commit: Add endpoint for setting apigee mobile config properties on the application.

Posted by md...@apache.org.
Add endpoint for setting apigee mobile config properties on the application.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/88d11790
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/88d11790
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/88d11790

Branch: refs/heads/2.1-release
Commit: 88d11790c2d077140a71347f97ad5bc75346af2c
Parents: f3693c8
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 21:16:50 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 21:16:50 2015 -0700

----------------------------------------------------------------------
 .../rest/applications/ApplicationResource.java  | 50 ++++++++++++++++++++
 1 file changed, 50 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/88d11790/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index c521d4f..4eaf09a 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.rest.applications;
 
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.sun.jersey.api.json.JSONWithPadding;
 import com.sun.jersey.api.view.Viewable;
 import org.apache.amber.oauth2.common.error.OAuthError;
@@ -47,6 +48,7 @@ import org.apache.usergrid.rest.exceptions.NotFoundExceptionMapper;
 import org.apache.usergrid.rest.exceptions.RedirectionException;
 import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
 import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
 import org.apache.usergrid.security.oauth.AccessInfo;
 import org.apache.usergrid.security.oauth.ClientCredentialsInfo;
 import org.slf4j.Logger;
@@ -601,4 +603,52 @@ public class ApplicationResource extends ServiceResource {
         }
         return new JSONWithPadding( value, callback );
     }
+
+    // Specifically require superuser access as this is setting app properties directly (only way to currently do this
+    // with Apigee's apigeeMobileConfig
+    @RequireSystemAccess
+    @POST
+    @Path("apm/apigeeMobileConfig")
+    @Consumes(APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public JSONWithPadding setAPMConfig( @Context UriInfo ui,
+                                         @QueryParam("callback") @DefaultValue("callback") String callback,
+                                         Map<String, Object> json) throws Exception {
+
+        if(json == null || json.size() < 1){
+            logger.error("Param {} cannot be null for POST apm/apigeeMobileConfig", APIGEE_MOBILE_APM_CONFIG_JSON_KEY);
+            throw new IllegalArgumentException("Request body cannot be empty and must include apigeeMobileConfig params");
+        }
+
+        final String requestAppUUID = (String) json.get("applicationUUID");
+        if(!requestAppUUID.equalsIgnoreCase(applicationId.toString())){
+            logger.error("Provided application UUID {} does not match actual application UUID {}",
+                requestAppUUID,
+                applicationId.toString());
+            throw new IllegalArgumentException(
+                String.format("Provided application UUID %s does not match actual application UUID %s",
+                requestAppUUID,
+                applicationId.toString())
+            );
+        }
+
+        final String apmConfig = new ObjectMapper().writeValueAsString(json);
+        if(logger.isDebugEnabled()){
+            logger.debug("Received request to set apigeeMobileConfig properties with: {}", apmConfig);
+        }
+
+
+        EntityManager em = emf.getEntityManager( applicationId );
+        em.setProperty(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
+            APIGEE_MOBILE_APM_CONFIG_JSON_KEY,
+            apmConfig
+        );
+
+        logger.info("Successfully set apigeeMobileConfig properties with: {}", apmConfig);
+
+        return new JSONWithPadding(apmConfig, callback);
+
+    }
+
+
 }


[02/15] usergrid git commit: USERGRID-1056 - Move the filtering of older entity versions on index deletes to Usergrid and not in the Elasticsearch query.

Posted by md...@apache.org.
USERGRID-1056 - Move the filtering of older entity versions on index deletes to Usergrid and not in the Elasticsearch query.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2c1147e4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2c1147e4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2c1147e4

Branch: refs/heads/2.1-release
Commit: 2c1147e49f76702dcf03ecaef6ece1f30306aa5f
Parents: c417099
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:07:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:07:18 2015 -0700

----------------------------------------------------------------------
 .../index/impl/EsEntityIndexImpl.java           | 45 +++++++++++++++-----
 1 file changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/2c1147e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index f6ebce2..9be591e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -460,7 +460,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
             while(true){
                 //add search result hits to some sort of running tally of hits.
-                candidates = aggregateScrollResults( candidates, searchResponse );
+                candidates = aggregateScrollResults( candidates, searchResponse, null );
 
                 SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
                     .getScrollBuilder( searchResponse.getScrollId() )
@@ -510,11 +510,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
             IndexingUtils.entityId(entityId));
 
-        FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte(markedVersion);
-
-        FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter, entityVersionFilter);
-
-        srb.setPostFilter(andFilter);
+        srb.setPostFilter(entityIdFilter);
 
 
 
@@ -536,7 +532,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
             while(true){
                 //add search result hits to some sort of running tally of hits.
-                candidates = aggregateScrollResults( candidates, searchResponse );
+                candidates = aggregateScrollResults( candidates, searchResponse, markedVersion);
 
                 SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
                     .getScrollBuilder( searchResponse.getScrollId() )
@@ -649,8 +645,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
         return candidateResults;
     }
 
-    private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
-                                                          final SearchResponse searchResponse ){
+    private List<CandidateResult> aggregateScrollResults(List<CandidateResult> candidates,
+                                                         final SearchResponse searchResponse, final UUID markedVersion){
 
         final SearchHits searchHits = searchResponse.getHits();
         final SearchHit[] hits = searchHits.getHits();
@@ -659,7 +655,36 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
 
             final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
 
-            candidates.add( candidateResult );
+            // if comparing against the latestVersion, make sure we only add the candidateResult if it's
+            // older than or equal to the latest marked version
+            if (markedVersion != null) {
+
+                if(candidateResult.getVersion().timestamp() <= markedVersion.timestamp()){
+
+                    if(logger.isDebugEnabled()){
+                        logger.debug("Candidate version {} is <= provided entity version {} for entityId {}",
+                            candidateResult.getVersion(),
+                            markedVersion,
+                            candidateResult.getId()
+                            );
+                    }
+
+                    candidates.add(candidateResult);
+
+                }else{
+                    if(logger.isDebugEnabled()){
+                        logger.debug("Candidate version {} is > provided entity version {} for entityId {}. Not" +
+                                "adding to candidate results",
+                            candidateResult.getVersion(),
+                            markedVersion,
+                            candidateResult.getId()
+                        );
+                    }
+                }
+
+            }else{
+                candidates.add(candidateResult);
+            }
         }
 
         logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );


[04/15] usergrid git commit: Update default properties that are known to work in a single and multi-region configuration.

Posted by md...@apache.org.
Update default properties that are known to work in a single and multi-region configuration.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4935377c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4935377c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4935377c

Branch: refs/heads/2.1-release
Commit: 4935377c8cb5be0c11ae39814a4c2d5cceb604d8
Parents: 2c1147e
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:27:24 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:27:24 2015 -0700

----------------------------------------------------------------------
 .../apache/usergrid/corepersistence/index/IndexProcessorFig.java   | 2 +-
 .../apache/usergrid/persistence/core/astyanax/CassandraFig.java    | 2 +-
 .../main/java/org/apache/usergrid/persistence/queue/QueueFig.java  | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..5871e58 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,7 +65,7 @@ public interface IndexProcessorFig extends GuicyFig {
      * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
      * If the timeout occurs, the messages will become visible again for re-processing.
      */
-    @Default( "5000" ) // 5 seconds
+    @Default( "30000" ) // 30 seconds
     @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
     int getIndexQueueVisibilityTimeout();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 79c198f..c82a683 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -71,7 +71,7 @@ public interface CassandraFig extends GuicyFig {
     int getConnections();
 
     @Key( "cassandra.timeout" )
-    @Default( "5000" )
+    @Default( "10000" )
     int getTimeout();
 
     @Key("cassandra.discovery")

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 6f3a3dc..ad38f6d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -59,7 +59,7 @@ public interface QueueFig extends GuicyFig {
      * The maximum number of attempts to attempt to deliver before failing into the DLQ
      */
     @Key( "usergrid.queue.deliveryLimit" )
-    @Default("20")
+    @Default("10")
     String getQueueDeliveryLimit();
 
     @Key("usergrid.use.default.queue")


[13/15] usergrid git commit: Address issues found during code review

Posted by md...@apache.org.
Address issues found during code review


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b1e1ed18
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b1e1ed18
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b1e1ed18

Branch: refs/heads/2.1-release
Commit: b1e1ed181d88f6a8e4b1b7bac76c99f936f66c8c
Parents: c9bf87c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 22 12:13:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 22 12:13:00 2015 -0600

----------------------------------------------------------------------
 .../graph/GraphManagerShardConsistencyIT.java            |  5 +++--
 .../impl/shard/impl/ShardGroupDeletionImplTest.java      | 11 ++++++++---
 2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1e1ed18/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 9ee7b95..c1917bb 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -55,6 +55,7 @@ import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -595,7 +596,7 @@ public class GraphManagerShardConsistencyIT {
 
             while ( groups.hasNext() ) {
 
-                group = groups.next();;
+                group = groups.next();
 
                 log.info( "Shard size for group is {}", group.getReadShards() );
 
@@ -604,7 +605,7 @@ public class GraphManagerShardConsistencyIT {
 
 
             //we're done, 1 shard remains, we have a group, and it's our default shard
-            if ( shardCount == 1 && group != null &&  group.getMinShard().getShardIndex() == 0  ) {
+            if ( shardCount == 1 && group != null &&  group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex()  ) {
                 log.info( "All compactions complete," );
 
                 break;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1e1ed18/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
index 9a3e407..f943e9b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -43,6 +43,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.same;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 
@@ -252,7 +254,7 @@ public class ShardGroupDeletionImplTest {
         //now check when marked we also retain them
 
         final Iterator<MarkedEdge> markedEdgeIterator = Collections.singleton(
-            ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+            ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, true ) )
                                                                    .iterator();
 
 
@@ -266,7 +268,7 @@ public class ShardGroupDeletionImplTest {
 
 
     @Test
-    public void testDeletion() throws ExecutionException, InterruptedException {
+    public void testDeletion() throws ExecutionException, InterruptedException, ConnectionException {
 
         final long createTime = 10000;
 
@@ -290,9 +292,10 @@ public class ShardGroupDeletionImplTest {
         //mock up returning a mutation
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
+        final MutationBatch batch = mock( MutationBatch.class );
 
         when( edgeShardSerialization.removeShardMeta( same( scope ), same( shard0 ), same( directedEdgeMeta ) ) )
-            .thenReturn( mock( MutationBatch.class ) );
+            .thenReturn( batch );
 
         final TimeService timeService = mock( TimeService.class );
 
@@ -310,6 +313,8 @@ public class ShardGroupDeletionImplTest {
         final ShardGroupDeletion.DeleteResult result = future.get();
 
         assertEquals( "should  delete", ShardGroupDeletion.DeleteResult.DELETED, result );
+
+        verify(batch).execute();
     }
 
 


[12/15] usergrid git commit: Change new endpoint to only require org access.

Posted by md...@apache.org.
Change new endpoint to only require org access.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b83a864
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b83a864
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b83a864

Branch: refs/heads/2.1-release
Commit: 1b83a864f97e0361266ebbe21e17c3a146bf46b4
Parents: c9bf87c
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Oct 22 10:50:35 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Oct 22 10:50:35 2015 -0700

----------------------------------------------------------------------
 .../org/apache/usergrid/rest/applications/ApplicationResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b83a864/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 4eaf09a..162565f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -606,7 +606,7 @@ public class ApplicationResource extends ServiceResource {
 
     // Specifically require superuser access as this is setting app properties directly (only way to currently do this
     // with Apigee's apigeeMobileConfig
-    @RequireSystemAccess
+    @RequireOrganizationAccess
     @POST
     @Path("apm/apigeeMobileConfig")
     @Consumes(APPLICATION_JSON)


[05/15] usergrid git commit: Going back to 5 seconds default visibility timeout.

Posted by md...@apache.org.
Going back to 5 seconds default visibility timeout.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5b01fc94
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5b01fc94
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5b01fc94

Branch: refs/heads/2.1-release
Commit: 5b01fc94c690bc03e1281915acb68780d15a910c
Parents: 4935377
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:32:17 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:32:17 2015 -0700

----------------------------------------------------------------------
 .../apache/usergrid/corepersistence/index/IndexProcessorFig.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/5b01fc94/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 5871e58..7d022e5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,7 +65,7 @@ public interface IndexProcessorFig extends GuicyFig {
      * Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
      * If the timeout occurs, the messages will become visible again for re-processing.
      */
-    @Default( "30000" ) // 30 seconds
+    @Default( "5000" ) // 5 seconds
     @Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
     int getIndexQueueVisibilityTimeout();
 


[03/15] usergrid git commit: Added an audit for shards when they are empty. They are only ever allocated when either a split + compaction needs to happen. Therefore if they don't have a compaction pending, they should never be empty.

Posted by md...@apache.org.
Added an audit for shards when they are empty.  They are only ever allocated when either a split + compaction needs to happen.  Therefore if they don't have a compaction pending, they should never be empty.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9fddd754
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9fddd754
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9fddd754

Branch: refs/heads/2.1-release
Commit: 9fddd7542572f00233dabff1890187faf89f9e86
Parents: c417099
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 15:21:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 15:21:23 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphFig.java    |   2 +-
 .../persistence/graph/guice/GraphModule.java    |  11 +-
 .../impl/EdgeSerializationImpl.java             |  17 +-
 .../impl/shard/AsyncTaskExecutor.java           |  34 +++
 .../impl/shard/ShardGroupCompaction.java        |   4 -
 .../impl/shard/ShardGroupDeletion.java          |  68 ++++++
 .../impl/shard/impl/AsyncTaskExecutorImpl.java  |  53 +++++
 .../shard/impl/ShardGroupColumnIterator.java    |  61 +++---
 .../shard/impl/ShardGroupCompactionImpl.java    |  10 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java | 206 +++++++++++++++++++
 .../impl/shard/ShardGroupCompactionTest.java    |  30 ++-
 11 files changed, 451 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 28aab40..f0df7ff 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -133,7 +133,7 @@ public interface GraphFig extends GuicyFig {
     int getShardCacheRefreshWorkerCount();
 
 
-    @Default( "10" )
+    @Default( "20" )
     @Key( SHARD_AUDIT_WORKERS )
     int getShardAuditWorkerCount();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6280c7c..d2476eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -54,7 +54,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservab
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.EdgeDataMigrationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigration;
 import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
-import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -62,14 +62,17 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardA
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -116,6 +119,12 @@ public abstract class GraphModule extends AbstractModule {
         bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
 
         /**
+         * Binding for task tracker
+         */
+        bind( AsyncTaskExecutor.class ).to( AsyncTaskExecutorImpl.class );
+        bind( ShardGroupDeletion.class ).to( ShardGroupDeletionImpl.class );
+
+        /**
          * Bind our strategies based on their internal annotations.
          */
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4c1ae79..9e25946 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
@@ -67,6 +68,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
     protected final EdgeColumnFamilies edgeColumnFamilies;
     protected final ShardedEdgeSerialization shardedEdgeSerialization;
     protected final TimeService timeService;
+    protected final ShardGroupDeletion shardGroupDeletion;
 
 
     @Inject
@@ -74,7 +76,8 @@ public class EdgeSerializationImpl implements EdgeSerialization {
                                   final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
                                   final EdgeColumnFamilies edgeColumnFamilies,
                                   final ShardedEdgeSerialization shardedEdgeSerialization,
-                                  final TimeService timeService ) {
+                                  final TimeService timeService, final ShardGroupDeletion shardGroupDeletion ) {
+
 
 
         checkNotNull( keyspace, "keyspace required" );
@@ -83,6 +86,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
         checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
         checkNotNull( timeService, "timeService required" );
+        checkNotNull( shardGroupDeletion, "shardGroupDeletion require");
 
 
         this.keyspace = keyspace;
@@ -92,6 +96,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.timeService = timeService;
+        this.shardGroupDeletion = shardGroupDeletion;
     }
 
 
@@ -293,7 +298,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         //now create a result iterator with our iterator of read shards
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, versionMetaData, shardGroupDeletion, readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
@@ -319,7 +324,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
@@ -346,7 +351,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization
@@ -372,7 +377,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
 
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
@@ -400,7 +405,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
         final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
 
 
-        return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+        return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
             @Override
             protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
                 return shardedEdgeSerialization

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
new file mode 100644
index 0000000..f4dc350
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+
+/**
+ * An interface for returning a singleton task executor
+ */
+public interface AsyncTaskExecutor {
+
+    /**
+     * Get the executor service for executing graph tasks
+     * @return
+     */
+    ListeningExecutorService getExecutorService();
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..0ff9fb1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,10 +22,6 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.util.concurrent.ListenableFuture;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
new file mode 100644
index 0000000..3d5a1ef
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+public interface ShardGroupDeletion {
+
+
+    /**
+     * Audit the shard entry group with the given NEW instance of the shardGroupColumnIterator. Returns the future with
+     * the outcome of the deletion task
+     *
+     * @param shardEntryGroup The group to evaluate
+     * @param edgeIterator The new instance of the edge iterator
+     *
+     * @return The delete result with the state of the delete operation
+     */
+    ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope applicationScope,
+                                                     final DirectedEdgeMeta directedEdgeMeta,
+                                                     final ShardEntryGroup shardEntryGroup,
+                                                     final Iterator<MarkedEdge> edgeIterator );
+
+
+    enum DeleteResult {
+        /**
+         * Returned if the shard was delete
+         */
+        DELETED,
+
+        /**
+         * The shard contains edges and cannot be deleted
+         */
+        CONTAINS_EDGES,
+
+        /**
+         * The shard is too new, and may not have been fully replicated, we can't delete it safely
+         */
+        TOO_NEW,
+
+        /**
+         * Our capacity was saturated, we didnt' check the shard
+         */
+        NOT_CHECKED;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
new file mode 100644
index 0000000..c534a22
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Implementation for a single task execution system within graph
+ */
+@Singleton
+public class AsyncTaskExecutorImpl implements AsyncTaskExecutor {
+
+
+    private final ListeningExecutorService taskExecutor;
+
+
+    @Inject
+    public AsyncTaskExecutorImpl(final GraphFig graphFig){
+        this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
+                    .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+                        graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+    }
+
+
+    @Override
+    public ListeningExecutorService getExecutorService() {
+        return this.taskExecutor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index a794a16..72b617f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -22,45 +22,52 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.NoSuchElementException;
 
+import javax.annotation.Nullable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 
 
 /**
- *
  * Iterator to keep iterating over multiple shard groups to stream results
  *
  * @param <T> The parsed return type
  */
-public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
 
 
     private static final Logger logger = LoggerFactory.getLogger( ShardGroupColumnIterator.class );
 
+    private final ApplicationScope applicationScope;
+    private final DirectedEdgeMeta directedEdgeMeta;
+    private final ShardGroupDeletion shardGroupDeletion;
     private final Iterator<ShardEntryGroup> entryGroupIterator;
-    private Iterator<T> elements;
 
 
-    public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator ){
+    private Iterator<MarkedEdge> elements;
+
+
+    public ShardGroupColumnIterator( final ApplicationScope applicationScope, final DirectedEdgeMeta directedEdgeMeta,
+                                     final ShardGroupDeletion shardGroupDeletion,
+                                     final Iterator<ShardEntryGroup> entryGroupIterator ) {
+        this.applicationScope = applicationScope;
+        this.directedEdgeMeta = directedEdgeMeta;
+        this.shardGroupDeletion = shardGroupDeletion;
         this.entryGroupIterator = entryGroupIterator;
     }
 
@@ -69,16 +76,16 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
     public boolean hasNext() {
 
 
-        if(elements == null){
+        if ( elements == null ) {
             return advance();
         }
 
-        if(elements.hasNext()){
+        if ( elements.hasNext() ) {
             return true;
         }
 
         //we've exhausted our shard groups and we don't have a next, we can't continue
-        if(!entryGroupIterator.hasNext()){
+        if ( !entryGroupIterator.hasNext() ) {
             return false;
         }
 
@@ -88,7 +95,7 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
 
 
     @Override
-    public T next() {
+    public MarkedEdge next() {
         if ( !hasNext() ) {
             throw new NoSuchElementException( "There are no more rows or columns left to advance" );
         }
@@ -105,17 +112,17 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
 
     /**
      * Get an iterator for the shard entry group
+     *
      * @param readShards the read shards to use
-     * @return
      */
-    protected abstract Iterator<T> getIterator(Collection<Shard> readShards);
+    protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards );
 
 
-    public boolean advance(){
+    public boolean advance() {
 
         logger.trace( "Advancing from shard entry group iterator" );
 
-        while(entryGroupIterator.hasNext()){
+        while ( entryGroupIterator.hasNext() ) {
 
             final ShardEntryGroup group = entryGroupIterator.next();
 
@@ -126,16 +133,22 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
             /**
              * We're done, we have some columns to return
              */
-            if(elements.hasNext()){
+            if ( elements.hasNext() ) {
                 logger.trace( "Found edges in shard entry group {}", group );
                 return true;
             }
+            else {
+                logger.trace( "Our shard is empty, we need to perform an audit on shard group {}", group );
 
+                //fire and forget if we miss it here, we'll get it next read
+                shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIterator( group.getReadShards() ) );
+
+
+            }
         }
 
         logger.trace( "Completed iterating shard group iterator" );
 
         return false;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c9e389f..21f2d72 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -45,6 +45,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -108,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                                      final NodeShardAllocation nodeShardAllocation,
                                      final ShardedEdgeSerialization shardedEdgeSerialization,
                                      final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
-                                     final EdgeShardSerialization edgeShardSerialization) {
+                                     final EdgeShardSerialization edgeShardSerialization,
+                                     final AsyncTaskExecutor asyncTaskExecutor) {
 
         this.timeService = timeService;
         this.countAudits = new AtomicLong();
@@ -124,9 +126,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         this.shardAuditTaskTracker = new ShardAuditTaskTracker();
 
 
-        this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
-            .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
-                graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+        this.taskExecutor = asyncTaskExecutor.getExecutorService();
     }
 
 
@@ -305,7 +305,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         }
 
         countAudits.getAndIncrement();
-        
+
         if(LOG.isDebugEnabled()) {
             LOG.debug("Auditing shard group. count is {} ", countAudits.get());
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
new file mode 100644
index 0000000..c51a021
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Implementation of the shard group deletion task
+ */
+@Singleton
+public class ShardGroupDeletionImpl implements ShardGroupDeletion {
+
+    private static final Logger logger = LoggerFactory.getLogger( ShardGroupDeletionImpl.class );
+
+    private final ListeningExecutorService asyncTaskExecutor;
+    private final EdgeShardSerialization edgeShardSerialization;
+
+
+    @Inject
+    public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
+                                   final EdgeShardSerialization edgeShardSerialization ) {
+        this.edgeShardSerialization = edgeShardSerialization;
+        this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
+    }
+
+
+    @Override
+    public ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope applicationScope,
+                                                            final DirectedEdgeMeta directedEdgeMeta,
+                                                            final ShardEntryGroup shardEntryGroup,
+                                                            final Iterator<MarkedEdge> edgeIterator ) {
+
+
+        /**
+         * Try and submit.  During back pressure, we may not be able to submit, that's ok.  Better to drop than to
+         * hose the system
+         */
+        final ListenableFuture<DeleteResult> future;
+
+        try {
+            future = asyncTaskExecutor
+                .submit( new ShardDeleteTask( applicationScope, directedEdgeMeta, shardEntryGroup, edgeIterator ) );
+        }
+        catch ( RejectedExecutionException ree ) {
+
+            //ignore, if this happens we don't care, we're saturated, we can check later
+            logger.error( "Rejected shard delete check for group {}", edgeIterator );
+
+            return Futures.immediateFuture( DeleteResult.NOT_CHECKED );
+        }
+
+
+        /**
+         * Log our success or failures for debugging purposes
+         */
+        Futures.addCallback( future, new FutureCallback<DeleteResult>() {
+            @Override
+            public void onSuccess( @Nullable final ShardGroupDeletion.DeleteResult result ) {
+                logger.trace( "Successfully completed delete of task {}", result );
+            }
+
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                logger.error( "Unable to perform shard delete audit.  Exception is ", t );
+            }
+        } );
+
+        return future;
+    }
+
+
+    /**
+     * Execute the logic for the delete
+     */
+    private DeleteResult maybeDeleteShardInternal( final ApplicationScope applicationScope,
+                                                   final DirectedEdgeMeta directedEdgeMeta,
+                                                   final ShardEntryGroup shardEntryGroup,
+                                                   final Iterator<MarkedEdge> edgeIterator ) {
+
+        /**
+         * Compaction is pending, we cannot check it
+         */
+        if ( shardEntryGroup.isCompactionPending() ) {
+            return DeleteResult.NOT_CHECKED;
+        }
+
+
+        /**
+         * If any of the shards can't be deleted, then we can't delete it
+         */
+        for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+            if ( !shardEntryGroup.canBeDeleted( shard ) ) {
+                return DeleteResult.TOO_NEW;
+            }
+        }
+
+        /**
+         * We have edges, and therefore cannot delete them
+         */
+        if ( edgeIterator.hasNext() ) {
+            return DeleteResult.CONTAINS_EDGES;
+        }
+
+
+        //now we can proceed based on the shard meta state and we don't have any edge
+
+        MutationBatch rollup = null;
+
+        for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+            final MutationBatch shardRemovalMutation =
+                edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+
+            if ( rollup == null ) {
+                rollup = shardRemovalMutation;
+            }
+
+            else {
+                rollup.mergeShallow( shardRemovalMutation );
+            }
+        }
+
+
+        Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+
+        try {
+            rollup.execute();
+        }
+        catch ( ConnectionException e ) {
+            logger.error( "Unable to execute shard deletion", e );
+            throw new RuntimeException( "Unable to execute shard deletion", e );
+        }
+
+        return DeleteResult.DELETED;
+    }
+
+
+    /**
+     * Glue for executing the task
+     */
+    private final class ShardDeleteTask implements Callable<DeleteResult> {
+
+        private final ApplicationScope applicationScope;
+        private final DirectedEdgeMeta directedEdgeMeta;
+        private final ShardEntryGroup shardEntryGroup;
+        private final Iterator<MarkedEdge> edgeIterator;
+
+
+        private ShardDeleteTask( final ApplicationScope applicationScope, final DirectedEdgeMeta directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup, final Iterator<MarkedEdge> edgeIterator ) {
+            this.applicationScope = applicationScope;
+            this.directedEdgeMeta = directedEdgeMeta;
+            this.shardEntryGroup = shardEntryGroup;
+            this.edgeIterator = edgeIterator;
+        }
+
+
+        @Override
+        public DeleteResult call() throws Exception {
+            return maybeDeleteShardInternal( applicationScope, directedEdgeMeta, shardEntryGroup, edgeIterator );
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 7d4b7f6..65f19ff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -26,16 +26,20 @@ import java.util.Collection;
 
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.core.util.IdGenerator;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.netflix.astyanax.Keyspace;
 
 import static org.junit.Assert.assertEquals;
@@ -47,6 +51,8 @@ import static org.mockito.Mockito.when;
 public class ShardGroupCompactionTest {
 
     protected GraphFig graphFig;
+    protected AsyncTaskExecutor asyncTaskExecutor;
+    protected ListeningExecutorService listeningExecutorService;
     protected ApplicationScope scope;
 
 
@@ -58,9 +64,25 @@ public class ShardGroupCompactionTest {
 
         when( graphFig.getShardAuditWorkerQueueSize() ).thenReturn( 1000 );
 
+
+
+        listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+            .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+                graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+
+        asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+
+        when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService );
+
+
         this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) );
     }
 
+    @After
+    public void shutDown(){
+        listeningExecutorService.shutdownNow();
+    }
+
 
     @Test
     public void shouldNotCompact() {
@@ -77,6 +99,7 @@ public class ShardGroupCompactionTest {
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
+
         final long delta = 10000;
 
         final long createTime = 20000;
@@ -92,9 +115,8 @@ public class ShardGroupCompactionTest {
         when( timeService.getCurrentTime() ).thenReturn( timeNow );
 
         ShardGroupCompactionImpl compaction =
-                new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
-                        edgeColumnFamilies, keyspace, edgeShardSerialization );
-
+            new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
+                edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor );
 
         DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
 
@@ -104,7 +126,7 @@ public class ShardGroupCompactionTest {
         }
         catch ( Throwable t ) {
             assertEquals( "Correct error message returned", "Compaction cannot be run yet.  Ignoring compaction.",
-                    t.getMessage() );
+                t.getMessage() );
         }
     }
 


[08/15] usergrid git commit: Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052

Posted by md...@apache.org.
Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10d0a070
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10d0a070
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10d0a070

Branch: refs/heads/2.1-release
Commit: 10d0a070758c2f1e499ff43c6efab866f8469d1e
Parents: beaed6f 5b01fc9
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:19:48 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:19:48 2015 -0600

----------------------------------------------------------------------
 .../persistence/core/astyanax/CassandraFig.java |  2 +-
 .../index/impl/EsEntityIndexImpl.java           | 45 +++++++++++++++-----
 .../usergrid/persistence/queue/QueueFig.java    |  2 +-
 3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[07/15] usergrid git commit: Fixes bug with deleting minimum shard. This is no longer allowed.

Posted by md...@apache.org.
Fixes bug with deleting minimum shard.  This is no longer allowed.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/beaed6ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/beaed6ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/beaed6ff

Branch: refs/heads/2.1-release
Commit: beaed6ffe1239ad404823cc4864b299e4113e135
Parents: 0286dcc
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:19:40 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:19:40 2015 -0600

----------------------------------------------------------------------
 .../graph/serialization/impl/shard/Shard.java   |  15 +
 .../impl/shard/ShardEntryGroup.java             |   4 +-
 .../impl/shard/ShardGroupDeletion.java          |   7 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  81 ++--
 .../impl/shard/impl/ShardGroupDeletionImpl.java |  30 +-
 .../graph/GraphManagerShardConsistencyIT.java   | 370 ++++++++++++++-----
 .../impl/shard/ShardEntryGroupTest.java         |  14 +
 .../shard/impl/ShardGroupDeletionImplTest.java  |  50 ++-
 8 files changed, 419 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 9ca6cbe..472e0a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -21,6 +21,12 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 public class Shard implements Comparable<Shard> {
 
+
+    /**
+     * The minimum shard a shardIndex can possibly be set to
+     */
+    public static final Shard MIN_SHARD = new Shard(0, 0, true);
+
     private final long shardIndex;
     private final long createdTime;
     private final boolean compacted;
@@ -58,6 +64,15 @@ public class Shard implements Comparable<Shard> {
 
 
     /**
+     * Returns true if this is the minimum shard
+     * @return
+     */
+    public boolean isMinShard(){
+        return shardIndex == MIN_SHARD.shardIndex;
+    }
+
+
+    /**
      * Compare the shards based on the timestamp first, then the created time second
      */
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 555d467..543f1fb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -301,7 +301,7 @@ public class ShardEntryGroup {
      * @return
      */
     public boolean isNew(final long currentTime){
-        return currentTime - delta < maxCreatedTime;
+        return currentTime - delta <= maxCreatedTime;
     }
 
     /**
@@ -316,7 +316,7 @@ public class ShardEntryGroup {
         final Shard compactionTarget = getCompactionTarget();
 
 
-        return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+        return !shard.isCompacted() && !shard.isMinShard() &&  ( compactionTarget != null && compactionTarget.getShardIndex() != shard
                 .getShardIndex() );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 640e8bd..37aa9e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -68,6 +68,11 @@ public interface ShardGroupDeletion {
         /**
          * Our capacity was saturated, we didnt' check the shard
          */
-        NOT_CHECKED;
+        NOT_CHECKED,
+
+        /**
+         * We checked everything, but we didn't perform any operations.  Happens when only the min shard remains
+         */
+        NO_OP;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 7a7fb3f..b0875af 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -60,8 +60,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
 
-    private static final Shard MIN_SHARD = new Shard( 0, 0, true );
-
     private final EdgeShardSerialization edgeShardSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
     private final ShardedEdgeSerialization shardedEdgeSerialization;
@@ -99,29 +97,33 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         //its a new node, it doesn't need to check cassandra, it won't exist
         if ( isNewNode( directedEdgeMeta ) ) {
-            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+            existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
         }
 
         else {
             existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
-        }
 
-        if ( existingShards == null || !existingShards.hasNext() ) {
+            /**
+             * We didn't get anything out of cassandra, so we need to create the minumum shard
+             */
+            if ( existingShards == null || !existingShards.hasNext() ) {
 
 
-            final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
-            try {
-                batch.execute();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to connect to casandra", e );
-            }
+                final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to casandra", e );
+                }
 
-            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+                existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
+            }
         }
 
+
         return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
-                directedEdgeMeta );
+            directedEdgeMeta );
     }
 
 
@@ -173,17 +175,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return false;
         }
 
-        if(LOG.isDebugEnabled()){
-            LOG.debug("Count of {} has exceeded shard config of {} will begin compacting", count, shardSize);
+        if ( LOG.isDebugEnabled() ) {
+            LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
         }
 
         /**
-         * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a shard rapidly, we split it near the head of the values.
-         * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node.
+         * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a
+         * shard rapidly, we split it near the head of the values.
+         * Further checks to this group will result in more splits, similar to creating a tree type structure and
+         * splitting each node.
          *
-         * This means that the lower shard can be re-split later if it is still too large.  We do the division to truncate
-         * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the
-         * lower bound and moved forward.  Doing this will stop the current shard from expanding and avoid a point where we cannot
+         * This means that the lower shard can be re-split later if it is still too large.  We do the division to
+         * truncate
+         * to a split point < what our current max is that would be approximately be our pivot ultimately if we split
+         * from the
+         * lower bound and moved forward.  Doing this will stop the current shard from expanding and avoid a point
+         * where we cannot
          * ultimately compact to the correct shard size.
          */
 
@@ -193,13 +200,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
-                        SearchByEdgeType.Order.ASCENDING );
+            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+                SearchByEdgeType.Order.ASCENDING );
 
 
         if ( !edges.hasNext() ) {
-            LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
-                    + "but no max value could be found in that row", directedEdgeMeta );
+            LOG.warn(
+                "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+                directedEdgeMeta );
             return false;
         }
 
@@ -214,12 +222,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
 
-        for(long i = 1;  edges.hasNext(); i++){
+        for ( long i = 1; edges.hasNext(); i++ ) {
             //we hit a pivot shard, set it since it could be the last one we encounter
-            if(i% shardSize == 0){
+            if ( i % shardSize == 0 ) {
                 marked = edges.next();
             }
-            else{
+            else {
                 edges.next();
             }
         }
@@ -228,7 +236,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
          */
-        if(marked == null){
+        if ( marked == null ) {
             LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
             return false;
         }
@@ -262,8 +270,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         if ( minDelta < minimumAllowed ) {
-            throw new GraphRuntimeException( String.format(
-                    "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
+            throw new GraphRuntimeException( String
+                .format( "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
                     GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
         }
 
@@ -279,8 +287,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         //TODO: TN this is broken....
-        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
-        final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units
+        // correct
+        final long timeoutDelta = graphFig.getShardCacheTimeout();
 
         final long timeNow = timeService.getCurrentTime();
 
@@ -289,16 +298,16 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
 
             //short circuit
-            if(!isNew || node.getId().getUuid().version() > 2){
+            if ( !isNew || node.getId().getUuid().version() > 2 ) {
                 return false;
             }
 
-            final long uuidTime =   TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+            final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() );
 
             final long newExpirationTimeout = uuidTime + timeoutDelta;
 
             //our expiration is after our current time, treat it as new
-            isNew = isNew && newExpirationTimeout >  timeNow;
+            isNew = isNew && newExpirationTimeout > timeNow;
         }
 
         return isNew;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 9c97d77..38a7834 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -149,9 +149,18 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
 
         //now we can proceed based on the shard meta state and we don't have any edge
 
+        DeleteResult result = DeleteResult.NO_OP;
+
         MutationBatch rollup = null;
 
         for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+
+            //skip the min shard
+            if(shard.isMinShard()){
+                continue;
+            }
+
+
             final MutationBatch shardRemovalMutation =
                 edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
 
@@ -162,20 +171,23 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
             else {
                 rollup.mergeShallow( shardRemovalMutation );
             }
+
+            result = DeleteResult.DELETED;
         }
 
 
-        Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+       if( rollup != null) {
 
-        try {
-            rollup.execute();
-        }
-        catch ( ConnectionException e ) {
-            logger.error( "Unable to execute shard deletion", e );
-            throw new RuntimeException( "Unable to execute shard deletion", e );
-        }
+           try {
+               rollup.execute();
+           }
+           catch ( ConnectionException e ) {
+               logger.error( "Unable to execute shard deletion", e );
+               throw new RuntimeException( "Unable to execute shard deletion", e );
+           }
+       }
 
-        return DeleteResult.DELETED;
+        return result;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2fb08a4..b82c28a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -71,10 +71,8 @@ import com.google.inject.Injector;
 import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
-import rx.functions.Action1;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -89,8 +87,8 @@ public class GraphManagerShardConsistencyIT {
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
 
     private static final Slf4jReporter reporter =
-            Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
-                         .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+        Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+                     .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
 
 
     protected ApplicationScope scope;
@@ -102,6 +100,8 @@ public class GraphManagerShardConsistencyIT {
 
     protected Object originalShardDelta;
 
+    protected ListeningExecutorService executor;
+
 
     @Before
     public void setupOrg() {
@@ -141,12 +141,19 @@ public class GraphManagerShardConsistencyIT {
     public void tearDown() {
         reporter.stop();
         reporter.report();
+
+        executor.shutdownNow();
+    }
+
+
+    private void createExecutor( final int size ) {
+        executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
     }
 
 
     @Test
     public void writeThousandsSingleSource()
-            throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+        throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
         final Id sourceId = IdGenerator.createId( "source" );
         final String edgeType = "test";
@@ -166,13 +173,13 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
-                        new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE,
-                                SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                    new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.<Edge>absent() ) );
             }
         };
 
 
-//        final int numInjectors = 2;
+        //        final int numInjectors = 2;
         final int numInjectors = 1;
 
         /**
@@ -200,16 +207,13 @@ public class GraphManagerShardConsistencyIT {
         final long numberOfEdges = shardSize * 4;
 
 
-        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector;
-
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
 
 
-        final long expectedShardCount = numberOfEdges/shardSize;
+        final long expectedShardCount = numberOfEdges / shardSize;
 
 
-        final ListeningExecutorService
-                executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( numWorkersPerInjector ) );
-
+        createExecutor( numWorkersPerInjector );
 
         final AtomicLong writeCounter = new AtomicLong();
 
@@ -219,24 +223,22 @@ public class GraphManagerShardConsistencyIT {
 
 
         log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
-                numInjectors );
+            numInjectors );
 
 
         final List<Future<Boolean>> futures = new ArrayList<>();
 
 
-
         for ( Injector injector : injectors ) {
             final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
 
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
-                Future<Boolean> future = executor
-                        .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                Future<Boolean> future =
+                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
 
                 futures.add( future );
             }
-
         }
 
         /**
@@ -262,11 +264,10 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Start reading continuously while we migrate data to ensure our view is always correct
          */
-        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
-
-        final List<Throwable> failures = new ArrayList<>(  );
-
+        final ListenableFuture<Long> future =
+            executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
 
+        final List<Throwable> failures = new ArrayList<>();
 
 
         //add the future
@@ -287,37 +288,31 @@ public class GraphManagerShardConsistencyIT {
         } );
 
 
-
         int compactedCount;
 
 
-
-
-
         //now start our readers
 
         while ( true ) {
 
-            if(!failures.isEmpty()){
+            if ( !failures.isEmpty() ) {
 
-                StringBuilder builder = new StringBuilder(  );
+                StringBuilder builder = new StringBuilder();
 
-                builder.append("Read runner failed!\n");
+                builder.append( "Read runner failed!\n" );
 
-                for(Throwable t: failures){
+                for ( Throwable t : failures ) {
                     builder.append( "Exception is: " );
-                    ByteArrayOutputStream output = new ByteArrayOutputStream(  );
+                    ByteArrayOutputStream output = new ByteArrayOutputStream();
 
                     t.printStackTrace( new PrintWriter( output ) );
 
-                    builder.append( output.toString( "UTF-8" ));
+                    builder.append( output.toString( "UTF-8" ) );
                     builder.append( "\n\n" );
-
                 }
 
 
-
-                fail(builder.toString());
+                fail( builder.toString() );
             }
 
             //reset our count.  Ultimately we'll have 4 groups once our compaction completes
@@ -344,25 +339,21 @@ public class GraphManagerShardConsistencyIT {
             if ( compactedCount >= expectedShardCount ) {
                 log.info( "All compactions complete, sleeping" );
 
-//                final Object mutex = new Object();
-//
-//                synchronized ( mutex ){
-//
-//                    mutex.wait();
-//                }
+                //                final Object mutex = new Object();
+                //
+                //                synchronized ( mutex ){
+                //
+                //                    mutex.wait();
+                //                }
 
                 break;
-
             }
 
 
             Thread.sleep( 2000 );
-
         }
 
         executor.shutdownNow();
-
-
     }
 
 
@@ -392,7 +383,236 @@ public class GraphManagerShardConsistencyIT {
     }
 
 
+    @Test
+    public void writeThousandsDelete()
+        throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+
+        final Id sourceId = IdGenerator.createId( "source" );
+        final String edgeType = "test";
+
+        final EdgeGenerator generator = new EdgeGenerator() {
+
+
+            @Override
+            public Edge newEdge() {
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+
+
+                return edge;
+            }
+
+
+            @Override
+            public Observable<Edge> doSearch( final GraphManager manager ) {
+                return manager.loadEdgesFromSource(
+                    new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.<Edge>absent(), false ) );
+            }
+        };
+
+
+        //        final int numInjectors = 2;
+        final int numInjectors = 1;
+
+        /**
+         * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes
+         */
+        final List<Injector> injectors = createInjectors( numInjectors );
+
+
+        final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+        final long shardSize = graphFig.getShardSize();
+
+
+        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+        // power for writes
+        final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
+
+        final int numWorkersPerInjector = numProcessors / numInjectors;
+
+
+        /**
+         * Do 4x shard size so we should have approximately 4 shards
+         */
+        final long numberOfEdges = shardSize * 4;
+
+
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
+
+        createExecutor( numWorkersPerInjector );
+
+
+        final AtomicLong writeCounter = new AtomicLong();
+
 
+        //min stop time the min delta + 1 cache cycle timeout
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+
+
+        log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+            numInjectors );
+
+
+        final List<Future<Boolean>> futures = new ArrayList<>();
+
+
+        for ( Injector injector : injectors ) {
+            final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+
+            for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+                Future<Boolean> future =
+                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+
+                futures.add( future );
+            }
+        }
+
+        /**
+         * Wait for all writes to complete
+         */
+        for ( Future<Boolean> future : futures ) {
+            future.get();
+        }
+
+        //now get all our shards
+        final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+        //now submit the readers.
+        final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
+
+
+        final long writeCount = writeCounter.get();
+        final Meter readMeter = registry.meter( "readThroughput" );
+
+
+        //check our shard state
+
+
+        final Iterator<ShardEntryGroup> existingShardGroups =
+            cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+        int shardCount = 0;
+
+        while ( existingShardGroups.hasNext() ) {
+            final ShardEntryGroup group = existingShardGroups.next();
+
+            shardCount++;
+
+            log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+        }
+
+
+        log.info( "found {} shard groups", shardCount );
+
+
+        //now mark and delete all the edges
+
+
+        final GraphManager manager = gmf.createEdgeManager( scope );
+
+        //sleep occasionally to stop pushing cassandra over
+
+        long count = Long.MAX_VALUE;
+
+        while(count != 0) {
+            //take 10000 then sleep
+            count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+                     .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+
+            Thread.sleep( 500 );
+        }
+
+
+        //now loop until with a reader until our shards are gone
+
+
+        /**
+         * Start reading continuously while we migrate data to ensure our view is always correct
+         */
+        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+
+        final List<Throwable> failures = new ArrayList<>();
+
+
+        //add the future
+        Futures.addCallback( future, new FutureCallback<Long>() {
+
+            @Override
+            public void onSuccess( @Nullable final Long result ) {
+                log.info( "Successfully ran the read, re-running" );
+                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+            }
+
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                failures.add( t );
+                log.error( "Failed test!", t );
+            }
+        } );
+
+
+        //now start our readers
+
+        while ( true ) {
+
+            if ( !failures.isEmpty() ) {
+
+                StringBuilder builder = new StringBuilder();
+
+                builder.append( "Read runner failed!\n" );
+
+                for ( Throwable t : failures ) {
+                    builder.append( "Exception is: " );
+                    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+                    t.printStackTrace( new PrintWriter( output ) );
+
+                    builder.append( output.toString( "UTF-8" ) );
+                    builder.append( "\n\n" );
+                }
+
+
+                fail( builder.toString() );
+            }
+
+            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
+            shardCount = 0;
+
+            //we have to get it from the cache, because this will trigger the compaction process
+            final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+            ShardEntryGroup group = null;
+
+            while ( groups.hasNext() ) {
+
+                group = groups.next();;
+
+                log.info( "Shard size for group is {}", group.getReadShards() );
+
+                shardCount += group.getReadShards().size();
+            }
+
+
+            //we're done, 1 shard remains, we have a group, and it's our default shard
+            if ( shardCount == 1 && group != null &&  group.getMinShard().getShardIndex() == 0  ) {
+                log.info( "All compactions complete," );
+
+                break;
+            }
+
+
+            Thread.sleep( 2000 );
+        }
+
+        //now that we have finished expanding s
+
+        executor.shutdownNow();
+    }
 
 
     private class Worker implements Callable<Boolean> {
@@ -436,7 +656,6 @@ public class GraphManagerShardConsistencyIT {
                 writeCounter.incrementAndGet();
 
 
-
                 if ( i % 1000 == 0 ) {
                     log.info( "   Wrote: " + i );
                 }
@@ -454,8 +673,9 @@ public class GraphManagerShardConsistencyIT {
         private final long writeCount;
         private final Meter readMeter;
 
+
         private ReadWorker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount,
-                            final Meter readMeter) {
+                            final Meter readMeter ) {
             this.factory = factory;
             this.generator = generator;
             this.writeCount = writeCount;
@@ -467,72 +687,28 @@ public class GraphManagerShardConsistencyIT {
         public Long call() throws Exception {
 
 
-
-
             GraphManager gm = factory.createEdgeManager( scope );
 
 
-
-            while(true) {
-
-//                final long[] count = {0};
-//                final long[] duplicate = {0};
-//                final HashSet<Edge >  seen = new HashSet<>((int)writeCount);
+            while ( true ) {
 
 
                 //do a read to eventually trigger our group compaction. Take 2 pages of columns
                 final long returnedEdgeCount = generator.doSearch( gm )
 
-                                                        .doOnNext( new Action1<Edge>() {
-
-
-                                                            //                    private Edge last;
-
-
-                                                            @Override
-                                                            public void call( final Edge edge ) {
-                                                                readMeter.mark();
-
-                                                                //                        count[0]++;
-                                                                //
-                                                                //                        /**
-                                                                //                         * Added this check as part
-                                                                // of the read
-                                                                //                         */
-                                                                //                        if ( last != null && last
-                                                                // .equals(edge) ) {
-                                                                //                            fail( String.format( "Expected edges to be in order, however last was %s and current is %s",
-                                                                //                                    last, edge ) );
-                                                                //                        }
-                                                                //
-                                                                //                        last = edge;
-                                                                //
-                                                                //                        if( seen.contains( edge ) ){
-                                                                //                            fail( String.format("Returned an edge that was already seen! Edge was %s, last edge was %s", edge, last) );
-                                                                //                            duplicate[0]++;
-                                                                //                        }
-                                                                //
-                                                                //                        seen.add( edge );
-
-                                                            }
-                                                        } )
+                                                        .doOnNext( edge -> readMeter.mark() )
 
                                                         .countLong().toBlocking().last();
 
-
-//                if(returnedEdgeCount != count[0]-duplicate[0]){
-//                    log.warn( "Missing entries from the initial put" );
-//                }
-
                 log.info( "Completed reading {} edges", returnedEdgeCount );
 
-                if(writeCount != returnedEdgeCount){
-                    log.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount, returnedEdgeCount );
+                if ( writeCount != returnedEdgeCount ) {
+                    log.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
+                        returnedEdgeCount );
                 }
 
                 assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index 9289340..30634e2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -392,6 +392,20 @@ public class ShardEntryGroupTest {
 
         assertFalse( "Shard added", result );
     }
+
+
+    @Test
+    public void minShardNotDeleted() {
+        Shard minShard = Shard.MIN_SHARD;
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1 );
+
+        shardEntryGroup.addShard( minShard );
+
+        assertTrue( minShard.isMinShard() );
+
+        assertFalse( shardEntryGroup.canBeDeleted( minShard ) );
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
index 70c9b01..9a3e407 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -120,6 +120,46 @@ public class ShardGroupDeletionImplTest {
 
 
     @Test
+    public void shardIsMinShard() throws ExecutionException, InterruptedException {
+
+
+        final long currentTime = 1000;
+
+        final Shard shard0 = Shard.MIN_SHARD;
+
+
+        //set a 1 delta for testing
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard0 );
+
+        assertTrue( "this should return false for our test to succeed", shard0.isMinShard() );
+
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should not delete min shard", ShardGroupDeletion.DeleteResult.NO_OP, result );
+    }
+
+
+    @Test
     public void shardTooNew() throws ExecutionException, InterruptedException {
 
         final long createTime = 10000;
@@ -232,7 +272,7 @@ public class ShardGroupDeletionImplTest {
 
         final long currentTime = createTime * 2;
 
-        final Shard shard0 = new Shard( 0, createTime, true );
+        final Shard shard0 = new Shard( 1000, createTime, true );
 
 
         ////set a delta for way in the future
@@ -245,15 +285,14 @@ public class ShardGroupDeletionImplTest {
         assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
 
 
-
         final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
 
         //mock up returning a mutation
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
 
-        when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
-            MutationBatch.class) );
+        when( edgeShardSerialization.removeShardMeta( same( scope ), same( shard0 ), same( directedEdgeMeta ) ) )
+            .thenReturn( mock( MutationBatch.class ) );
 
         final TimeService timeService = mock( TimeService.class );
 
@@ -265,8 +304,6 @@ public class ShardGroupDeletionImplTest {
             new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
 
 
-
-
         final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
             shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
 
@@ -276,7 +313,6 @@ public class ShardGroupDeletionImplTest {
     }
 
 
-
     private DirectedEdgeMeta getDirectedEdgeMeta() {
 
         final Id sourceId = createId( "source" );


[14/15] usergrid git commit: Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052

Posted by md...@apache.org.
Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/48161a1a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/48161a1a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/48161a1a

Branch: refs/heads/2.1-release
Commit: 48161a1ac9bfbf3e8c3aefa9586fc104ff0ec664
Parents: b1e1ed1 1b83a86
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 22 12:13:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 22 12:13:08 2015 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/rest/applications/ApplicationResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------



[11/15] usergrid git commit: Fix issues with integrating with Apigee's mobile analytics.

Posted by md...@apache.org.
Fix issues with integrating with Apigee's mobile analytics.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c9bf87c6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c9bf87c6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c9bf87c6

Branch: refs/heads/2.1-release
Commit: c9bf87c68defd1cb84bd8b65220b1994f0e8b17e
Parents: 88d1179
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Oct 22 10:20:42 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Oct 22 10:20:42 2015 -0700

----------------------------------------------------------------------
 .../usergrid/management/cassandra/ManagementServiceImpl.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/c9bf87c6/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 03b798c..70d74fc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -1610,7 +1610,8 @@ public class ManagementServiceImpl implements ManagementService {
     @Override
     public ApplicationInfo createApplication( UUID organizationId, String applicationName ) throws Exception {
 
-        return createApplication( organizationId, applicationName,null, null );
+        // DO NOT CHANGE THIS AS SOME EXTERNAL CLASSES MAY RELY ON THIS BEHAVIOR WHEN EXTENDING
+        return createApplication( organizationId, applicationName, null );
     }
 
 


[15/15] usergrid git commit: Merge commit 'refs/pull/407/head' of apache.github.com:apache/usergrid into 2.1-release

Posted by md...@apache.org.
Merge commit 'refs/pull/407/head' of apache.github.com:apache/usergrid into 2.1-release


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0604247f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0604247f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0604247f

Branch: refs/heads/2.1-release
Commit: 0604247f690db09b1a25feef9532cb5e58929437
Parents: 1fe1d1a 48161a1
Author: Mike Dunker <md...@apigee.com>
Authored: Thu Oct 22 11:27:59 2015 -0700
Committer: Mike Dunker <md...@apigee.com>
Committed: Thu Oct 22 11:27:59 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/index/IndexServiceImpl.java |   3 +-
 .../persistence/core/astyanax/CassandraFig.java |   2 +-
 .../core/astyanax/MultiRowColumnIterator.java   |  46 +--
 .../usergrid/persistence/graph/GraphFig.java    |   2 +-
 .../persistence/graph/guice/GraphModule.java    |  11 +-
 .../impl/EdgeSerializationImpl.java             |  17 +-
 .../impl/shard/AsyncTaskExecutor.java           |  34 ++
 .../graph/serialization/impl/shard/Shard.java   |  15 +
 .../impl/shard/ShardEntryGroup.java             |  13 +-
 .../impl/shard/ShardGroupCompaction.java        |   4 -
 .../impl/shard/ShardGroupDeletion.java          |  78 ++++
 .../impl/shard/impl/AsyncTaskExecutorImpl.java  |  53 +++
 .../shard/impl/NodeShardAllocationImpl.java     |  81 ++--
 .../shard/impl/ShardGroupColumnIterator.java    |  72 ++--
 .../shard/impl/ShardGroupCompactionImpl.java    |  10 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java | 219 +++++++++++
 .../impl/shard/impl/ShardsColumnIterator.java   |  10 +
 .../graph/GraphManagerShardConsistencyIT.java   | 378 ++++++++++++++-----
 .../impl/shard/ShardEntryGroupTest.java         |  14 +
 .../impl/shard/ShardGroupCompactionTest.java    |  30 +-
 .../shard/impl/ShardGroupDeletionImplTest.java  | 341 +++++++++++++++++
 .../index/impl/EsEntityIndexImpl.java           |  45 ++-
 .../usergrid/persistence/queue/QueueFig.java    |   2 +-
 .../rest/applications/ApplicationResource.java  |  50 +++
 .../cassandra/ManagementServiceImpl.java        |   3 +-
 25 files changed, 1307 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/0604247f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------