You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/10/22 02:19:55 UTC
[2/4] usergrid git commit: Added tests for mocking
Added tests for mocking
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0286dcc6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0286dcc6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0286dcc6
Branch: refs/heads/USERGRID-1052
Commit: 0286dcc65c322c0aa606b760efc3cfe9c59f5ed6
Parents: 9fddd75
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 16:15:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 16:15:51 2015 -0600
----------------------------------------------------------------------
.../impl/shard/ShardEntryGroup.java | 11 +-
.../impl/shard/ShardGroupDeletion.java | 5 +
.../impl/shard/impl/ShardGroupDeletionImpl.java | 19 +-
.../shard/impl/ShardGroupDeletionImplTest.java | 300 +++++++++++++++++++
4 files changed, 325 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 11bf7a4..555d467 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -291,11 +291,20 @@ public class ShardEntryGroup {
* cache refresh, we can't compact yet.
*/
- && currentTime - delta > maxCreatedTime;
+ && !isNew( currentTime );
}
/**
+ * Return true if our current time - delta is newer than our maxCreatedtime
+ * @param currentTime
+ * @return
+ */
+ public boolean isNew(final long currentTime){
+ return currentTime - delta < maxCreatedTime;
+ }
+
+ /**
* Return true if this shard can be deleted AFTER all of the data in it has been moved
*/
public boolean canBeDeleted( final Shard shard ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 3d5a1ef..640e8bd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -56,6 +56,11 @@ public interface ShardGroupDeletion {
CONTAINS_EDGES,
/**
+ * Compaction pending, we can't delete it
+ */
+ COMPACTION_PENDING,
+
+ /**
* The shard is too new, and may not have been fully replicated, we can't delete it safely
*/
TOO_NEW,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index c51a021..9c97d77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
@@ -57,12 +58,15 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
private final ListeningExecutorService asyncTaskExecutor;
private final EdgeShardSerialization edgeShardSerialization;
+ private final TimeService timeService;
@Inject
public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
- final EdgeShardSerialization edgeShardSerialization ) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final TimeService timeService ) {
this.edgeShardSerialization = edgeShardSerialization;
+ this.timeService = timeService;
this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
}
@@ -125,17 +129,14 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
* Compaction is pending, we cannot check it
*/
if ( shardEntryGroup.isCompactionPending() ) {
- return DeleteResult.NOT_CHECKED;
+ return DeleteResult.COMPACTION_PENDING;
}
- /**
- * If any of the shards can't be deleted, then we can't delete it
- */
- for ( final Shard shard : shardEntryGroup.getReadShards() ) {
- if ( !shardEntryGroup.canBeDeleted( shard ) ) {
- return DeleteResult.TOO_NEW;
- }
+ final long currentTime = timeService.getCurrentTime();
+
+ if ( shardEntryGroup.isNew( currentTime ) ) {
+ return DeleteResult.TOO_NEW;
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
new file mode 100644
index 0000000..70c9b01
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.netflix.astyanax.MutationBatch;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ShardGroupDeletionImplTest {
+
+
+ protected AsyncTaskExecutor asyncTaskExecutor;
+ protected ListeningExecutorService listeningExecutorService;
+ private ApplicationScopeImpl scope;
+
+
+ @Before
+ public void setup() {
+
+
+ this.scope = new ApplicationScopeImpl( createId( "application" ) );
+ }
+
+
+ @After
+ public void shutDown() {
+ listeningExecutorService.shutdownNow();
+ }
+
+
+ @Test
+ public void shardCannotBeCompacted() throws ExecutionException, InterruptedException {
+
+ final long createTime = 10000;
+
+ final long currentTime = createTime;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+ final Shard shard1 = new Shard( 1000, createTime, false );
+
+ //set a 1 delta for testing
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard1 );
+ group.addShard( shard0 );
+
+ assertTrue( "this should return true for our test to succeed", group.isCompactionPending() );
+
+
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete with pending compaction", ShardGroupDeletion.DeleteResult.COMPACTION_PENDING,
+ result );
+ }
+
+
+ @Test
+ public void shardTooNew() throws ExecutionException, InterruptedException {
+
+ final long createTime = 10000;
+
+ final long currentTime = createTime;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ ////set a delta for way in the future
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertTrue( "this should return true for our test to succeed", group.isNew( currentTime ) );
+
+
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete within timeout period", ShardGroupDeletion.DeleteResult.TOO_NEW, result );
+ }
+
+
+ @Test
+ public void hasEdges() throws ExecutionException, InterruptedException {
+
+ final long createTime = 10000;
+
+ final long currentTime = createTime * 2;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ ////set a delta for way in the future
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+ final Iterator<MarkedEdge> notMarkedIterator = Collections.singleton(
+ ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+ .iterator();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, notMarkedIterator );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, result );
+
+ //now check when marked we also retain them
+
+ final Iterator<MarkedEdge> markedEdgeIterator = Collections.singleton(
+ ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+ .iterator();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> markedFuture =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, markedEdgeIterator );
+
+ final ShardGroupDeletion.DeleteResult markedResult = future.get();
+
+ assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, markedResult );
+ }
+
+
+ @Test
+ public void testDeletion() throws ExecutionException, InterruptedException {
+
+ final long createTime = 10000;
+
+ final long currentTime = createTime * 2;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ ////set a delta for way in the future
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+ //mock up returning a mutation
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+
+ when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
+ MutationBatch.class) );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should delete", ShardGroupDeletion.DeleteResult.DELETED, result );
+ }
+
+
+
+ private DirectedEdgeMeta getDirectedEdgeMeta() {
+
+ final Id sourceId = createId( "source" );
+ final String edgeType = "test";
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+ return directedEdgeMeta;
+ }
+
+
+ private void initExecutor( final int numberThreads, final int queueLength ) {
+ listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+ .createTaskExecutor( "GraphTaskExecutor", numberThreads, queueLength,
+ TaskExecutorFactory.RejectionAction.ABORT ) );
+
+ asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+
+ when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService );
+ }
+}