You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by md...@apache.org on 2015/10/22 20:29:30 UTC
[01/15] usergrid git commit: Adds trace logging for debugging
purposes to diagnose issue
Repository: usergrid
Updated Branches:
refs/heads/2.1-release 1fe1d1a34 -> 0604247f6
Adds trace logging for debugging purposes to diagnose issue
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c417099d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c417099d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c417099d
Branch: refs/heads/2.1-release
Commit: c417099d6260163ad391c76162b673efb861bf3a
Parents: a09485a
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 10:19:28 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 10:19:28 2015 -0600
----------------------------------------------------------------------
.../corepersistence/index/IndexServiceImpl.java | 3 +-
.../core/astyanax/MultiRowColumnIterator.java | 46 +++++++-------------
.../shard/impl/ShardGroupColumnIterator.java | 11 +++++
.../impl/shard/impl/ShardsColumnIterator.java | 10 +++++
4 files changed, 38 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
index d160aac..ccb6221 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceImpl.java
@@ -204,8 +204,9 @@ public class IndexServiceImpl implements IndexService {
//If we get no search results, its possible that something was already deleted or
//that it wasn't indexed yet. In either case we can't delete anything and return an empty observable..
- if(crs.isEmpty())
+ if(crs.isEmpty()) {
return Observable.empty();
+ }
UUID timeUUID = UUIDUtils.isTimeBased(entityId.getUuid()) ? entityId.getUuid() : UUIDUtils.newTimeUUID();
//not actually sure about the timestamp but ah well. works.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 667992c..c6c1a12 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -50,7 +50,7 @@ import com.netflix.astyanax.util.RangeBuilder;
*/
public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
- private static final Logger LOG = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+ private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class );
private final int pageSize;
@@ -148,6 +148,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
public void advance() {
+
+ logger.trace( "Advancing multi row column iterator" );
+
/**
* If the edge is present, we need to being seeking from this
*/
@@ -156,7 +159,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
- //TODO, finalize why this isn't working as expected
final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
final RangeBuilder rangeBuilder = new RangeBuilder();
@@ -174,6 +176,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
rangeBuilder.setLimit( selectSize );
+ logger.trace( "Executing cassandra query" );
/**
* Get our list of slices
@@ -242,7 +245,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
currentColumnIterator = mergedResults.iterator();
- LOG.trace( "Finished parsing {} rows for results", rowKeys.size() );
+ logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
}
@@ -275,6 +278,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
*/
private List<T> singleRowResult( final Rows<R, C> result ) {
+ logger.trace( "Only a single row has columns. Parsing directly" );
for ( R key : result.getKeys() ) {
final ColumnList<C> columnList = result.getRow( key ).getColumns();
@@ -307,6 +311,8 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
*/
private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
+ logger.trace( "Multiple rows have columns. Merging" );
+
final List<T> mergedResults = new ArrayList<>(maxSize);
@@ -354,50 +360,28 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
continue;
}
+ logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+
mergedResults.add( insertIndex, returnedValue );
//prune the mergedResults
while ( mergedResults.size() > maxSize ) {
+
+ logger.trace( "Trimming results to size {}", maxSize );
+
//just remove from our tail until the size falls to the correct value
mergedResults.remove(mergedResults.size()-1);
}
}
- LOG.trace( "Candidate result set size is {}", mergedResults.size() );
+ logger.trace( "Candidate result set size is {}", mergedResults.size() );
}
return mergedResults;
}
- /**
- * Iterator wrapper that parses as it iterates for single row cases
- */
- private class SingleRowIterator implements Iterator<T> {
-
- private Iterator<Column<C>> columnIterator;
-
- private SingleRowIterator (final ColumnList<C> columns){
- this.columnIterator = columns.iterator();
- }
- @Override
- public boolean hasNext() {
- return columnIterator.hasNext();
- }
-
-
- @Override
- public T next() {
- return columnParser.parseColumn( columnIterator.next() );
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Unable to remove single row" );
- }
- }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index 8779b96..a794a16 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -28,6 +28,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
@@ -51,6 +54,8 @@ import com.netflix.astyanax.util.RangeBuilder;
public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+ private static final Logger logger = LoggerFactory.getLogger( ShardGroupColumnIterator.class );
+
private final Iterator<ShardEntryGroup> entryGroupIterator;
private Iterator<T> elements;
@@ -108,21 +113,27 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
public boolean advance(){
+ logger.trace( "Advancing from shard entry group iterator" );
+
while(entryGroupIterator.hasNext()){
final ShardEntryGroup group = entryGroupIterator.next();
+ logger.trace( "Shard entry group is {}. Searching for edges in the shard", group );
+
elements = getIterator( group.getReadShards() );
/**
* We're done, we have some columns to return
*/
if(elements.hasNext()){
+ logger.trace( "Found edges in shard entry group {}", group );
return true;
}
}
+ logger.trace( "Completed iterating shard group iterator" );
return false;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c417099d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index e35107c..4411c25 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -6,6 +6,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
@@ -28,6 +31,9 @@ import com.netflix.astyanax.util.RangeBuilder;
*/
public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger( ShardsColumnIterator.class );
+
private final EdgeSearcher<R, C, T> searcher;
private final MultiTennantColumnFamily<ScopedRowKey<R>, C> cf;
@@ -87,6 +93,8 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
*/
private void startIterator() {
+ logger.trace( "Starting shards column iterator" );
+
/**
* If the edge is present, we need to being seeking from this
@@ -105,6 +113,8 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
*/
final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys();
+ logger.trace( "Searching with row keys {}", rowKeys );
+
currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
[06/15] usergrid git commit: Added tests for mocking
Posted by md...@apache.org.
Added tests for mocking
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0286dcc6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0286dcc6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0286dcc6
Branch: refs/heads/2.1-release
Commit: 0286dcc65c322c0aa606b760efc3cfe9c59f5ed6
Parents: 9fddd75
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 16:15:51 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 16:15:51 2015 -0600
----------------------------------------------------------------------
.../impl/shard/ShardEntryGroup.java | 11 +-
.../impl/shard/ShardGroupDeletion.java | 5 +
.../impl/shard/impl/ShardGroupDeletionImpl.java | 19 +-
.../shard/impl/ShardGroupDeletionImplTest.java | 300 +++++++++++++++++++
4 files changed, 325 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 11bf7a4..555d467 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -291,11 +291,20 @@ public class ShardEntryGroup {
* cache refresh, we can't compact yet.
*/
- && currentTime - delta > maxCreatedTime;
+ && !isNew( currentTime );
}
/**
+ * Return true while currentTime - delta is still strictly less than maxCreatedTime, i.e. the group counts as new
+ * @param currentTime the time to compare against; presumably in the same units as delta and maxCreatedTime - confirm
+ * @return true if currentTime - delta is less than maxCreatedTime, false otherwise (including when they are equal)
+ */
+ public boolean isNew(final long currentTime){
+ return currentTime - delta < maxCreatedTime;
+ }
+
+ /**
* Return true if this shard can be deleted AFTER all of the data in it has been moved
*/
public boolean canBeDeleted( final Shard shard ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 3d5a1ef..640e8bd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -56,6 +56,11 @@ public interface ShardGroupDeletion {
CONTAINS_EDGES,
/**
+ * Compaction pending, we can't delete it
+ */
+ COMPACTION_PENDING,
+
+ /**
* The shard is too new, and may not have been fully replicated, we can't delete it safely
*/
TOO_NEW,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index c51a021..9c97d77 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
@@ -57,12 +58,15 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
private final ListeningExecutorService asyncTaskExecutor;
private final EdgeShardSerialization edgeShardSerialization;
+ private final TimeService timeService;
@Inject
public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
- final EdgeShardSerialization edgeShardSerialization ) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final TimeService timeService ) {
this.edgeShardSerialization = edgeShardSerialization;
+ this.timeService = timeService;
this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
}
@@ -125,17 +129,14 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
* Compaction is pending, we cannot check it
*/
if ( shardEntryGroup.isCompactionPending() ) {
- return DeleteResult.NOT_CHECKED;
+ return DeleteResult.COMPACTION_PENDING;
}
- /**
- * If any of the shards can't be deleted, then we can't delete it
- */
- for ( final Shard shard : shardEntryGroup.getReadShards() ) {
- if ( !shardEntryGroup.canBeDeleted( shard ) ) {
- return DeleteResult.TOO_NEW;
- }
+ final long currentTime = timeService.getCurrentTime();
+
+ if ( shardEntryGroup.isNew( currentTime ) ) {
+ return DeleteResult.TOO_NEW;
}
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0286dcc6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
new file mode 100644
index 0000000..70c9b01
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.netflix.astyanax.MutationBatch;
+
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ShardGroupDeletionImplTest {
+
+
+ protected AsyncTaskExecutor asyncTaskExecutor;
+ protected ListeningExecutorService listeningExecutorService;
+ private ApplicationScopeImpl scope;
+
+
+ @Before
+ public void setup() {
+ // Build a fresh application scope before every test
+ // The executor is not created here; each test calls initExecutor() itself
+ this.scope = new ApplicationScopeImpl( createId( "application" ) );
+ }
+
+ // Stops the executor created by initExecutor(); guarded because a test can fail before it has been created
+ @After
+ public void shutDown() {
+ if ( listeningExecutorService != null ) { listeningExecutorService.shutdownNow(); }
+ }
+
+
+ @Test
+ public void shardCannotBeCompacted() throws ExecutionException, InterruptedException {
+ // Verifies that maybeDeleteShard reports COMPACTION_PENDING for a group whose compaction is still pending
+ final long createTime = 10000;
+
+ final long currentTime = createTime;
+ // NOTE(review): the trailing boolean presumably marks a shard as compacted (shard1 is not) - confirm against Shard
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+ final Shard shard1 = new Shard( 1000, createTime, false );
+
+ //set a 1 delta for testing
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard1 );
+ group.addShard( shard0 );
+
+ assertTrue( "this should return true for our test to succeed", group.isCompactionPending() );
+
+ // Collaborators are mocks; the time service is stubbed to return currentTime
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+ // Run the check with an empty edge iterator and block on the returned future for the result
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete with pending compaction", ShardGroupDeletion.DeleteResult.COMPACTION_PENDING,
+ result );
+ }
+
+
+ @Test
+ public void shardTooNew() throws ExecutionException, InterruptedException {
+ // Verifies that maybeDeleteShard reports TOO_NEW while the group still counts as new
+ final long createTime = 10000;
+
+ final long currentTime = createTime;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ //use a delta of 1: with currentTime equal to createTime the group is still new (asserted below)
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertTrue( "this should return true for our test to succeed", group.isNew( currentTime ) );
+
+ // Collaborators are mocks; the time service is stubbed to return currentTime
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+ // Run the check with an empty edge iterator and block on the returned future for the result
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete within timeout period", ShardGroupDeletion.DeleteResult.TOO_NEW, result );
+ }
+
+
+ @Test
+ public void hasEdges() throws ExecutionException, InterruptedException {
+ // Verifies that maybeDeleteShard reports CONTAINS_EDGES when the supplied edge iterator is not empty
+ final long createTime = 10000;
+
+ final long currentTime = createTime * 2;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ //use a delta of 1: currentTime is far enough past createTime that the group is no longer new (asserted below)
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+ // Collaborators are mocks; the time service is stubbed to return currentTime
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+ // First pass: a single edge in the iterator
+ final Iterator<MarkedEdge> notMarkedIterator = Collections.singleton(
+ ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+ .iterator();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, notMarkedIterator );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, result );
+
+ //now check when marked we also retain them
+ // NOTE(review): this edge uses the same trailing false as the one above; if that flag is the marked bit it should be true - confirm
+ final Iterator<MarkedEdge> markedEdgeIterator = Collections.singleton(
+ ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+ .iterator();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> markedFuture =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, markedEdgeIterator );
+ // Read the second call's future; reading the first future again would leave this call unverified
+ final ShardGroupDeletion.DeleteResult markedResult = markedFuture.get();
+
+ assertEquals( "should not delete with edges", ShardGroupDeletion.DeleteResult.CONTAINS_EDGES, markedResult );
+ }
+
+
+ @Test
+ public void testDeletion() throws ExecutionException, InterruptedException {
+ // Verifies that maybeDeleteShard reports DELETED when compaction is not pending, the group is not new, and no edges are supplied
+ final long createTime = 10000;
+
+ final long currentTime = createTime * 2;
+
+ final Shard shard0 = new Shard( 0, createTime, true );
+
+
+ //use a delta of 1: currentTime is far enough past createTime that the group is no longer new (asserted below)
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertFalse( "this should return false for our test to succeed", group.isCompactionPending() );
+
+ assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
+
+
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+ //mock up returning a mutation
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ // same(...) matches by reference, so the stub only answers for exactly this scope, shard and edge meta
+ when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
+ MutationBatch.class) );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+
+
+ // Run the check with an empty edge iterator and block on the returned future for the result
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should delete", ShardGroupDeletion.DeleteResult.DELETED, result );
+ }
+
+
+
+ private DirectedEdgeMeta getDirectedEdgeMeta() {
+ // Builds the edge meta shared by all tests: a newly created "source" id with edge type "test"
+ final Id sourceId = createId( "source" );
+ final String edgeType = "test";
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+ return directedEdgeMeta;
+ }
+
+
+ private void initExecutor( final int numberThreads, final int queueLength ) {
+ listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+ .createTaskExecutor( "GraphTaskExecutor", numberThreads, queueLength,
+ TaskExecutorFactory.RejectionAction.ABORT ) );
+ // The executor above is real; only the AsyncTaskExecutor wrapper handed to the class under test is a mock
+ asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+ // ShardGroupDeletionImpl's constructor calls getExecutorService(), so the mock must return the real executor
+ when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService );
+ }
+}
[09/15] usergrid git commit: Fixes reporter issue and marks test as
ignored. When run regularly it kills Cassandra. I will unignore once merged
with the new sharding logic.
Posted by md...@apache.org.
Fixes reporter issue and marks test as ignored. When run regularly it kills Cassandra. I will unignore once merged with the new sharding logic.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f3693c85
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f3693c85
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f3693c85
Branch: refs/heads/2.1-release
Commit: f3693c8570e018d924add2a4851a2db7f805ccb0
Parents: 10d0a07
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:48:16 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:48:16 2015 -0600
----------------------------------------------------------------------
.../graph/GraphManagerShardConsistencyIT.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f3693c85/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index b82c28a..9ee7b95 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -41,6 +41,7 @@ import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,9 +87,7 @@ public class GraphManagerShardConsistencyIT {
private static final Meter writeMeter = registry.meter( "writeThroughput" );
- private static final Slf4jReporter reporter =
- Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
- .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+ private Slf4jReporter reporter;
protected ApplicationScope scope;
@@ -133,6 +132,11 @@ public class GraphManagerShardConsistencyIT {
scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
+ reporter =
+ Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+
reporter.start( 10, TimeUnit.SECONDS );
}
@@ -383,7 +387,8 @@ public class GraphManagerShardConsistencyIT {
}
- @Test
+ @Test(timeout=120000)
+ @Ignore("This works, but is occasionally causing cassandra to fall over. Unignore when merged with new shard strategy")
public void writeThousandsDelete()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
[10/15] usergrid git commit: Add endpoint for setting apigee mobile
config properties on the application.
Posted by md...@apache.org.
Add endpoint for setting apigee mobile config properties on the application.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/88d11790
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/88d11790
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/88d11790
Branch: refs/heads/2.1-release
Commit: 88d11790c2d077140a71347f97ad5bc75346af2c
Parents: f3693c8
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 21:16:50 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 21:16:50 2015 -0700
----------------------------------------------------------------------
.../rest/applications/ApplicationResource.java | 50 ++++++++++++++++++++
1 file changed, 50 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/88d11790/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index c521d4f..4eaf09a 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.rest.applications;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.json.JSONWithPadding;
import com.sun.jersey.api.view.Viewable;
import org.apache.amber.oauth2.common.error.OAuthError;
@@ -47,6 +48,7 @@ import org.apache.usergrid.rest.exceptions.NotFoundExceptionMapper;
import org.apache.usergrid.rest.exceptions.RedirectionException;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
import org.apache.usergrid.security.oauth.AccessInfo;
import org.apache.usergrid.security.oauth.ClientCredentialsInfo;
import org.slf4j.Logger;
@@ -601,4 +603,52 @@ public class ApplicationResource extends ServiceResource {
}
return new JSONWithPadding( value, callback );
}
+
+ // Specifically require superuser access as this is setting app properties directly (only way to currently do this
+ // with Apigee's apigeeMobileConfig
+ @RequireSystemAccess
+ @POST
+ @Path("apm/apigeeMobileConfig")
+ @Consumes(APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public JSONWithPadding setAPMConfig( @Context UriInfo ui,
+ @QueryParam("callback") @DefaultValue("callback") String callback,
+ Map<String, Object> json) throws Exception {
+
+ if(json == null || json.size() < 1){
+ logger.error("Param {} cannot be null for POST apm/apigeeMobileConfig", APIGEE_MOBILE_APM_CONFIG_JSON_KEY);
+ throw new IllegalArgumentException("Request body cannot be empty and must include apigeeMobileConfig params");
+ }
+
+ final String requestAppUUID = (String) json.get("applicationUUID");
+ if(!requestAppUUID.equalsIgnoreCase(applicationId.toString())){
+ logger.error("Provided application UUID {} does not match actual application UUID {}",
+ requestAppUUID,
+ applicationId.toString());
+ throw new IllegalArgumentException(
+ String.format("Provided application UUID %s does not match actual application UUID %s",
+ requestAppUUID,
+ applicationId.toString())
+ );
+ }
+
+ final String apmConfig = new ObjectMapper().writeValueAsString(json);
+ if(logger.isDebugEnabled()){
+ logger.debug("Received request to set apigeeMobileConfig properties with: {}", apmConfig);
+ }
+
+
+ EntityManager em = emf.getEntityManager( applicationId );
+ em.setProperty(new SimpleEntityRef(Application.ENTITY_TYPE, applicationId),
+ APIGEE_MOBILE_APM_CONFIG_JSON_KEY,
+ apmConfig
+ );
+
+ logger.info("Successfully set apigeeMobileConfig properties with: {}", apmConfig);
+
+ return new JSONWithPadding(apmConfig, callback);
+
+ }
+
+
}
[02/15] usergrid git commit: USERGRID-1056 - Move the filtering of
older entity versions on index deletes to Usergrid and not in the
Elasticsearch query.
Posted by md...@apache.org.
USERGRID-1056 - Move the filtering of older entity versions on index deletes to Usergrid and not in the Elasticsearch query.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/2c1147e4
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/2c1147e4
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/2c1147e4
Branch: refs/heads/2.1-release
Commit: 2c1147e49f76702dcf03ecaef6ece1f30306aa5f
Parents: c417099
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:07:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:07:18 2015 -0700
----------------------------------------------------------------------
.../index/impl/EsEntityIndexImpl.java | 45 +++++++++++++++-----
1 file changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/2c1147e4/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index f6ebce2..9be591e 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -460,7 +460,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
while(true){
//add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse );
+ candidates = aggregateScrollResults( candidates, searchResponse, null );
SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
.getScrollBuilder( searchResponse.getScrollId() )
@@ -510,11 +510,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
FilterBuilder entityIdFilter = FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME,
IndexingUtils.entityId(entityId));
- FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte(markedVersion);
-
- FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter, entityVersionFilter);
-
- srb.setPostFilter(andFilter);
+ srb.setPostFilter(entityIdFilter);
@@ -536,7 +532,7 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
while(true){
//add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse );
+ candidates = aggregateScrollResults( candidates, searchResponse, markedVersion);
SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
.getScrollBuilder( searchResponse.getScrollId() )
@@ -649,8 +645,8 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
return candidateResults;
}
- private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
- final SearchResponse searchResponse ){
+ private List<CandidateResult> aggregateScrollResults(List<CandidateResult> candidates,
+ final SearchResponse searchResponse, final UUID markedVersion){
final SearchHits searchHits = searchResponse.getHits();
final SearchHit[] hits = searchHits.getHits();
@@ -659,7 +655,36 @@ public class EsEntityIndexImpl implements EntityIndex,VersionedData {
final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
- candidates.add( candidateResult );
+ // if comparing against the latestVersion, make sure we only add the candidateResult if it's
+ // older than or equal to the latest marked version
+ if (markedVersion != null) {
+
+ if(candidateResult.getVersion().timestamp() <= markedVersion.timestamp()){
+
+ if(logger.isDebugEnabled()){
+ logger.debug("Candidate version {} is <= provided entity version {} for entityId {}",
+ candidateResult.getVersion(),
+ markedVersion,
+ candidateResult.getId()
+ );
+ }
+
+ candidates.add(candidateResult);
+
+ }else{
+ if(logger.isDebugEnabled()){
+ logger.debug("Candidate version {} is > provided entity version {} for entityId {}. Not" +
+ "adding to candidate results",
+ candidateResult.getVersion(),
+ markedVersion,
+ candidateResult.getId()
+ );
+ }
+ }
+
+ }else{
+ candidates.add(candidateResult);
+ }
}
logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );
[04/15] usergrid git commit: Update default properties that are known
to work in a single and multi-region configuration.
Posted by md...@apache.org.
Update default properties that are known to work in a single and multi-region configuration.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4935377c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4935377c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4935377c
Branch: refs/heads/2.1-release
Commit: 4935377c8cb5be0c11ae39814a4c2d5cceb604d8
Parents: 2c1147e
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:27:24 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:27:24 2015 -0700
----------------------------------------------------------------------
.../apache/usergrid/corepersistence/index/IndexProcessorFig.java | 2 +-
.../apache/usergrid/persistence/core/astyanax/CassandraFig.java | 2 +-
.../main/java/org/apache/usergrid/persistence/queue/QueueFig.java | 2 +-
3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 7d022e5..5871e58 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,7 +65,7 @@ public interface IndexProcessorFig extends GuicyFig {
* Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
* If the timeout occurs, the messages will become visible again for re-processing.
*/
- @Default( "5000" ) // 5 seconds
+ @Default( "30000" ) // 30 seconds
@Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
int getIndexQueueVisibilityTimeout();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
index 79c198f..c82a683 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
@@ -71,7 +71,7 @@ public interface CassandraFig extends GuicyFig {
int getConnections();
@Key( "cassandra.timeout" )
- @Default( "5000" )
+ @Default( "10000" )
int getTimeout();
@Key("cassandra.discovery")
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4935377c/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
index 6f3a3dc..ad38f6d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueFig.java
@@ -59,7 +59,7 @@ public interface QueueFig extends GuicyFig {
* The maximum number of attempts to attempt to deliver before failing into the DLQ
*/
@Key( "usergrid.queue.deliveryLimit" )
- @Default("20")
+ @Default("10")
String getQueueDeliveryLimit();
@Key("usergrid.use.default.queue")
[13/15] usergrid git commit: Address issues found during code review
Posted by md...@apache.org.
Address issues found during code review
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b1e1ed18
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b1e1ed18
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b1e1ed18
Branch: refs/heads/2.1-release
Commit: b1e1ed181d88f6a8e4b1b7bac76c99f936f66c8c
Parents: c9bf87c
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 22 12:13:00 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 22 12:13:00 2015 -0600
----------------------------------------------------------------------
.../graph/GraphManagerShardConsistencyIT.java | 5 +++--
.../impl/shard/impl/ShardGroupDeletionImplTest.java | 11 ++++++++---
2 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1e1ed18/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 9ee7b95..c1917bb 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -55,6 +55,7 @@ import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -595,7 +596,7 @@ public class GraphManagerShardConsistencyIT {
while ( groups.hasNext() ) {
- group = groups.next();;
+ group = groups.next();
log.info( "Shard size for group is {}", group.getReadShards() );
@@ -604,7 +605,7 @@ public class GraphManagerShardConsistencyIT {
//we're done, 1 shard remains, we have a group, and it's our default shard
- if ( shardCount == 1 && group != null && group.getMinShard().getShardIndex() == 0 ) {
+ if ( shardCount == 1 && group != null && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {
log.info( "All compactions complete," );
break;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b1e1ed18/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
index 9a3e407..f943e9b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -43,6 +43,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -252,7 +254,7 @@ public class ShardGroupDeletionImplTest {
//now check when marked we also retain them
final Iterator<MarkedEdge> markedEdgeIterator = Collections.singleton(
- ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, false ) )
+ ( MarkedEdge ) new SimpleMarkedEdge( createId( "source" ), "type", createId( "target" ), 1000, true ) )
.iterator();
@@ -266,7 +268,7 @@ public class ShardGroupDeletionImplTest {
@Test
- public void testDeletion() throws ExecutionException, InterruptedException {
+ public void testDeletion() throws ExecutionException, InterruptedException, ConnectionException {
final long createTime = 10000;
@@ -290,9 +292,10 @@ public class ShardGroupDeletionImplTest {
//mock up returning a mutation
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ final MutationBatch batch = mock( MutationBatch.class );
when( edgeShardSerialization.removeShardMeta( same( scope ), same( shard0 ), same( directedEdgeMeta ) ) )
- .thenReturn( mock( MutationBatch.class ) );
+ .thenReturn( batch );
final TimeService timeService = mock( TimeService.class );
@@ -310,6 +313,8 @@ public class ShardGroupDeletionImplTest {
final ShardGroupDeletion.DeleteResult result = future.get();
assertEquals( "should delete", ShardGroupDeletion.DeleteResult.DELETED, result );
+
+ verify(batch).execute();
}
[12/15] usergrid git commit: Change new endpoint to only require org
access.
Posted by md...@apache.org.
Change new endpoint to only require org access.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1b83a864
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1b83a864
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1b83a864
Branch: refs/heads/2.1-release
Commit: 1b83a864f97e0361266ebbe21e17c3a146bf46b4
Parents: c9bf87c
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Oct 22 10:50:35 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Oct 22 10:50:35 2015 -0700
----------------------------------------------------------------------
.../org/apache/usergrid/rest/applications/ApplicationResource.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1b83a864/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
index 4eaf09a..162565f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ApplicationResource.java
@@ -606,7 +606,7 @@ public class ApplicationResource extends ServiceResource {
// Specifically require superuser access as this is setting app properties directly (only way to currently do this
// with Apigee's apigeeMobileConfig
- @RequireSystemAccess
+ @RequireOrganizationAccess
@POST
@Path("apm/apigeeMobileConfig")
@Consumes(APPLICATION_JSON)
[05/15] usergrid git commit: Going back to 5 seconds default
visibility timeout.
Posted by md...@apache.org.
Going back to 5 seconds default visibility timeout.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5b01fc94
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5b01fc94
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5b01fc94
Branch: refs/heads/2.1-release
Commit: 5b01fc94c690bc03e1281915acb68780d15a910c
Parents: 4935377
Author: Michael Russo <mi...@gmail.com>
Authored: Wed Oct 21 14:32:17 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Wed Oct 21 14:32:17 2015 -0700
----------------------------------------------------------------------
.../apache/usergrid/corepersistence/index/IndexProcessorFig.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5b01fc94/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 5871e58..7d022e5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -65,7 +65,7 @@ public interface IndexProcessorFig extends GuicyFig {
* Received messages will remain 'in flight' until they are ack'd(deleted) or this timeout occurs.
* If the timeout occurs, the messages will become visible again for re-processing.
*/
- @Default( "30000" ) // 30 seconds
+ @Default( "5000" ) // 5 seconds
@Key( INDEX_QUEUE_VISIBILITY_TIMEOUT )
int getIndexQueueVisibilityTimeout();
[03/15] usergrid git commit: Added an audit for shards when they are
empty. They are only ever allocated when a split + compaction needs to
happen. Therefore if they don't have a compaction pending,
they should never be empty.
Posted by md...@apache.org.
Added an audit for shards when they are empty. They are only ever allocated when a split + compaction needs to happen. Therefore if they don't have a compaction pending, they should never be empty.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9fddd754
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9fddd754
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9fddd754
Branch: refs/heads/2.1-release
Commit: 9fddd7542572f00233dabff1890187faf89f9e86
Parents: c417099
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 15:21:23 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 15:21:23 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/graph/GraphFig.java | 2 +-
.../persistence/graph/guice/GraphModule.java | 11 +-
.../impl/EdgeSerializationImpl.java | 17 +-
.../impl/shard/AsyncTaskExecutor.java | 34 +++
.../impl/shard/ShardGroupCompaction.java | 4 -
.../impl/shard/ShardGroupDeletion.java | 68 ++++++
.../impl/shard/impl/AsyncTaskExecutorImpl.java | 53 +++++
.../shard/impl/ShardGroupColumnIterator.java | 61 +++---
.../shard/impl/ShardGroupCompactionImpl.java | 10 +-
.../impl/shard/impl/ShardGroupDeletionImpl.java | 206 +++++++++++++++++++
.../impl/shard/ShardGroupCompactionTest.java | 30 ++-
11 files changed, 451 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 28aab40..f0df7ff 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -133,7 +133,7 @@ public interface GraphFig extends GuicyFig {
int getShardCacheRefreshWorkerCount();
- @Default( "10" )
+ @Default( "20" )
@Key( SHARD_AUDIT_WORKERS )
int getShardAuditWorkerCount();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6280c7c..d2476eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -54,7 +54,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.TargetIdObservab
import org.apache.usergrid.persistence.graph.serialization.impl.migration.EdgeDataMigrationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigration;
import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphMigrationPlugin;
-import org.apache.usergrid.persistence.graph.serialization.impl.migration.GraphNode;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -62,14 +62,17 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardA
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupDeletionImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -116,6 +119,12 @@ public abstract class GraphModule extends AbstractModule {
bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
/**
+ * Binding for task tracker
+ */
+ bind( AsyncTaskExecutor.class ).to( AsyncTaskExecutorImpl.class );
+ bind( ShardGroupDeletion.class ).to( ShardGroupDeletionImpl.class );
+
+ /**
* Bind our strategies based on their internal annotations.
*/
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 4c1ae79..9e25946 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
@@ -67,6 +68,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
protected final EdgeColumnFamilies edgeColumnFamilies;
protected final ShardedEdgeSerialization shardedEdgeSerialization;
protected final TimeService timeService;
+ protected final ShardGroupDeletion shardGroupDeletion;
@Inject
@@ -74,7 +76,8 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
final EdgeColumnFamilies edgeColumnFamilies,
final ShardedEdgeSerialization shardedEdgeSerialization,
- final TimeService timeService ) {
+ final TimeService timeService, final ShardGroupDeletion shardGroupDeletion ) {
+
checkNotNull( keyspace, "keyspace required" );
@@ -83,6 +86,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
checkNotNull( timeService, "timeService required" );
+ checkNotNull( shardGroupDeletion, "shardGroupDeletion require");
this.keyspace = keyspace;
@@ -92,6 +96,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
this.timeService = timeService;
+ this.shardGroupDeletion = shardGroupDeletion;
}
@@ -293,7 +298,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
//now create a result iterator with our iterator of read shards
- return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ return new ShardGroupColumnIterator( scope, versionMetaData, shardGroupDeletion, readShards ) {
@Override
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
@@ -319,7 +324,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
@Override
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
@@ -346,7 +351,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
@Override
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization
@@ -372,7 +377,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
@Override
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
@@ -400,7 +405,7 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ return new ShardGroupColumnIterator( scope, directedEdgeMeta, shardGroupDeletion, readShards ) {
@Override
protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
return shardedEdgeSerialization
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
new file mode 100644
index 0000000..f4dc350
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/AsyncTaskExecutor.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+
+/**
+ * An interface for returning a singleton task executor
+ */
+public interface AsyncTaskExecutor {
+
+ /**
+ * Get the executor service for executing graph tasks
+ * @return the executor service used for executing graph tasks
+ */
+ ListeningExecutorService getExecutorService();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 4fe1a63..0ff9fb1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,10 +22,6 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.util.concurrent.ListenableFuture;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
new file mode 100644
index 0000000..3d5a1ef
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+
+public interface ShardGroupDeletion {
+
+
+ /**
+ * Audit the shard entry group with the given NEW instance of the shardGroupColumnIterator. Returns the future with
+ * the outcome of the deletion task
+ *
+ * @param shardEntryGroup The group to evaluate
+ * @param edgeIterator The new instance of the edge iterator
+ *
+ * @return The delete result with the state of the delete operation
+ */
+ ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope applicationScope,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup,
+ final Iterator<MarkedEdge> edgeIterator );
+
+
+ enum DeleteResult {
+ /**
+ * Returned if the shard was deleted
+ */
+ DELETED,
+
+ /**
+ * The shard contains edges and cannot be deleted
+ */
+ CONTAINS_EDGES,
+
+ /**
+ * The shard is too new, and may not have been fully replicated, we can't delete it safely
+ */
+ TOO_NEW,
+
+ /**
+ * Our capacity was saturated, we didn't check the shard
+ */
+ NOT_CHECKED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
new file mode 100644
index 0000000..c534a22
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/AsyncTaskExecutorImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+
+/**
+ * Implementation for a single task execution system within graph
+ */
+@Singleton
+public class AsyncTaskExecutorImpl implements AsyncTaskExecutor {
+
+
+ private final ListeningExecutorService taskExecutor;
+
+
+ @Inject
+ public AsyncTaskExecutorImpl(final GraphFig graphFig){
+ this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
+ .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+ graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+ }
+
+
+ @Override
+ public ListeningExecutorService getExecutorService() {
+ return this.taskExecutor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
index a794a16..72b617f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -22,45 +22,52 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
-import java.util.List;
import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
/**
- *
* Iterator to keep iterating over multiple shard groups to stream results
*
* @param <T> The parsed return type
*/
-public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+public abstract class ShardGroupColumnIterator implements Iterator<MarkedEdge> {
private static final Logger logger = LoggerFactory.getLogger( ShardGroupColumnIterator.class );
+ private final ApplicationScope applicationScope;
+ private final DirectedEdgeMeta directedEdgeMeta;
+ private final ShardGroupDeletion shardGroupDeletion;
private final Iterator<ShardEntryGroup> entryGroupIterator;
- private Iterator<T> elements;
- public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator ){
+ private Iterator<MarkedEdge> elements;
+
+
+ public ShardGroupColumnIterator( final ApplicationScope applicationScope, final DirectedEdgeMeta directedEdgeMeta,
+ final ShardGroupDeletion shardGroupDeletion,
+ final Iterator<ShardEntryGroup> entryGroupIterator ) {
+ this.applicationScope = applicationScope;
+ this.directedEdgeMeta = directedEdgeMeta;
+ this.shardGroupDeletion = shardGroupDeletion;
this.entryGroupIterator = entryGroupIterator;
}
@@ -69,16 +76,16 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
public boolean hasNext() {
- if(elements == null){
+ if ( elements == null ) {
return advance();
}
- if(elements.hasNext()){
+ if ( elements.hasNext() ) {
return true;
}
//we've exhausted our shard groups and we don't have a next, we can't continue
- if(!entryGroupIterator.hasNext()){
+ if ( !entryGroupIterator.hasNext() ) {
return false;
}
@@ -88,7 +95,7 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
@Override
- public T next() {
+ public MarkedEdge next() {
if ( !hasNext() ) {
throw new NoSuchElementException( "There are no more rows or columns left to advance" );
}
@@ -105,17 +112,17 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
/**
* Get an iterator for the shard entry group
+ *
* @param readShards the read shards to use
- * @return
*/
- protected abstract Iterator<T> getIterator(Collection<Shard> readShards);
+ protected abstract Iterator<MarkedEdge> getIterator( Collection<Shard> readShards );
- public boolean advance(){
+ public boolean advance() {
logger.trace( "Advancing from shard entry group iterator" );
- while(entryGroupIterator.hasNext()){
+ while ( entryGroupIterator.hasNext() ) {
final ShardEntryGroup group = entryGroupIterator.next();
@@ -126,16 +133,22 @@ public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
/**
* We're done, we have some columns to return
*/
- if(elements.hasNext()){
+ if ( elements.hasNext() ) {
logger.trace( "Found edges in shard entry group {}", group );
return true;
}
+ else {
+ logger.trace( "Our shard is empty, we need to perform an audit on shard group {}", group );
+ //fire and forget; if we miss it here, we'll get it on the next read
+ shardGroupDeletion.maybeDeleteShard(this.applicationScope, this.directedEdgeMeta, group, getIterator( group.getReadShards() ) );
+
+
+ }
}
logger.trace( "Completed iterating shard group iterator" );
return false;
-
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c9e389f..21f2d72 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -45,6 +45,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -108,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
- final EdgeShardSerialization edgeShardSerialization) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final AsyncTaskExecutor asyncTaskExecutor) {
this.timeService = timeService;
this.countAudits = new AtomicLong();
@@ -124,9 +126,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.shardAuditTaskTracker = new ShardAuditTaskTracker();
- this.taskExecutor = MoreExecutors.listeningDecorator( TaskExecutorFactory
- .createTaskExecutor( "ShardCompaction", graphFig.getShardAuditWorkerCount(),
- graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+ this.taskExecutor = asyncTaskExecutor.getExecutorService();
}
@@ -305,7 +305,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
countAudits.getAndIncrement();
-
+
if(LOG.isDebugEnabled()) {
LOG.debug("Auditing shard group. count is {} ", countAudits.get());
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
new file mode 100644
index 0000000..c51a021
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Implementation of the shard group deletion task
+ */
+@Singleton
+public class ShardGroupDeletionImpl implements ShardGroupDeletion {
+
+ private static final Logger logger = LoggerFactory.getLogger( ShardGroupDeletionImpl.class );
+
+ private final ListeningExecutorService asyncTaskExecutor;
+ private final EdgeShardSerialization edgeShardSerialization;
+
+
+ @Inject
+ public ShardGroupDeletionImpl( final AsyncTaskExecutor asyncTaskExecutor,
+ final EdgeShardSerialization edgeShardSerialization ) {
+ this.edgeShardSerialization = edgeShardSerialization;
+ this.asyncTaskExecutor = asyncTaskExecutor.getExecutorService();
+ }
+
+
+ @Override
+ public ListenableFuture<DeleteResult> maybeDeleteShard( final ApplicationScope applicationScope,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup,
+ final Iterator<MarkedEdge> edgeIterator ) {
+
+
+ /**
+ * Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
+ * hose the system
+ */
+ final ListenableFuture<DeleteResult> future;
+
+ try {
+ future = asyncTaskExecutor
+ .submit( new ShardDeleteTask( applicationScope, directedEdgeMeta, shardEntryGroup, edgeIterator ) );
+ }
+ catch ( RejectedExecutionException ree ) {
+
+ //ignore, if this happens we don't care, we're saturated, we can check later
+ logger.error( "Rejected shard delete check for group {}", edgeIterator );
+
+ return Futures.immediateFuture( DeleteResult.NOT_CHECKED );
+ }
+
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<DeleteResult>() {
+ @Override
+ public void onSuccess( @Nullable final ShardGroupDeletion.DeleteResult result ) {
+ logger.trace( "Successfully completed delete of task {}", result );
+ }
+
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ logger.error( "Unable to perform shard delete audit. Exception is ", t );
+ }
+ } );
+
+ return future;
+ }
+
+
+ /**
+ * Execute the logic for the delete
+ */
+ private DeleteResult maybeDeleteShardInternal( final ApplicationScope applicationScope,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup,
+ final Iterator<MarkedEdge> edgeIterator ) {
+
+ /**
+ * Compaction is pending, we cannot check it
+ */
+ if ( shardEntryGroup.isCompactionPending() ) {
+ return DeleteResult.NOT_CHECKED;
+ }
+
+
+ /**
+ * If any of the shards can't be deleted, then we can't delete it
+ */
+ for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+ if ( !shardEntryGroup.canBeDeleted( shard ) ) {
+ return DeleteResult.TOO_NEW;
+ }
+ }
+
+ /**
+ * We have edges, and therefore cannot delete them
+ */
+ if ( edgeIterator.hasNext() ) {
+ return DeleteResult.CONTAINS_EDGES;
+ }
+
+
+ //now we can proceed based on the shard meta state and we don't have any edges
+
+ MutationBatch rollup = null;
+
+ for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+ final MutationBatch shardRemovalMutation =
+ edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+
+ if ( rollup == null ) {
+ rollup = shardRemovalMutation;
+ }
+
+ else {
+ rollup.mergeShallow( shardRemovalMutation );
+ }
+ }
+
+
+ Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+
+ try {
+ rollup.execute();
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Unable to execute shard deletion", e );
+ throw new RuntimeException( "Unable to execute shard deletion", e );
+ }
+
+ return DeleteResult.DELETED;
+ }
+
+
+ /**
+ * Glue for executing the task
+ */
+ private final class ShardDeleteTask implements Callable<DeleteResult> {
+
+ private final ApplicationScope applicationScope;
+ private final DirectedEdgeMeta directedEdgeMeta;
+ private final ShardEntryGroup shardEntryGroup;
+ private final Iterator<MarkedEdge> edgeIterator;
+
+
+ private ShardDeleteTask( final ApplicationScope applicationScope, final DirectedEdgeMeta directedEdgeMeta,
+ final ShardEntryGroup shardEntryGroup, final Iterator<MarkedEdge> edgeIterator ) {
+ this.applicationScope = applicationScope;
+ this.directedEdgeMeta = directedEdgeMeta;
+ this.shardEntryGroup = shardEntryGroup;
+ this.edgeIterator = edgeIterator;
+ }
+
+
+ @Override
+ public DeleteResult call() throws Exception {
+ return maybeDeleteShardInternal( applicationScope, directedEdgeMeta, shardEntryGroup, edgeIterator );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9fddd754/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 7d4b7f6..65f19ff 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -26,16 +26,20 @@ import java.util.Collection;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.executor.TaskExecutorFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.util.IdGenerator;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.netflix.astyanax.Keyspace;
import static org.junit.Assert.assertEquals;
@@ -47,6 +51,8 @@ import static org.mockito.Mockito.when;
public class ShardGroupCompactionTest {
protected GraphFig graphFig;
+ protected AsyncTaskExecutor asyncTaskExecutor;
+ protected ListeningExecutorService listeningExecutorService;
protected ApplicationScope scope;
@@ -58,9 +64,25 @@ public class ShardGroupCompactionTest {
when( graphFig.getShardAuditWorkerQueueSize() ).thenReturn( 1000 );
+
+
+ listeningExecutorService = MoreExecutors.listeningDecorator( TaskExecutorFactory
+ .createTaskExecutor( "GraphTaskExecutor", graphFig.getShardAuditWorkerCount(),
+ graphFig.getShardAuditWorkerQueueSize(), TaskExecutorFactory.RejectionAction.ABORT ) );
+
+ asyncTaskExecutor = mock( AsyncTaskExecutor.class );
+
+ when( asyncTaskExecutor.getExecutorService() ).thenReturn( listeningExecutorService );
+
+
this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" ) );
}
+ @After
+ public void shutDown(){
+ listeningExecutorService.shutdownNow();
+ }
+
@Test
public void shouldNotCompact() {
@@ -77,6 +99,7 @@ public class ShardGroupCompactionTest {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
final long delta = 10000;
final long createTime = 20000;
@@ -92,9 +115,8 @@ public class ShardGroupCompactionTest {
when( timeService.getCurrentTime() ).thenReturn( timeNow );
ShardGroupCompactionImpl compaction =
- new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
- edgeColumnFamilies, keyspace, edgeShardSerialization );
-
+ new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
+ edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor );
DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
@@ -104,7 +126,7 @@ public class ShardGroupCompactionTest {
}
catch ( Throwable t ) {
assertEquals( "Correct error message returned", "Compaction cannot be run yet. Ignoring compaction.",
- t.getMessage() );
+ t.getMessage() );
}
}
[08/15] usergrid git commit: Merge branch 'USERGRID-1052' of
https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052
Posted by md...@apache.org.
Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10d0a070
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10d0a070
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10d0a070
Branch: refs/heads/2.1-release
Commit: 10d0a070758c2f1e499ff43c6efab866f8469d1e
Parents: beaed6f 5b01fc9
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:19:48 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:19:48 2015 -0600
----------------------------------------------------------------------
.../persistence/core/astyanax/CassandraFig.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 45 +++++++++++++++-----
.../usergrid/persistence/queue/QueueFig.java | 2 +-
3 files changed, 37 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[07/15] usergrid git commit: Fixes bug with deleting minimum shard.
This is no longer allowed.
Posted by md...@apache.org.
Fixes bug with deleting minimum shard. This is no longer allowed.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/beaed6ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/beaed6ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/beaed6ff
Branch: refs/heads/2.1-release
Commit: beaed6ffe1239ad404823cc4864b299e4113e135
Parents: 0286dcc
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Oct 21 18:19:40 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Oct 21 18:19:40 2015 -0600
----------------------------------------------------------------------
.../graph/serialization/impl/shard/Shard.java | 15 +
.../impl/shard/ShardEntryGroup.java | 4 +-
.../impl/shard/ShardGroupDeletion.java | 7 +-
.../shard/impl/NodeShardAllocationImpl.java | 81 ++--
.../impl/shard/impl/ShardGroupDeletionImpl.java | 30 +-
.../graph/GraphManagerShardConsistencyIT.java | 370 ++++++++++++++-----
.../impl/shard/ShardEntryGroupTest.java | 14 +
.../shard/impl/ShardGroupDeletionImplTest.java | 50 ++-
8 files changed, 419 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 9ca6cbe..472e0a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -21,6 +21,12 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
public class Shard implements Comparable<Shard> {
+
+ /**
+ * The minimum shard a shardIndex can possibly be set to
+ */
+ public static final Shard MIN_SHARD = new Shard(0, 0, true);
+
private final long shardIndex;
private final long createdTime;
private final boolean compacted;
@@ -58,6 +64,15 @@ public class Shard implements Comparable<Shard> {
/**
+ * Returns true if this is the minimum shard
+ * @return
+ */
+ public boolean isMinShard(){
+ return shardIndex == MIN_SHARD.shardIndex;
+ }
+
+
+ /**
* Compare the shards based on the timestamp first, then the created time second
*/
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 555d467..543f1fb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -301,7 +301,7 @@ public class ShardEntryGroup {
* @return
*/
public boolean isNew(final long currentTime){
- return currentTime - delta < maxCreatedTime;
+ return currentTime - delta <= maxCreatedTime;
}
/**
@@ -316,7 +316,7 @@ public class ShardEntryGroup {
final Shard compactionTarget = getCompactionTarget();
- return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+ return !shard.isCompacted() && !shard.isMinShard() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
.getShardIndex() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 640e8bd..37aa9e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -68,6 +68,11 @@ public interface ShardGroupDeletion {
/**
* Our capacity was saturated, we didnt' check the shard
*/
- NOT_CHECKED;
+ NOT_CHECKED,
+
+ /**
+ * We checked everything, but we didn't perform any operations. Happens when only the min shard remains
+ */
+ NO_OP;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 7a7fb3f..b0875af 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -60,8 +60,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
- private static final Shard MIN_SHARD = new Shard( 0, 0, true );
-
private final EdgeShardSerialization edgeShardSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
private final ShardedEdgeSerialization shardedEdgeSerialization;
@@ -99,29 +97,33 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
//its a new node, it doesn't need to check cassandra, it won't exist
if ( isNewNode( directedEdgeMeta ) ) {
- existingShards = Collections.singleton( MIN_SHARD ).iterator();
+ existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
}
else {
existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
- }
- if ( existingShards == null || !existingShards.hasNext() ) {
+ /**
+ * We didn't get anything out of cassandra, so we need to create the minimum shard
+ */
+ if ( existingShards == null || !existingShards.hasNext() ) {
- final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
- }
+ final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
- existingShards = Collections.singleton( MIN_SHARD ).iterator();
+ existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
+ }
}
+
return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
- directedEdgeMeta );
+ directedEdgeMeta );
}
@@ -173,17 +175,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return false;
}
- if(LOG.isDebugEnabled()){
- LOG.debug("Count of {} has exceeded shard config of {} will begin compacting", count, shardSize);
+ if ( LOG.isDebugEnabled() ) {
+ LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
}
/**
- * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a shard rapidly, we split it near the head of the values.
- * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node.
+ * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a
+ * shard rapidly, we split it near the head of the values.
+ * Further checks to this group will result in more splits, similar to creating a tree type structure and
+ * splitting each node.
*
- * This means that the lower shard can be re-split later if it is still too large. We do the division to truncate
- * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the
- * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point where we cannot
+ * This means that the lower shard can be re-split later if it is still too large. We do the division to
+ * truncate
+ * to a split point < what our current max is that would be approximately be our pivot ultimately if we split
+ * from the
+ * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point
+ * where we cannot
* ultimately compact to the correct shard size.
*/
@@ -193,13 +200,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
final Iterator<MarkedEdge> edges = directedEdgeMeta
- .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
- SearchByEdgeType.Order.ASCENDING );
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+ SearchByEdgeType.Order.ASCENDING );
if ( !edges.hasNext() ) {
- LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
- + "but no max value could be found in that row", directedEdgeMeta );
+ LOG.warn(
+ "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+ directedEdgeMeta );
return false;
}
@@ -214,12 +222,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
- for(long i = 1; edges.hasNext(); i++){
+ for ( long i = 1; edges.hasNext(); i++ ) {
//we hit a pivot shard, set it since it could be the last one we encounter
- if(i% shardSize == 0){
+ if ( i % shardSize == 0 ) {
marked = edges.next();
}
- else{
+ else {
edges.next();
}
}
@@ -228,7 +236,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
* Sanity check in case our counters become severely out of sync with our edge state in cassandra.
*/
- if(marked == null){
+ if ( marked == null ) {
LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
return false;
}
@@ -262,8 +270,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
if ( minDelta < minimumAllowed ) {
- throw new GraphRuntimeException( String.format(
- "You must configure the property %s to be >= 2 x %s. Otherwise you risk losing data",
+ throw new GraphRuntimeException( String
+ .format( "You must configure the property %s to be >= 2 x %s. Otherwise you risk losing data",
GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
}
@@ -279,8 +287,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
//TODO: TN this is broken....
- //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
- final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+ //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units
+ // correct
+ final long timeoutDelta = graphFig.getShardCacheTimeout();
final long timeNow = timeService.getCurrentTime();
@@ -289,16 +298,16 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
//short circuit
- if(!isNew || node.getId().getUuid().version() > 2){
+ if ( !isNew || node.getId().getUuid().version() > 2 ) {
return false;
}
- final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+ final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() );
final long newExpirationTimeout = uuidTime + timeoutDelta;
//our expiration is after our current time, treat it as new
- isNew = isNew && newExpirationTimeout > timeNow;
+ isNew = isNew && newExpirationTimeout > timeNow;
}
return isNew;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 9c97d77..38a7834 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -149,9 +149,18 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
//now we can proceed based on the shard meta state and we don't have any edge
+ DeleteResult result = DeleteResult.NO_OP;
+
MutationBatch rollup = null;
for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+
+ //skip the min shard
+ if(shard.isMinShard()){
+ continue;
+ }
+
+
final MutationBatch shardRemovalMutation =
edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
@@ -162,20 +171,23 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
else {
rollup.mergeShallow( shardRemovalMutation );
}
+
+ result = DeleteResult.DELETED;
}
- Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+ if( rollup != null) {
- try {
- rollup.execute();
- }
- catch ( ConnectionException e ) {
- logger.error( "Unable to execute shard deletion", e );
- throw new RuntimeException( "Unable to execute shard deletion", e );
- }
+ try {
+ rollup.execute();
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Unable to execute shard deletion", e );
+ throw new RuntimeException( "Unable to execute shard deletion", e );
+ }
+ }
- return DeleteResult.DELETED;
+ return result;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2fb08a4..b82c28a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -71,10 +71,8 @@ import com.google.inject.Injector;
import com.netflix.config.ConfigurationManager;
import rx.Observable;
-import rx.functions.Action1;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
@@ -89,8 +87,8 @@ public class GraphManagerShardConsistencyIT {
private static final Meter writeMeter = registry.meter( "writeThroughput" );
private static final Slf4jReporter reporter =
- Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
- .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+ Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
protected ApplicationScope scope;
@@ -102,6 +100,8 @@ public class GraphManagerShardConsistencyIT {
protected Object originalShardDelta;
+ protected ListeningExecutorService executor;
+
@Before
public void setupOrg() {
@@ -141,12 +141,19 @@ public class GraphManagerShardConsistencyIT {
public void tearDown() {
reporter.stop();
reporter.report();
+
+ executor.shutdownNow();
+ }
+
+
+ private void createExecutor( final int size ) {
+ executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
}
@Test
public void writeThousandsSingleSource()
- throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+ throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
final Id sourceId = IdGenerator.createId( "source" );
final String edgeType = "test";
@@ -166,13 +173,13 @@ public class GraphManagerShardConsistencyIT {
@Override
public Observable<Edge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource(
- new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, Optional.<Edge>absent() ) );
+ new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.<Edge>absent() ) );
}
};
-// final int numInjectors = 2;
+ // final int numInjectors = 2;
final int numInjectors = 1;
/**
@@ -200,16 +207,13 @@ public class GraphManagerShardConsistencyIT {
final long numberOfEdges = shardSize * 4;
- final long workerWriteLimit = numberOfEdges / numWorkersPerInjector;
-
+ final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
- final long expectedShardCount = numberOfEdges/shardSize;
+ final long expectedShardCount = numberOfEdges / shardSize;
- final ListeningExecutorService
- executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( numWorkersPerInjector ) );
-
+ createExecutor( numWorkersPerInjector );
final AtomicLong writeCounter = new AtomicLong();
@@ -219,24 +223,22 @@ public class GraphManagerShardConsistencyIT {
log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
- numInjectors );
+ numInjectors );
final List<Future<Boolean>> futures = new ArrayList<>();
-
for ( Injector injector : injectors ) {
final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
- Future<Boolean> future = executor
- .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ Future<Boolean> future =
+ executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
futures.add( future );
}
-
}
/**
@@ -262,11 +264,10 @@ public class GraphManagerShardConsistencyIT {
/**
* Start reading continuously while we migrate data to ensure our view is always correct
*/
- final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
-
- final List<Throwable> failures = new ArrayList<>( );
-
+ final ListenableFuture<Long> future =
+ executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ final List<Throwable> failures = new ArrayList<>();
//add the future
@@ -287,37 +288,31 @@ public class GraphManagerShardConsistencyIT {
} );
-
int compactedCount;
-
-
-
//now start our readers
while ( true ) {
- if(!failures.isEmpty()){
+ if ( !failures.isEmpty() ) {
- StringBuilder builder = new StringBuilder( );
+ StringBuilder builder = new StringBuilder();
- builder.append("Read runner failed!\n");
+ builder.append( "Read runner failed!\n" );
- for(Throwable t: failures){
+ for ( Throwable t : failures ) {
builder.append( "Exception is: " );
- ByteArrayOutputStream output = new ByteArrayOutputStream( );
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
t.printStackTrace( new PrintWriter( output ) );
- builder.append( output.toString( "UTF-8" ));
+ builder.append( output.toString( "UTF-8" ) );
builder.append( "\n\n" );
-
}
-
- fail(builder.toString());
+ fail( builder.toString() );
}
//reset our count. Ultimately we'll have 4 groups once our compaction completes
@@ -344,25 +339,21 @@ public class GraphManagerShardConsistencyIT {
if ( compactedCount >= expectedShardCount ) {
log.info( "All compactions complete, sleeping" );
-// final Object mutex = new Object();
-//
-// synchronized ( mutex ){
-//
-// mutex.wait();
-// }
+ // final Object mutex = new Object();
+ //
+ // synchronized ( mutex ){
+ //
+ // mutex.wait();
+ // }
break;
-
}
Thread.sleep( 2000 );
-
}
executor.shutdownNow();
-
-
}
@@ -392,7 +383,236 @@ public class GraphManagerShardConsistencyIT {
}
+ @Test
+ public void writeThousandsDelete()
+ throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+
+ final Id sourceId = IdGenerator.createId( "source" );
+ final String edgeType = "test";
+
+ final EdgeGenerator generator = new EdgeGenerator() {
+
+
+ @Override
+ public Edge newEdge() {
+ Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+
+
+ return edge;
+ }
+
+
+ @Override
+ public Observable<Edge> doSearch( final GraphManager manager ) {
+ return manager.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.<Edge>absent(), false ) );
+ }
+ };
+
+
+ // final int numInjectors = 2;
+ final int numInjectors = 1;
+
+ /**
+ * Create numInjectors injectors. This way all the caches are independent of one another. This is the same as
+ * multiple nodes
+ */
+ final List<Injector> injectors = createInjectors( numInjectors );
+
+
+ final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+ final long shardSize = graphFig.getShardSize();
+
+
+ //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+ // power for writes
+ final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
+
+ final int numWorkersPerInjector = numProcessors / numInjectors;
+
+
+ /**
+ * Do 4x shard size so we should have approximately 4 shards
+ */
+ final long numberOfEdges = shardSize * 4;
+
+
+ final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
+
+ createExecutor( numWorkersPerInjector );
+
+
+ final AtomicLong writeCounter = new AtomicLong();
+
+ //min stop time is the shard min delta + 1 cache cycle timeout
+ final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+
+
+ log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+ numInjectors );
+
+
+ final List<Future<Boolean>> futures = new ArrayList<>();
+
+
+ for ( Injector injector : injectors ) {
+ final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+
+ for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+ Future<Boolean> future =
+ executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+
+ futures.add( future );
+ }
+ }
+
+ /**
+ * Wait for all writes to complete
+ */
+ for ( Future<Boolean> future : futures ) {
+ future.get();
+ }
+
+ //now get all our shards
+ final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+ //now submit the readers.
+ final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
+
+
+ final long writeCount = writeCounter.get();
+ final Meter readMeter = registry.meter( "readThroughput" );
+
+
+ //check our shard state
+
+
+ final Iterator<ShardEntryGroup> existingShardGroups =
+ cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+ int shardCount = 0;
+
+ while ( existingShardGroups.hasNext() ) {
+ final ShardEntryGroup group = existingShardGroups.next();
+
+ shardCount++;
+
+ log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+ }
+
+
+ log.info( "found {} shard groups", shardCount );
+
+
+ //now mark and delete all the edges
+
+
+ final GraphManager manager = gmf.createEdgeManager( scope );
+
+ //sleep occasionally to stop pushing cassandra over
+
+ long count = Long.MAX_VALUE;
+
+ while(count != 0) {
+ //take 1000 then sleep
+ count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+ .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+
+ Thread.sleep( 500 );
+ }
+
+
+ //now loop with a reader until our shards are gone
+
+
+ /**
+ * Start reading continuously while we migrate data to ensure our view is always correct
+ */
+ final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+
+ final List<Throwable> failures = new ArrayList<>();
+
+
+ //add the future
+ Futures.addCallback( future, new FutureCallback<Long>() {
+
+ @Override
+ public void onSuccess( @Nullable final Long result ) {
+ log.info( "Successfully ran the read, re-running" );
+ executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ }
+
+
+ @Override
+ public void onFailure( final Throwable t ) {
+ failures.add( t );
+ log.error( "Failed test!", t );
+ }
+ } );
+
+
+ //now start our readers
+
+ while ( true ) {
+
+ if ( !failures.isEmpty() ) {
+
+ StringBuilder builder = new StringBuilder();
+
+ builder.append( "Read runner failed!\n" );
+
+ for ( Throwable t : failures ) {
+ builder.append( "Exception is: " );
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+ t.printStackTrace( new PrintWriter( output ) );
+
+ builder.append( output.toString( "UTF-8" ) );
+ builder.append( "\n\n" );
+ }
+
+
+ fail( builder.toString() );
+ }
+
+ //reset our count. Ultimately only 1 shard (the min shard) should remain once the deletions complete
+ shardCount = 0;
+
+ //we have to get it from the cache, because this will trigger the compaction process
+ final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+ ShardEntryGroup group = null;
+
+ while ( groups.hasNext() ) {
+
+ group = groups.next();;
+
+ log.info( "Shard size for group is {}", group.getReadShards() );
+
+ shardCount += group.getReadShards().size();
+ }
+
+
+ //we're done, 1 shard remains, we have a group, and it's our default shard
+ if ( shardCount == 1 && group != null && group.getMinShard().getShardIndex() == 0 ) {
+ log.info( "All compactions complete," );
+
+ break;
+ }
+
+
+ Thread.sleep( 2000 );
+ }
+
+ //now that only the min shard remains, stop the executor
+
+ executor.shutdownNow();
+ }
private class Worker implements Callable<Boolean> {
@@ -436,7 +656,6 @@ public class GraphManagerShardConsistencyIT {
writeCounter.incrementAndGet();
-
if ( i % 1000 == 0 ) {
log.info( " Wrote: " + i );
}
@@ -454,8 +673,9 @@ public class GraphManagerShardConsistencyIT {
private final long writeCount;
private final Meter readMeter;
+
private ReadWorker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount,
- final Meter readMeter) {
+ final Meter readMeter ) {
this.factory = factory;
this.generator = generator;
this.writeCount = writeCount;
@@ -467,72 +687,28 @@ public class GraphManagerShardConsistencyIT {
public Long call() throws Exception {
-
-
GraphManager gm = factory.createEdgeManager( scope );
-
- while(true) {
-
-// final long[] count = {0};
-// final long[] duplicate = {0};
-// final HashSet<Edge > seen = new HashSet<>((int)writeCount);
+ while ( true ) {
//do a read to eventually trigger our group compaction. Take 2 pages of columns
final long returnedEdgeCount = generator.doSearch( gm )
- .doOnNext( new Action1<Edge>() {
-
-
- // private Edge last;
-
-
- @Override
- public void call( final Edge edge ) {
- readMeter.mark();
-
- // count[0]++;
- //
- // /**
- // * Added this check as part
- // of the read
- // */
- // if ( last != null && last
- // .equals(edge) ) {
- // fail( String.format( "Expected edges to be in order, however last was %s and current is %s",
- // last, edge ) );
- // }
- //
- // last = edge;
- //
- // if( seen.contains( edge ) ){
- // fail( String.format("Returned an edge that was already seen! Edge was %s, last edge was %s", edge, last) );
- // duplicate[0]++;
- // }
- //
- // seen.add( edge );
-
- }
- } )
+ .doOnNext( edge -> readMeter.mark() )
.countLong().toBlocking().last();
-
-// if(returnedEdgeCount != count[0]-duplicate[0]){
-// log.warn( "Missing entries from the initial put" );
-// }
-
log.info( "Completed reading {} edges", returnedEdgeCount );
- if(writeCount != returnedEdgeCount){
- log.warn( "Unexpected edge count returned!!! Expected {} but was {}", writeCount, returnedEdgeCount );
+ if ( writeCount != returnedEdgeCount ) {
+ log.warn( "Unexpected edge count returned!!! Expected {} but was {}", writeCount,
+ returnedEdgeCount );
}
assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
}
-
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index 9289340..30634e2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -392,6 +392,20 @@ public class ShardEntryGroupTest {
assertFalse( "Shard added", result );
}
+
+
+ @Test
+ public void minShardNotDeleted() {
+ Shard minShard = Shard.MIN_SHARD;
+
+ ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1 );
+
+ shardEntryGroup.addShard( minShard );
+
+ assertTrue( minShard.isMinShard() );
+
+ assertFalse( shardEntryGroup.canBeDeleted( minShard ) );
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
index 70c9b01..9a3e407 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -120,6 +120,46 @@ public class ShardGroupDeletionImplTest {
@Test
+ public void shardIsMinShard() throws ExecutionException, InterruptedException {
+
+
+ final long currentTime = 1000;
+
+ final Shard shard0 = Shard.MIN_SHARD;
+
+
+ //set a 1 delta for testing
+ final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+ group.addShard( shard0 );
+
+ assertTrue( "this should return false for our test to succeed", shard0.isMinShard() );
+
+
+ final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+ final TimeService timeService = mock( TimeService.class );
+
+ when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+ initExecutor( 1, 1 );
+
+ final ShardGroupDeletionImpl shardGroupDeletion =
+ new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+ final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+ final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+ shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+ final ShardGroupDeletion.DeleteResult result = future.get();
+
+ assertEquals( "should not delete min shard", ShardGroupDeletion.DeleteResult.NO_OP, result );
+ }
+
+
+ @Test
public void shardTooNew() throws ExecutionException, InterruptedException {
final long createTime = 10000;
@@ -232,7 +272,7 @@ public class ShardGroupDeletionImplTest {
final long currentTime = createTime * 2;
- final Shard shard0 = new Shard( 0, createTime, true );
+ final Shard shard0 = new Shard( 1000, createTime, true );
////set a delta for way in the future
@@ -245,15 +285,14 @@ public class ShardGroupDeletionImplTest {
assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
-
final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
//mock up returning a mutation
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
- when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
- MutationBatch.class) );
+ when( edgeShardSerialization.removeShardMeta( same( scope ), same( shard0 ), same( directedEdgeMeta ) ) )
+ .thenReturn( mock( MutationBatch.class ) );
final TimeService timeService = mock( TimeService.class );
@@ -265,8 +304,6 @@ public class ShardGroupDeletionImplTest {
new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
-
-
final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
@@ -276,7 +313,6 @@ public class ShardGroupDeletionImplTest {
}
-
private DirectedEdgeMeta getDirectedEdgeMeta() {
final Id sourceId = createId( "source" );
[14/15] usergrid git commit: Merge branch 'USERGRID-1052' of
https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052
Posted by md...@apache.org.
Merge branch 'USERGRID-1052' of https://git-wip-us.apache.org/repos/asf/usergrid into USERGRID-1052
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/48161a1a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/48161a1a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/48161a1a
Branch: refs/heads/2.1-release
Commit: 48161a1ac9bfbf3e8c3aefa9586fc104ff0ec664
Parents: b1e1ed1 1b83a86
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Oct 22 12:13:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Oct 22 12:13:08 2015 -0600
----------------------------------------------------------------------
.../org/apache/usergrid/rest/applications/ApplicationResource.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[11/15] usergrid git commit: Fix issues with integrating with
Apigee's mobile analytics.
Posted by md...@apache.org.
Fix issues with integrating with Apigee's mobile analytics.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/c9bf87c6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/c9bf87c6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/c9bf87c6
Branch: refs/heads/2.1-release
Commit: c9bf87c68defd1cb84bd8b65220b1994f0e8b17e
Parents: 88d1179
Author: Michael Russo <mi...@gmail.com>
Authored: Thu Oct 22 10:20:42 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Thu Oct 22 10:20:42 2015 -0700
----------------------------------------------------------------------
.../usergrid/management/cassandra/ManagementServiceImpl.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c9bf87c6/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
index 03b798c..70d74fc 100644
--- a/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
+++ b/stack/services/src/main/java/org/apache/usergrid/management/cassandra/ManagementServiceImpl.java
@@ -1610,7 +1610,8 @@ public class ManagementServiceImpl implements ManagementService {
@Override
public ApplicationInfo createApplication( UUID organizationId, String applicationName ) throws Exception {
- return createApplication( organizationId, applicationName,null, null );
+ // DO NOT CHANGE THIS AS SOME EXTERNAL CLASSES MAY RELY ON THIS BEHAVIOR WHEN EXTENDING
+ return createApplication( organizationId, applicationName, null );
}
[15/15] usergrid git commit: Merge commit 'refs/pull/407/head' of
apache.github.com:apache/usergrid into 2.1-release
Posted by md...@apache.org.
Merge commit 'refs/pull/407/head' of apache.github.com:apache/usergrid into 2.1-release
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/0604247f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/0604247f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/0604247f
Branch: refs/heads/2.1-release
Commit: 0604247f690db09b1a25feef9532cb5e58929437
Parents: 1fe1d1a 48161a1
Author: Mike Dunker <md...@apigee.com>
Authored: Thu Oct 22 11:27:59 2015 -0700
Committer: Mike Dunker <md...@apigee.com>
Committed: Thu Oct 22 11:27:59 2015 -0700
----------------------------------------------------------------------
.../corepersistence/index/IndexServiceImpl.java | 3 +-
.../persistence/core/astyanax/CassandraFig.java | 2 +-
.../core/astyanax/MultiRowColumnIterator.java | 46 +--
.../usergrid/persistence/graph/GraphFig.java | 2 +-
.../persistence/graph/guice/GraphModule.java | 11 +-
.../impl/EdgeSerializationImpl.java | 17 +-
.../impl/shard/AsyncTaskExecutor.java | 34 ++
.../graph/serialization/impl/shard/Shard.java | 15 +
.../impl/shard/ShardEntryGroup.java | 13 +-
.../impl/shard/ShardGroupCompaction.java | 4 -
.../impl/shard/ShardGroupDeletion.java | 78 ++++
.../impl/shard/impl/AsyncTaskExecutorImpl.java | 53 +++
.../shard/impl/NodeShardAllocationImpl.java | 81 ++--
.../shard/impl/ShardGroupColumnIterator.java | 72 ++--
.../shard/impl/ShardGroupCompactionImpl.java | 10 +-
.../impl/shard/impl/ShardGroupDeletionImpl.java | 219 +++++++++++
.../impl/shard/impl/ShardsColumnIterator.java | 10 +
.../graph/GraphManagerShardConsistencyIT.java | 378 ++++++++++++++-----
.../impl/shard/ShardEntryGroupTest.java | 14 +
.../impl/shard/ShardGroupCompactionTest.java | 30 +-
.../shard/impl/ShardGroupDeletionImplTest.java | 341 +++++++++++++++++
.../index/impl/EsEntityIndexImpl.java | 45 ++-
.../usergrid/persistence/queue/QueueFig.java | 2 +-
.../rest/applications/ApplicationResource.java | 50 +++
.../cassandra/ManagementServiceImpl.java | 3 +-
25 files changed, 1307 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/0604247f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraFig.java
----------------------------------------------------------------------