You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/08/28 18:21:02 UTC
[1/5] usergrid git commit: Add additional logging statements.
Repository: usergrid
Updated Branches:
refs/heads/hotfix-20160819 cda2879c0 -> 44404248c
Add additional logging statements.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/dbdd2431
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/dbdd2431
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/dbdd2431
Branch: refs/heads/hotfix-20160819
Commit: dbdd2431a7b65f6f2f8383e6c5fd598cac768329
Parents: fd7a75e
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Aug 27 15:35:44 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Aug 27 15:35:44 2016 -0700
----------------------------------------------------------------------
.../corepersistence/pipeline/read/ResultsPage.java | 14 +++++++++++++-
.../read/traverse/AbstractReadGraphFilter.java | 7 +++++++
.../corepersistence/results/IdQueryExecutor.java | 10 ++++++++++
.../results/ObservableQueryExecutor.java | 3 +++
4 files changed, 33 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbdd2431/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
index 6b3a086..e4c3828 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/ResultsPage.java
@@ -23,7 +23,9 @@ package org.apache.usergrid.corepersistence.pipeline.read;
import java.util.List;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.corepersistence.results.ObservableQueryExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -34,6 +36,9 @@ import org.apache.usergrid.persistence.model.entity.Entity;
*/
public class ResultsPage<T> {
+ private static final Logger logger = LoggerFactory.getLogger( ResultsPage.class );
+
+
private final List<T> entityList;
private final int limit;
@@ -57,6 +62,13 @@ public class ResultsPage<T> {
* Return true if the results page is full (the entity list size equals the limit), meaning more results may exist
*/
public boolean hasMoreResults() {
+ if(logger.isTraceEnabled()){
+ if(entityList != null && entityList.size() == limit){
+ logger.trace("hasMoreResults = true, entityList size: {}, limit: {}", entityList.size(), limit);
+ }else{
+ logger.trace("hasMoreResults = false, entityList size: {}, limit: {}", entityList!= null ? entityList.size() : "null", limit);
+ }
+ }
return entityList != null && entityList.size() == limit;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbdd2431/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
index f2aed89..62f6548 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/traverse/AbstractReadGraphFilter.java
@@ -198,9 +198,16 @@ public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id,
//if it's our first pass, there's no cursor to generate
if(cursorValue == null){
+ if(logger.isTraceEnabled()){
+ logger.trace("Cursor value is null, creating filter result with no cursor");
+ }
return new FilterResult<>( emit, parent );
}
+ if(logger.isTraceEnabled()){
+ logger.trace("Cursor value is not null, creating filter result with cursor: {}", cursorValue.toString());
+ }
+
return super.createFilterResult( emit, cursorValue, parent );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbdd2431/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
index 5a0bb3f..ea3d0ac 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/IdQueryExecutor.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observable;
@@ -45,6 +47,9 @@ import rx.Observable;
@Deprecated//Required for 1.0 compatibility
public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> {
+ private static final Logger logger = LoggerFactory.getLogger( IdQueryExecutor.class );
+
+
protected IdQueryExecutor( final Optional<String> startCursor ) {
super( startCursor );
@@ -53,6 +58,11 @@ public abstract class IdQueryExecutor extends ObservableQueryExecutor<Id> {
@Override
protected Results createResults( final ResultsPage resultsPage ) {
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Creating Id results from resultsPage");
+ }
+
final List<Id> ids = resultsPage.getEntityList();
List<UUID> uuids = ids.stream().map(id -> id.getUuid()).collect(Collectors.toList());
http://git-wip-us.apache.org/repos/asf/usergrid/blob/dbdd2431/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
index 007486f..e12178f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/ObservableQueryExecutor.java
@@ -107,6 +107,9 @@ public abstract class ObservableQueryExecutor<T> implements QueryExecutor {
public boolean hasNext() {
if ( !complete && results == null) {
+ if(logger.isTraceEnabled()){
+ logger.trace("Iterator not complete and there are results object is null, advancing");
+ }
advance();
}
[2/5] usergrid git commit: Move to shard deletion being a mark plus
read filtering strategy vs. deleting. This is so Usergrid can keep proper
shard end times and 're-activate' a shard by flipping the deleted flag when
past data is re-written to a collection.
Posted by mr...@apache.org.
Move to shard deletion being a mark plus read filtering strategy vs. deleting. This is so Usergrid can keep proper shard end times and 're-activate' a shard by flipping the deleted flag when past data is re-written to a collection.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/42c5c4b8
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/42c5c4b8
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/42c5c4b8
Branch: refs/heads/hotfix-20160819
Commit: 42c5c4b8760b78ea1e8ef9b8d583a92f2a45e1d4
Parents: dbdd243
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Aug 27 23:24:01 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Aug 27 23:24:01 2016 -0700
----------------------------------------------------------------------
.../astyanax/MultiRowShardColumnIterator.java | 27 ++
.../persistence/core/shard/SmartShard.java | 9 +-
.../impl/shard/NodeShardAllocation.java | 5 +-
.../graph/serialization/impl/shard/Shard.java | 17 +-
.../impl/shard/impl/EdgeSearcher.java | 17 +-
.../shard/impl/NodeShardAllocationImpl.java | 9 +-
.../impl/shard/impl/NodeShardCacheImpl.java | 24 +-
.../shard/impl/ShardGroupCompactionImpl.java | 2 +-
.../impl/shard/impl/ShardGroupDeletionImpl.java | 24 +-
.../impl/ShardedEdgeSerializationImpl.java | 106 +++--
.../shard/impl/serialize/ShardSerializer.java | 11 +-
.../graph/GraphManagerShardConsistencyIT.java | 396 +++++++++++++++++--
.../impl/shard/NodeShardAllocationTest.java | 4 +-
.../impl/shard/NodeShardCacheTest.java | 4 +-
14 files changed, 559 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index f3e8d4c..04266e1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -127,6 +127,23 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
// advance to the next shard
currentShard = currentShardIterator.next();
+ // handle marked deleted shards
+ if( currentShard.isDeleted() && currentShardIterator.hasNext()){
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Shard is marked deleted, advancing to next - {}", currentShard);
+ }
+
+ currentShard = currentShardIterator.next();
+ }else if ( currentShard.isDeleted() && !currentShardIterator.hasNext()){
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Shard is marked deleted, and there is no more - {}", currentShard);
+ }
+
+ return false;
+ }
+
if(logger.isTraceEnabled()){
logger.trace("Shard after advance: {}", currentShard);
@@ -221,6 +238,16 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
}
}
+ // skip over shards that are marked deleted
+ if( currentShard.isDeleted() && currentShardIterator.hasNext() ){
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Shard is marked deleted - {}", currentShard);
+ }
+
+ currentShard = currentShardIterator.next();
+ }
+
if(logger.isTraceEnabled()){
logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
index 39ddc35..dd6df34 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -25,13 +25,15 @@ public class SmartShard<R, T> {
final ScopedRowKey<R> rowKey;
final T shardEnd;
final long shardIndex;
+ final boolean isDeleted;
- public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd){
+ public SmartShard(final ScopedRowKey<R> rowKey, final long shardIndex, final T shardEnd, final boolean isDeleted){
this.rowKey = rowKey;
this.shardIndex = shardIndex;
this.shardEnd = shardEnd;
+ this.isDeleted = isDeleted;
}
@@ -48,12 +50,15 @@ public class SmartShard<R, T> {
return shardIndex;
}
+ public boolean isDeleted(){
+ return isDeleted;
+ }
@Override
public String toString(){
- return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }";
+ return "Shard { rowKey="+rowKey + ", shardIndex="+shardIndex+", shardEnd="+shardEnd+", isDeleted="+isDeleted+" }";
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index fcc0fc3..49fde9b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import com.google.common.base.Optional;
-
/**
* Interface used to create and retrieve shards
@@ -38,11 +36,10 @@ public interface NodeShardAllocation {
* Get all shards for the given info. If none exist, a default shard should be allocated. The nodeId is the source node
*
* @param scope The application scope
- * @param maxShardId The max value to start seeking from. Values <= this will be returned if specified
* @param directedEdgeMeta The directed edge metadata to use
* @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated
*/
- public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta );
+ public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta);
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 6394703..f92c37a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -32,8 +32,9 @@ public class Shard implements Comparable<Shard> {
private final long shardIndex;
private final long createdTime;
- private final boolean compacted;
+ private boolean compacted;
private Optional<DirectedEdge> shardEnd;
+ private boolean deleted;
public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
@@ -41,6 +42,7 @@ public class Shard implements Comparable<Shard> {
this.createdTime = createdTime;
this.compacted = compacted;
this.shardEnd = Optional.absent();
+ this.deleted = false;
}
@@ -67,6 +69,10 @@ public class Shard implements Comparable<Shard> {
return compacted;
}
+ public void setCompacted(final boolean compacted){
+ this.compacted = compacted;
+ }
+
/**
* Returns true if this is the minimum shard
@@ -84,6 +90,14 @@ public class Shard implements Comparable<Shard> {
return shardEnd;
}
+ public boolean isDeleted(){
+ return deleted;
+ }
+
+ public void setDeleted( final boolean deleted){
+ this.deleted = deleted;
+ }
+
/**
* Compare the shards based on the timestamp first, then the created time second
@@ -174,6 +188,7 @@ public class Shard implements Comparable<Shard> {
}else{
string.append("null");
}
+ string.append(", isDeleted=").append(deleted);
string.append(" }");
return string.toString();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 917e943..eb90866 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -80,6 +80,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
public List<ScopedRowKey<R>> getRowKeys() {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Shards: {}", shards);
+ }
+
List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
for(Shard shard : shards){
@@ -89,12 +93,18 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
rowKeys.add( rowKey );
}
+ if(logger.isTraceEnabled()) {
+ logger.trace("Resulting Shards: {}", rowKeys);
+ }
return rowKeys;
}
public List<SmartShard> getRowKeysWithShardEnd(){
+ if(logger.isTraceEnabled()) {
+ logger.trace("Shards: {}", shards);
+ }
final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size());
@@ -111,9 +121,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
shardEnd = null;
}
- rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd));
+ rowKeysWithShardEnd.add(new SmartShard(rowKey, shard.getShardIndex(), shardEnd, shard.isDeleted()));
}
+ if(logger.isTraceEnabled()) {
+ logger.trace("Resulting Smart Shards: {}", rowKeysWithShardEnd);
+ }
+
+
return rowKeysWithShardEnd;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 47b630f..cf9a51c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.Collections;
import java.util.Iterator;
+import com.google.common.base.Optional;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
@@ -78,11 +78,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId,
- final DirectedEdgeMeta directedEdgeMeta ) {
+ public Iterator<ShardEntryGroup> getShards(final ApplicationScope scope,
+ final DirectedEdgeMeta directedEdgeMeta) {
ValidationUtils.validateApplicationScope( scope );
- Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
Iterator<Shard> existingShards;
@@ -93,7 +92,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
else {
- existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+ existingShards = edgeShardSerialization.getShardMetaData( scope, Optional.absent(), directedEdgeMeta );
/**
* We didn't get anything out of cassandra, so we need to create the minimum shard
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 0a259f0..8e3be4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -21,42 +21,26 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.inject.Singleton;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
-import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
@@ -164,7 +148,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
} else {
- entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ));
+ entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta ));
}
@@ -330,7 +314,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
final Iterator<ShardEntryGroup> edges =
- nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+ nodeShardAllocation.getShards( key.scope, key.directedEdgeMeta );
final CacheEntry cacheEntry = new CacheEntry( edges );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 0853adb..8db491f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -346,7 +346,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
- compactedShard.setShardEnd(targetShard.getShardEnd());
+ compactedShard.setShardEnd(Optional.absent());
if(logger.isTraceEnabled()) {
logger.trace("Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 2fc0d50..7d91eda 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -191,21 +191,35 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
return DeleteResult.NO_OP;
}
+ if(shard.isDeleted()){
+ if(logger.isTraceEnabled()){
+ logger.trace("Shard {} already deleted. Short circuiting.", shard);
+ }
+ return DeleteResult.NO_OP;
+ }
+
+ shard.setDeleted(true);
+
+ final MutationBatch setShardDeletedFlagMutation =
+ edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta );
+
+ /* Previously the below was used for actually deleting the shard vs. a marking strategy with read filtering
- final MutationBatch shardRemovalMutation =
- edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+ final MutationBatch shardRemovalMutation =
+ edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
+ */
if ( rollup == null ) {
- rollup = shardRemovalMutation;
+ rollup = setShardDeletedFlagMutation;
}
else {
- rollup.mergeShallow( shardRemovalMutation );
+ rollup.mergeShallow( setShardDeletedFlagMutation );
}
result = DeleteResult.DELETED;
- logger.info( "Removing shard {} in group {}", shard, shardEntryGroup );
+ logger.info( "{} - Marking shard {} as deleted in group {}", Thread.currentThread().getName(), shard, shardEntryGroup );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 65a6f40..55eb172 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -43,15 +43,7 @@ import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
@@ -88,12 +80,15 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected final GraphFig graphFig;
protected final EdgeShardStrategy writeEdgeShardStrategy;
protected final TimeService timeService;
+ protected final EdgeShardSerialization edgeShardSerialization;
+
@Inject
public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
- final TimeService timeService ) {
+ final TimeService timeService,
+ final EdgeShardSerialization edgeShardSerialization ) {
checkNotNull( "keyspace required", keyspace );
@@ -101,6 +96,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
checkNotNull( "consistencyFig required", graphFig );
checkNotNull( "writeEdgeShardStrategy required", writeEdgeShardStrategy );
checkNotNull( "timeService required", timeService );
+ checkNotNull( "edgeShardSerialization required", edgeShardSerialization );
+
this.keyspace = keyspace;
@@ -108,6 +105,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.graphFig = graphFig;
this.writeEdgeShardStrategy = writeEdgeShardStrategy;
this.timeService = timeService;
+ this.edgeShardSerialization = edgeShardSerialization;
}
@@ -119,7 +117,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
GraphValidation.validateEdge( markedEdge );
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- return new SourceWriteOp( columnFamilies, markedEdge ) {
+ return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -144,7 +142,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -168,7 +166,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- return new TargetWriteOp( columnFamilies, markedEdge ) {
+ return new TargetWriteOp( columnFamilies, markedEdge, targetEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -194,7 +192,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -219,7 +217,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- return new EdgeVersions( columnFamilies, markedEdge ) {
+ return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -238,7 +236,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final MarkedEdge markedEdge, final Collection<Shard> shards,
final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
- return new SourceWriteOp( columnFamilies, markedEdge ) {
+ return new SourceWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -258,7 +256,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Collection<Shard> shards,
final DirectedEdgeMeta directedEdgeMeta,
final UUID timestamp ) {
- return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -279,7 +277,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final MarkedEdge markedEdge, final Collection<Shard> shards,
final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
- return new TargetWriteOp( columnFamilies, markedEdge ) {
+ return new TargetWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -300,7 +298,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final DirectedEdgeMeta directedEdgeMeta,
final UUID timestamp ) {
- return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -320,7 +318,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final MarkedEdge markedEdge, final Collection<Shard> shards,
final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
- return new EdgeVersions( columnFamilies, markedEdge ) {
+ return new EdgeVersions( columnFamilies, markedEdge, directedEdgeMeta ) {
@Override
void writeEdge( final MutationBatch batch,
@@ -696,6 +694,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
*/
protected abstract boolean isDeleted();
+ /**
+ * Get the directed edge meta for the op
+ */
+ protected abstract DirectedEdgeMeta getDirectedEdgeMeta();
+
/**
* Write the edge with the given data
@@ -725,6 +728,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
for ( Shard shard : shards ) {
final R rowKey = getRowKey( shard );
writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
+
+ // if an edge is being written to this shard, un-delete it in case it was previously marked
+ // don't un-delete if the edge write is to actually remove an edge
+ // Usergrid allows entities to be written with a UUID generated from the past (time)
+ if(shard.isDeleted() && !isDeleted) {
+ logger.info("Shard is deleted. Un-deleting as new data is being written to the shard - {}", shard);
+ shard.setDeleted(false);
+ batch.mergeShallow(edgeShardSerialization.writeShardMeta(scope, shard, getDirectedEdgeMeta()));
+ }
+
}
@@ -745,12 +758,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
private final String type;
private final boolean isDeleted;
private final DirectedEdge directedEdge;
+ private final DirectedEdgeMeta directedEdgeMeta;
/**
* Write the source write operation
*/
- private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+ final DirectedEdgeMeta directedEdgeMeta ) {
this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
this.sourceNodeId = markedEdge.getSourceNode();
@@ -759,6 +774,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.isDeleted = markedEdge.isDeleted();
this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -784,6 +800,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected boolean isDeleted() {
return isDeleted;
}
+
+ @Override
+ protected DirectedEdgeMeta getDirectedEdgeMeta() {
+ return directedEdgeMeta;
+ }
}
@@ -798,12 +819,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
private Id targetId;
private final boolean isDeleted;
private final DirectedEdge directedEdge;
+ private final DirectedEdgeMeta directedEdgeMeta;
/**
* Write the source write operation
*/
- private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+ final DirectedEdgeMeta directedEdgeMeta ) {
this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
this.sourceNodeId = markedEdge.getSourceNode();
@@ -813,6 +836,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.isDeleted = markedEdge.isDeleted();
this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -838,6 +862,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected boolean isDeleted() {
return isDeleted;
}
+
+ @Override
+ protected DirectedEdgeMeta getDirectedEdgeMeta() {
+ return directedEdgeMeta;
+ }
}
@@ -853,12 +882,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
private final String type;
private final boolean isDeleted;
private final DirectedEdge directedEdge;
+ private final DirectedEdgeMeta directedEdgeMeta;
/**
* Write the source write operation
*/
- private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+ final DirectedEdgeMeta directedEdgeMeta ) {
this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
this.targetNode = markedEdge.getTargetNode();
@@ -867,6 +898,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.isDeleted = markedEdge.isDeleted();
this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -892,6 +925,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected boolean isDeleted() {
return isDeleted;
}
+
+ @Override
+ protected DirectedEdgeMeta getDirectedEdgeMeta() {
+ return directedEdgeMeta;
+ }
}
@@ -909,12 +947,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final boolean isDeleted;
final DirectedEdge directedEdge;
+ final DirectedEdgeMeta directedEdgeMeta;
/**
* Write the source write operation
*/
- private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+ final DirectedEdgeMeta directedEdgeMeta ) {
this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
this.targetNode = markedEdge.getTargetNode();
@@ -924,6 +964,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.isDeleted = markedEdge.isDeleted();
this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -949,6 +991,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected boolean isDeleted() {
return isDeleted;
}
+
+ @Override
+ protected DirectedEdgeMeta getDirectedEdgeMeta() {
+ return directedEdgeMeta;
+ }
}
@@ -967,11 +1014,14 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final boolean isDeleted;
final Long edgeVersion;
+ final DirectedEdgeMeta directedEdgeMeta;
+
/**
* Write the source write operation
*/
- private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge,
+ final DirectedEdgeMeta directedEdgeMeta ) {
this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
this.targetNode = markedEdge.getTargetNode();
@@ -981,6 +1031,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
this.isDeleted = markedEdge.isDeleted();
this.edgeVersion = markedEdge.getTimestamp();
+ this.directedEdgeMeta = directedEdgeMeta;
}
@@ -1006,6 +1057,11 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected boolean isDeleted() {
return isDeleted;
}
+
+ @Override
+ protected DirectedEdgeMeta getDirectedEdgeMeta() {
+ return directedEdgeMeta;
+ }
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
index 8ab6288..b47f7cc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
@@ -58,6 +58,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
}
composite.addComponent( shard.isCompacted(), BOOLEAN_SERIALIZER);
+ composite.addComponent( shard.isDeleted(), BOOLEAN_SERIALIZER);
return composite.serialize();
}
@@ -67,7 +68,7 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
public Shard fromByteBuffer( final ByteBuffer byteBuffer ) {
DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
- Preconditions.checkArgument( composite.size() == 5, "Composite should 5 elements" );
+ Preconditions.checkArgument( composite.size() == 5 || composite.size() == 6, "Composite should 5 elements" );
final byte version = composite.get(0, BYTE_SERIALIZER);
@@ -76,9 +77,15 @@ public class ShardSerializer extends AbstractSerializer<Shard> {
final DirectedEdge shardEnd = composite.get( 3, EDGE_SERIALIZER);
final boolean isCompacted = composite.get( 4, BOOLEAN_SERIALIZER);
-
final Shard shard = new Shard(shardIndex, shardCreated, isCompacted);
shard.setShardEnd(Optional.fromNullable(shardEnd));
+
+ if( composite.size() == 6){
+ final boolean isDeleted = composite.get( 5, BOOLEAN_SERIALIZER);
+ shard.setDeleted(isDeleted);
+ }
+
+
return shard;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 0d6a27e..6e6abd8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -22,12 +22,7 @@ package org.apache.usergrid.persistence.graph;
import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -136,7 +131,7 @@ public class GraphManagerShardConsistencyIT {
// get the system property of the UUID to use. If one is not set, use the default
String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
- scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
+ scope = new ApplicationScopeImpl( IdGenerator.createId(UUIDGenerator.newTimeUUID(), "test" ) );
reporter =
@@ -149,19 +144,24 @@ public class GraphManagerShardConsistencyIT {
@After
- public void tearDown() {
+ public void tearDown() throws Exception {
reporter.stop();
reporter.report();
- if(writeExecutor != null){
+ if(writeExecutor != null && !writeExecutor.isShutdown()){
writeExecutor.shutdownNow();
+ writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
}
- if(deleteExecutor != null){
+ if(deleteExecutor != null && !deleteExecutor.isShutdown()){
deleteExecutor.shutdownNow();
+ deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+
}
+ Thread.sleep(3000);
+
}
@@ -251,7 +251,7 @@ public class GraphManagerShardConsistencyIT {
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future =
- writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
futures.add( future );
}
@@ -299,7 +299,9 @@ public class GraphManagerShardConsistencyIT {
@Override
public void onSuccess( @Nullable final Long result ) {
logger.info( "Successfully ran the read, re-running" );
- writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ if( !writeExecutor.isShutdown() ) {
+ writeExecutor.submit(new ReadWorker(gmf, generator, writeCount, readMeter));
+ }
}
@@ -383,11 +385,12 @@ public class GraphManagerShardConsistencyIT {
}
- //now continue reading everything for 30 seconds
+ //now continue reading everything for 30 seconds to make sure things are OK
Thread.sleep(30000);
- writeExecutor.shutdownNow();
+ //writeExecutor.shutdownNow();
+ //writeExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
}
@@ -494,7 +497,7 @@ public class GraphManagerShardConsistencyIT {
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future =
- deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
futures.add( future );
}
@@ -563,14 +566,18 @@ public class GraphManagerShardConsistencyIT {
.filter(markedEdge -> {
// if it's already been marked let's filter, move on as async deleteEdge()
- logger.trace("edge already marked, may indicated a problem with gm.deleteEdge(): {}", markedEdge);
- return !markedEdge.isDeleted();
+ if(markedEdge.isDeleted()) {
+ logger.info("Edge already marked, but gm.deleteEdge() is async, Edge: {}", markedEdge);
+ }
+ //return !markedEdge.isDeleted();
+ return true;
})
.flatMap( edge -> manager.markEdge( edge ))
.flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
totalDeleted += count;
- Thread.sleep( 500 );
+ logger.info("Sleeping 250ms second because deleteEdge() is async.");
+ Thread.sleep( 250 );
}
@@ -591,7 +598,9 @@ public class GraphManagerShardConsistencyIT {
@Override
public void onSuccess( @Nullable final Long result ) {
logger.info( "Successfully ran the read, re-running" );
- deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+ if( !deleteExecutor.isShutdown() ) {
+ deleteExecutor.submit(new ReadWorker(gmf, generator, 0, readMeter));
+ }
}
@@ -641,7 +650,16 @@ public class GraphManagerShardConsistencyIT {
logger.info( "Shard size for group is {}", group.getReadShards() );
- shardCount += group.getReadShards().size();
+ Collection<Shard> shards = group.getReadShards();
+
+ for(Shard shard: shards){
+
+ if(!shard.isDeleted()){
+ shardCount++;
+ }
+ }
+
+ //shardCount += group.getReadShards().size();
}
@@ -657,12 +675,337 @@ public class GraphManagerShardConsistencyIT {
}
//now that we have finished deleting and shards are removed, shutdown
- deleteExecutor.shutdownNow();
+ //deleteExecutor.shutdownNow();
+ //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
- Thread.sleep( 3000 ); // sleep before the next test
}
+
+ @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async
+ @Category(StressTest.class)
+ public void writeThousandsDeleteWriteAgain()
+ throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 2000 );
+
+
+ final Id sourceId = IdGenerator.createId( "sourceDeleteRewrite" );
+ final String deleteEdgeType = "testDeleteRewrite";
+
+ final List<Edge> edges = new ArrayList<>();
+
+ final EdgeGenerator generator = new EdgeGenerator() {
+
+
+ @Override
+ public Edge newEdge() {
+ Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDeleteRewrite" ) );
+
+ edges.add(edge);
+
+ return edge;
+ }
+
+
+ @Override
+ public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
+ return manager.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ Optional.<Edge>absent(), false ) );
+ }
+ };
+
+
+ final int numInjectors = 3;
+
+ /**
+ * create injectors. This way all the caches are independent of one another. This is the same as
+ * multiple nodes if there are multiple injectors
+ */
+ final List<Injector> injectors = createInjectors( numInjectors );
+
+
+ final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+ final long shardSize = graphFig.getShardSize();
+
+
+ //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+ // power for writes
+ final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
+
+ final int numWorkersPerInjector = numProcessors / numInjectors;
+
+ final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
+
+
+ final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
+
+ createDeleteExecutor( numWorkersPerInjector );
+
+
+ final AtomicLong writeCounter = new AtomicLong();
+
+
+ //min execution time is the shard min delta + 1 shard cache timeout cycle
+ final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+
+
+ logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+ numInjectors );
+
+
+ final List<Future<Boolean>> futures = new ArrayList<>();
+
+
+ for ( Injector injector : injectors ) {
+ final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+
+ for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+ Future<Boolean> future =
+ deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter) );
+
+ futures.add( future );
+ }
+ }
+
+ /**
+ * Wait for all writes to complete
+ */
+ for ( Future<Boolean> future : futures ) {
+ future.get();
+ }
+
+ // now get all our shards
+ final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
+
+ final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
+
+
+ final long writeCount = writeCounter.get();
+ final Meter readMeter = registry.meter( "readThroughput-deleteTestRewrite" );
+
+
+ //check our shard state
+
+
+ final Iterator<ShardEntryGroup> existingShardGroups =
+ cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+ int shardCount = 0;
+
+ while ( existingShardGroups.hasNext() ) {
+ final ShardEntryGroup group = existingShardGroups.next();
+
+ shardCount++;
+
+ logger.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+ }
+
+
+ logger.info( "Found {} shard groups", shardCount );
+
+
+ //now mark and delete all the edges
+
+
+ final GraphManager manager = gmf.createEdgeManager( scope );
+
+ //sleep occasionally to stop pushing cassandra over
+
+ long count = Long.MAX_VALUE;
+
+ Thread.sleep(3000); // let's make sure everything is written
+
+ long totalDeleted = 0;
+
+ // now do the deletes
+ while(count != 0) {
+
+ logger.info("total deleted: {}", totalDeleted);
+ if(count != Long.MAX_VALUE) { // count starts with Long.MAX
+ logger.info("deleted {} entities, continuing until count is 0", count);
+ }
+ //take 1000 then sleep
+ count = generator.doSearch( manager ).take( 1000 )
+ .filter(markedEdge -> {
+
+ // the edge may already be marked since deleteEdge() is async; log it but keep it in the stream
+ if(markedEdge.isDeleted()){
+ logger.info("Edge already marked, gm.deleteEdge() is Async, Edge: {}", markedEdge);
+ }
+ //return !markedEdge.isDeleted();
+ return true;
+ })
+ .flatMap( edge -> manager.markEdge( edge ))
+ .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+
+ totalDeleted += count;
+ logger.info("Sleeping 250ms second because deleteEdge() is an async process");
+ Thread.sleep( 250 );
+ }
+
+
+ logger.info("Sleeping before starting the read");
+ Thread.sleep(6000); // let the edge readers start
+
+ // loop with a reader until our shards are gone
+
+
+ GraphManager gm = gmf.createEdgeManager( scope );
+
+
+
+
+ //do a full read (no page limit) to eventually trigger our group compaction; every edge should be gone
+ long returnedEdgeCount = generator.doSearch( gm )
+
+ .doOnNext( edge -> readMeter.mark() )
+
+ .countLong().toBlocking().last();
+
+ logger.info( "Completed reading {} edges", returnedEdgeCount );
+
+ if ( 0 != returnedEdgeCount ) {
+
+ //logger.warn( "Unexpected edge count returned!!! Expected {} but was {}", 0,
+ // returnedEdgeCount );
+
+ fail("Unexpected edge count returned!!! Expected 0 but was "+ returnedEdgeCount );
+ }
+
+ logger.info("Got expected read count of 0");
+
+
+ //we have to get it from the cache, because this will trigger the compaction process
+ final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+ ShardEntryGroup group = null;
+
+ while ( groups.hasNext() ) {
+
+ group = groups.next();
+ logger.info( "Shard size for group is {}", group.getReadShards() );
+ Collection<Shard> shards = group.getReadShards();
+
+ for(Shard shard: shards){
+ if(!shard.isDeleted()){
+ shardCount++;
+ }
+ }
+ }
+
+
+ // we're done, 1 shard remains, we have a group, and it's our default shard
+ if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {
+ logger.info( "All compactions complete," );
+
+ }
+
+
+ Thread.sleep( 2000 );
+
+
+ logger.info( "Re-Writing same edges", workerWriteLimit, numWorkersPerInjector,
+ numInjectors );
+
+
+ int edgeCount = 0;
+ if ( edges.size() > 0) {
+
+ for (Edge edge : edges) {
+
+ Edge returned = gm.writeEdge(edge).toBlocking().last();
+
+ assertNotNull("Returned has a version", returned.getTimestamp());
+
+ edgeCount++;
+
+ writeMeter.mark();
+
+ writeCounter.incrementAndGet();
+
+
+ if (edgeCount % 100 == 0) {
+ logger.info("wrote: " + edgeCount);
+ }
+
+
+ }
+ logger.info("Re-wrote total: {}", edgeCount);
+ }
+
+ int retries = 2;
+ while ( retries > 0 ) {
+
+
+ //do a full read (no page limit) to eventually trigger our group compaction
+ returnedEdgeCount = generator.doSearch( gm )
+
+ .doOnNext( edge -> readMeter.mark() )
+
+ .countLong().toBlocking().last();
+
+ logger.info( "Completed reading {} edges", returnedEdgeCount );
+
+ if ( edgeCount != returnedEdgeCount ) {
+ logger.warn( "Unexpected edge count returned!!! Expected {} but was {}", edgeCount,
+ returnedEdgeCount );
+ }
+
+ retries--;
+
+ if( returnedEdgeCount == edgeCount ){
+ logger.info("Got expected read count of {}", edgeCount);
+ break;
+
+ }
+ }
+
+ //we have to get it from the cache, because this will trigger the compaction process
+ final Iterator<ShardEntryGroup> finalgroups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+ ShardEntryGroup finalgroup = null;
+
+ // reset the shard count
+ shardCount = 0;
+ while ( finalgroups.hasNext() ) {
+
+ finalgroup = finalgroups.next();
+
+ logger.info( "Shard size for group is {}", finalgroup.getReadShards() );
+
+ Collection<Shard> shards = finalgroup.getReadShards();
+
+ for(Shard shard: shards){
+
+ if(!shard.isDeleted()){
+ shardCount++;
+ }
+ }
+
+ }
+
+
+ // after the re-write more than 1 non-deleted shard must exist, otherwise the test fails
+ if ( shardCount > 1 ) {
+ logger.info( "All done. Final shard count: {}", shardCount );
+ assertEquals(1,1);
+
+ }else{
+ fail("There should be more than 1 shard");
+ }
+
+ //deleteExecutor.shutdownNow();
+ //deleteExecutor.awaitTermination(5000, TimeUnit.MILLISECONDS);
+
+ }
+
+
+
private class Worker implements Callable<Boolean> {
private final GraphManagerFactory factory;
private final EdgeGenerator generator;
@@ -671,8 +1014,8 @@ public class GraphManagerShardConsistencyIT {
private final AtomicLong writeCounter;
- private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
- final long minExecutionTime, final AtomicLong writeCounter ) {
+ private Worker(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
+ final long minExecutionTime, final AtomicLong writeCounter) {
this.factory = factory;
this.generator = generator;
this.writeLimit = writeLimit;
@@ -699,7 +1042,6 @@ public class GraphManagerShardConsistencyIT {
assertNotNull( "Returned has a version", returned.getTimestamp() );
- writeMeter.mark();
writeCounter.incrementAndGet();
@@ -738,7 +1080,7 @@ public class GraphManagerShardConsistencyIT {
GraphManager gm = factory.createEdgeManager( scope );
- while ( true ) {
+ while ( !Thread.currentThread().isInterrupted() ) {
//do a read to eventually trigger our group compaction. Take 2 pages of columns
@@ -757,6 +1099,8 @@ public class GraphManagerShardConsistencyIT {
assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
}
+
+ return 0L;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 00406c0..8a9dcfe 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -564,7 +564,7 @@ public class NodeShardAllocationTest {
final Iterator<ShardEntryGroup> result =
- approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+ approximation.getShards( scope, directedEdgeMeta );
assertTrue( "Shards present", result.hasNext() );
@@ -663,7 +663,7 @@ public class NodeShardAllocationTest {
final Iterator<ShardEntryGroup> result =
- approximation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+ approximation.getShards( scope, directedEdgeMeta );
ShardEntryGroup shardEntryGroup = result.next();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/42c5c4b8/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 3b717a4..ecf0091 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -102,7 +102,7 @@ public class NodeShardCacheTest {
/**
* Simulate returning no shards at all.
*/
- when( allocation.getShards( same( scope ), same( max ), same( directedEdgeMeta ) ) )
+ when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) )
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
@@ -191,7 +191,7 @@ public class NodeShardCacheTest {
/**
* Simulate returning no shards at all.
*/
- when( allocation.getShards( same( scope ), any( Optional.class ), same( directedEdgeMeta ) ) )
+ when( allocation.getShards( same( scope ), same( directedEdgeMeta ) ) )
//use "thenAnswer" so we always return the value, even if it's invoked more than 1 time.
.thenAnswer( new Answer<Iterator<ShardEntryGroup>>() {
[5/5] usergrid git commit: Merge branch 'hotfix-20160819' of
https://git-wip-us.apache.org/repos/asf/usergrid into hotfix-20160819
Posted by mr...@apache.org.
Merge branch 'hotfix-20160819' of https://git-wip-us.apache.org/repos/asf/usergrid into hotfix-20160819
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/44404248
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/44404248
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/44404248
Branch: refs/heads/hotfix-20160819
Commit: 44404248c377aa2986d5db62a08f9f604d7f8f8c
Parents: 34b1697 cda2879
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Aug 28 11:20:44 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Aug 28 11:20:44 2016 -0700
----------------------------------------------------------------------
.../usergrid/tools/CollectionIterator.java | 107 ++++++++++++++-----
1 file changed, 81 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
[4/5] usergrid git commit: Add ShardManager tool.
Posted by mr...@apache.org.
Add ShardManager tool.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/34b1697c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/34b1697c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/34b1697c
Branch: refs/heads/hotfix-20160819
Commit: 34b1697c507cc2d67cb2f9c6903b59e2080327c2
Parents: bf34a28
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Aug 28 11:07:31 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Aug 28 11:07:31 2016 -0700
----------------------------------------------------------------------
.../org/apache/usergrid/tools/ShardManager.java | 214 +++++++++++++++++++
1 file changed, 214 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/34b1697c/stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java
----------------------------------------------------------------------
diff --git a/stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java b/stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java
new file mode 100644
index 0000000..ecf447c
--- /dev/null
+++ b/stack/tools/src/main/java/org/apache/usergrid/tools/ShardManager.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.tools;
+
+
+import java.util.*;
+
+import com.google.common.base.Optional;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.*;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.model.entity.*;
+
+import org.apache.usergrid.utils.InflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import sun.security.provider.SHA;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isEmpty;
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
+/** Tool that logs the edge shards of one application/entity-type edge and optionally runs a repair task on them. */
+public class ShardManager extends ToolBase {
+
+ private static final Logger logger = LoggerFactory.getLogger(ShardManager.class);
+ // Option name for the application UUID (required).
+ private static final String APPLICATION_ARG = "app";
+ // Option name for the singular entity type (required); runTool pluralizes it to obtain the collection name.
+ private static final String ENTITY_TYPE_ARG = "entityType";
+ // Option name for the optional repair task; without a recognized value the tool only logs shards.
+ private static final String REPAIR_TASK = "repairTask";
+ // Option name selecting the edge type family, "collection" or "connection" (required).
+ private static final String SHARD_TYPE_ARG = "shardType";
+
+ /** Adds the app, entityType, repairTask and shardType options to the options returned by super.createOptions(). */
+ @Override
+ @SuppressWarnings("static-access")
+ public Options createOptions() {
+
+ // start from the options the superclass already defines
+ Options options = super.createOptions();
+
+ // required: application id
+ Option appOption = OptionBuilder.withArgName(APPLICATION_ARG).hasArg().isRequired(true)
+ .withDescription("application id").create(APPLICATION_ARG);
+
+ options.addOption(appOption);
+ // required: entity type, given in singular form
+ Option collectionOption =
+ OptionBuilder.withArgName(ENTITY_TYPE_ARG).hasArg().isRequired(true).withDescription("singular collection name")
+ .create(ENTITY_TYPE_ARG);
+
+ options.addOption(collectionOption);
+ // optional: repair task name; runTool acts only on the values it recognizes
+ Option repairOption =
+ OptionBuilder.withArgName(REPAIR_TASK).hasArg().isRequired(false).withDescription("repair task to execute")
+ .create(REPAIR_TASK);
+
+ options.addOption(repairOption);
+ // required: shard type, "collection" or "connection"
+ Option shardTypeOption =
+ OptionBuilder.withArgName(SHARD_TYPE_ARG).hasArg().isRequired(true).withDescription("either collection or connection")
+ .create(SHARD_TYPE_ARG);
+
+ options.addOption(shardTypeOption);
+
+ return options;
+ }
+
+
+ /**
+ * Logs every shard read for the requested edge type; when a recognized repair task is given, also rewrites shard
+ * metadata accordingly and then reads and logs the shards a second time so the result can be checked.
+ * @param line parsed command line supplying the app, entityType, repairTask and shardType option values
+ * @throws Exception anything raised by the calls below; nothing is caught in this method
+ */
+ @Override
+ public void runTool(CommandLine line) throws Exception {
+
+ startSpring();
+ // reject an empty app value before it reaches UUID.fromString
+ if (line.getOptionValue(APPLICATION_ARG).isEmpty()) {
+ throw new RuntimeException("Application ID not provided.");
+ }
+ final UUID app = UUID.fromString(line.getOptionValue(APPLICATION_ARG));
+
+ String entityType = line.getOptionValue(ENTITY_TYPE_ARG);
+
+ String repairTask = line.getOptionValue(REPAIR_TASK);
+
+ String shardType = line.getOptionValue(SHARD_TYPE_ARG);
+ // repair stays false unless repairTask matches one of the three names below (case-insensitive)
+ boolean repair = false;
+ if( isNotEmpty(repairTask) && (
+ repairTask.equalsIgnoreCase("removeAllShardEnds") || repairTask.equalsIgnoreCase("removeLastShardEnd") ||
+ repairTask.equalsIgnoreCase("resetAllCompactionStatus"))) {
+
+ repair = true;
+ }
+ // NOTE(review): an unrecognized repairTask is not reported; the run then behaves as a plain shard listing.
+
+ logger.info("Starting Tool: ShardManager");
+ logger.info("Using Cassandra consistency level: {}", System.getProperty("usergrid.read.cl", "CL_LOCAL_QUORUM"));
+ // shards are looked up for edges whose source node is the application entity itself
+ EntityRef headEntity = new SimpleEntityRef("application", app);
+
+ ApplicationScope applicationScope = new ApplicationScopeImpl(new SimpleId(app, "application"));
+ EdgeShardSerialization edgeShardSerialization = injector.getInstance(EdgeShardSerialization.class);
+
+ String collectionName = InflectionUtils.pluralize(entityType);
+
+ // default to the collection edge type; shardType below may switch it to a connection edge type
+ String metaType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
+
+ if( isNotEmpty(shardType) ){
+ // any value other than "collection" or "connection" keeps the collection default set above
+ if( shardType.equalsIgnoreCase("collection")){
+ metaType = CpNamingUtils.getEdgeTypeFromCollectionName(collectionName);
+
+ }else if( shardType.equalsIgnoreCase("connection")){
+ metaType = CpNamingUtils.getEdgeTypeFromConnectionType(entityType);
+ }
+ }
+
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode(headEntity.asId(), metaType);
+
+ Iterator<Shard> shards = edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);
+ // true only for the first shard yielded; NOTE(review): presumably that is the newest ("last") shard - confirm
+ boolean firstShard = true;
+ while (shards.hasNext()) {
+ Shard shard = shards.next();
+
+ logger.info("Seeking over shard: {}", shard);
+
+ if(repair) {
+
+ logger.info("Repair enabled with task: {}", repairTask);
+
+ if( repairTask.equalsIgnoreCase("removeLastShardEnd") && firstShard){
+
+ logger.info("Removing shard end from shard: {}", shard);
+ // clear the shard end and write the shard metadata back
+ shard.setShardEnd(Optional.absent());
+ edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();
+
+ }else if ( repairTask.equalsIgnoreCase("removeAllShardEnds")){
+
+ logger.info("Removing shard end from shard: {}", shard);
+ // same write as the branch above, but applied to every shard rather than only the first
+ shard.setShardEnd(Optional.absent());
+ edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();
+
+ } else if ( repairTask.equalsIgnoreCase("resetAllCompactionStatus")){
+
+ logger.info("Setting compacted=false for shard: {}", shard);
+ // flag the shard as not compacted and write the shard metadata back
+ shard.setCompacted(false);
+ edgeShardSerialization.writeShardMeta(applicationScope, shard, directedEdgeMeta).execute();
+
+ }
+
+ firstShard = false;
+
+
+ }
+
+ }
+
+
+ if(repair) {
+ // re-read the shard metadata and log each shard so the repair can be verified from the log
+ Iterator<Shard> finalshards = edgeShardSerialization.getShardMetaData(applicationScope, Optional.absent(), directedEdgeMeta);
+
+ while (finalshards.hasNext()) {
+
+ Shard shard = finalshards.next();
+
+ logger.info("Shard after repair: {}", shard);
+
+ }
+ }
+
+
+ logger.info("ShardManager run complete");
+
+
+ }
+}
[3/5] usergrid git commit: Fix temp logging issue.
Posted by mr...@apache.org.
Fix temp logging issue.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bf34a288
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bf34a288
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bf34a288
Branch: refs/heads/hotfix-20160819
Commit: bf34a288a9c316adeed73c3786c985ba6782335d
Parents: 42c5c4b
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Aug 27 23:30:33 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Aug 27 23:30:33 2016 -0700
----------------------------------------------------------------------
.../serialization/impl/shard/impl/ShardGroupDeletionImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bf34a288/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 7d91eda..6259e02 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -219,7 +219,7 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
result = DeleteResult.DELETED;
- logger.info( "{} - Marking shard {} as deleted in group {}", Thread.currentThread().getName()shard, shardEntryGroup );
+ logger.info( "Marking shard {} as deleted in group {}", shard, shardEntryGroup );
}