You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/03/23 18:34:29 UTC
[01/20] usergrid git commit: Initial work to iterate over the shards
with more context so that we don't always fetch all rows (shards) at
the same time.
Repository: usergrid
Updated Branches:
refs/heads/release-2.1.1 843578310 -> e64fa3503
Initial work to iterate over the shards with more context so that we don't always fetch all rows (shards) at the same time.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8c725f19
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8c725f19
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8c725f19
Branch: refs/heads/release-2.1.1
Commit: 8c725f19aa30a1cca5b71017c8a43586b6e4d544
Parents: 8435783
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 14 16:07:19 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 14 16:07:19 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 12 +-
.../usergrid/corepersistence/index/RxTest.java | 129 ++++++++++
.../core/astyanax/MultiRowColumnIterator.java | 253 +++++++++++++++----
.../persistence/core/shard/SmartShard.java | 52 ++++
.../graph/serialization/impl/shard/Shard.java | 33 ++-
.../impl/shard/impl/EdgeSearcher.java | 86 ++++++-
.../shard/impl/NodeShardAllocationImpl.java | 4 +-
.../impl/shard/impl/NodeShardCacheImpl.java | 19 +-
.../shard/impl/ShardGroupCompactionImpl.java | 19 ++
.../impl/ShardedEdgeSerializationImpl.java | 9 +
.../impl/shard/impl/ShardsColumnIterator.java | 30 ++-
.../graph/GraphManagerShardConsistencyIT.java | 17 +-
12 files changed, 583 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 4d78340..7e368c7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
import org.apache.usergrid.persistence.index.impl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -519,6 +520,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
}
+ // don't let this continue if there's nothing to index
+ if (indexOperationMessage == null || indexOperationMessage.isEmpty()){
+ throw new RuntimeException(
+ "IndexOperationMessage cannot be null or empty after retrieving from map persistence");
+ }
+
+
// always do a check to ensure the indexes are initialized for the index requests
initializeEntityIndexes(indexOperationMessage);
@@ -739,9 +747,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
*/
private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
- // if nothing came back then return null
+ // if nothing came back then return empty list
if(indexEventResults==null){
- return null;
+ return new ArrayList<>(0);
}
IndexOperationMessage combined = new IndexOperationMessage();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index 6bb8947..f44c028 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -20,14 +20,21 @@
package org.apache.usergrid.corepersistence.index;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericData;
import org.apache.usergrid.ExperimentalTest;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.observables.ConnectableObservable;
@@ -42,6 +49,9 @@ import static org.junit.Assert.assertTrue;
*/
public class RxTest {
+ private static final Logger logger = LoggerFactory.getLogger(RxTest.class);
+
+
@Test
@Category(ExperimentalTest.class )
public void testPublish() throws InterruptedException {
@@ -107,5 +117,124 @@ public class RxTest {
assertEquals(0, result);
}
+ @Test
+ public void testStreamWithinObservable(){
+
+ List<Integer> numbers = new ArrayList<Integer>(5){{
+ add(1);
+ add(2);
+ add(3);
+ add(4);
+ add(5);
+ }};
+
+ Observable.just(numbers).map( integers -> {
+
+ try{
+
+ logger.info("Starting size: {}", String.valueOf(numbers.size()));
+
+ List<StreamResult> results = callStream(integers);
+
+ logger.info("In process size: {}", String.valueOf(results.size()));
+
+ List<Integer> checked = checkResults(results);
+
+ logger.info("Resulting Size: {}", String.valueOf(checked.size()));
+
+ return results;
+
+ }
+ catch(Exception e){
+
+ logger.info("Caught exception in observable: {}", e.getMessage());
+ return null;
+
+
+ }
+
+ }).subscribe();
+
+
+
+
+
+
+
+ }
+
+ private List<StreamResult> callStream (final List<Integer> input){
+
+ Stream<StreamResult> results = input.stream().map(integer -> {
+
+ try{
+
+
+
+ if(integer.equals(1) || integer.equals(2)){
+ throwSomeException("Ah integer not what we want!");
+ }
+
+ return new StreamResult(integer);
+
+ }
+ catch(Exception e){
+
+ logger.info("Caught exception in stream: '{}'", e.getMessage());
+ return new StreamResult(0);
+
+ }
+
+ });
+
+ return results.collect(Collectors.toList());
+
+ }
+
+
+ private List<Integer> checkResults(final List<StreamResult> streamResults){
+
+ List<Integer> combined = new ArrayList<>();
+ List<Integer> integers = streamResults.stream().filter( streamResult -> streamResult.getNumber() > 0)
+ .map(streamResult -> {
+
+ combined.add(streamResult.getNumber());
+
+ return streamResult.getNumber();
+ })
+ .collect(Collectors.toList());
+
+ Observable.from(combined).map( s -> {
+ logger.info("Doing work in another observable with Integer: {}", s);
+ return s;
+ }).toBlocking().last();
+
+
+ return integers;
+
+ }
+
+
+ public class StreamResult {
+
+ private int number;
+
+ public StreamResult( final int number){
+
+ this.number = number;
+ }
+
+ public int getNumber(){
+ return number;
+ }
+
+
+ }
+
+ public void throwSomeException(String message){
+
+ throw new RuntimeException(message);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index a120fda..9971fba 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -1,35 +1,29 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.usergrid.persistence.core.astyanax;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
+import org.apache.avro.generic.GenericData;
+import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +73,14 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
private Iterator<T> currentColumnIterator;
+ private Iterator<SmartShard> currentShardIterator;
+
+ private List<SmartShard> rowKeysWithShardEnd;
+
+ private SmartShard currentShard;
+
+ private List<T> resultsTracking;
+
/**
* Remove after finding bug
@@ -110,6 +112,28 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.moreToReturn = true;
+ this.resultsTracking = new ArrayList<>();
+
+ // seenResults = new HashMap<>( pageSize * 10 );
+ }
+
+ public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
+ final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+ final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+ final Collection<R> rowKeys, final int pageSize,
+ final List<SmartShard> rowKeysWithShardEnd) {
+ this.cf = cf;
+ this.pageSize = pageSize;
+ this.columnParser = columnParser;
+ this.columnSearch = columnSearch;
+ this.comparator = comparator;
+ this.rowKeys = rowKeys;
+ this.keyspace = keyspace;
+ this.consistencyLevel = consistencyLevel;
+ this.moreToReturn = true;
+ this.rowKeysWithShardEnd = rowKeysWithShardEnd;
+ this.resultsTracking = new ArrayList<>();
+
// seenResults = new HashMap<>( pageSize * 10 );
}
@@ -117,12 +141,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
@Override
public boolean hasNext() {
+ //logger.info(Thread.currentThread().getName()+" - calling hasNext()");
+ if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){
+ if(currentShardIterator.hasNext()) {
+ logger.info(Thread.currentThread().getName()+" - advancing shard iterator");
+ //logger.info(Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+ logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+ //Collections.reverse(rowKeysWithShardEnd);
+ logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+
+ logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ currentShard = currentShardIterator.next();
+ logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ startColumn = null;
+
+ advance();
+ }
+ }
if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
- advance();
- }
+ if(currentColumnIterator != null) {
+ logger.info(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+ }
+ logger.info(Thread.currentThread().getName()+" - moreToReturn={}", moreToReturn);
+ logger.info(Thread.currentThread().getName()+" - going into advance()");
+ advance();
+ }
return currentColumnIterator.hasNext();
}
@@ -148,7 +194,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
public void advance() {
-
+ logger.info( "Advancing multi row column iterator" );
if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
/**
@@ -161,11 +207,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
+ //final int selectSize = pageSize;
+
final RangeBuilder rangeBuilder = new RangeBuilder();
- //set the range into the search
+
+ if(currentShardIterator == null){
+ currentShardIterator = rowKeysWithShardEnd.iterator();
+
+ }
+
+ if(currentShard == null){
+ Collections.reverse(rowKeysWithShardEnd); // ranges are ascending
+ logger.info(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+ currentShard = currentShardIterator.next();
+ logger.info(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
+ logger.info(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+
+ }
+
+
+
+
+
+ //set the range into the search
+ logger.info(Thread.currentThread().getName()+" - startColumn={}", startColumn);
if ( startColumn == null ) {
columnSearch.buildRange( rangeBuilder );
}
@@ -181,9 +249,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
/**
* Get our list of slices
*/
+ //logger.info("shard: {}, end: {}",currentShard.getRowKey().getKey(), currentShard.getShardEnd());
final RowSliceQuery<R, C> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
- .withColumnRange( rangeBuilder.build() );
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+ .withColumnRange( rangeBuilder.build() );
final Rows<R, C> result;
try {
@@ -194,6 +263,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
+
+// List<RowSliceQuery<R, C>> queries = new ArrayList<>();
+//
+// rowKeys.forEach( rowkey -> {
+//
+// queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
+// .withColumnRange( rangeBuilder.build() ));
+//
+// });
+//
+//
+// final List<Rows<R,C>> combinedResults = new ArrayList<>();
+//
+// queries.forEach(query ->{
+//
+// try {
+// combinedResults.add(query.execute().getResult());
+// }
+// catch ( ConnectionException e ) {
+// throw new RuntimeException( "Unable to connect to casandra", e );
+// }
+//
+// });
+
+
+
+
//now aggregate them together
//this is an optimization. It's faster to see if we only have values for one row,
@@ -201,14 +297,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//do a merge if only one row has data.
+
final List<T> mergedResults;
- if ( containsSingleRowOnly( result ) ) {
- mergedResults = singleRowResult( result );
- }
- else {
- mergedResults = mergeResults( result, selectSize );
- }
+ mergedResults = mergeResults( result, selectSize );
+
+// if ( containsSingleRowOnly( result ) ) {
+// mergedResults = singleRowResult( result );
+// }
+// else {
+// mergedResults = mergeResults( result, selectSize );
+// }
+
+
+
+// final List<T> mergedResults = new ArrayList<>();
+//
+// combinedResults.forEach(rows -> {
+//
+// if ( containsSingleRowOnly( rows ) ) {
+// mergedResults.addAll(singleRowResult( rows ));
+// }
+// else {
+// mergedResults.addAll(mergeResults( rows, selectSize ));
+// }
+//
+// });
+
+
@@ -223,8 +339,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final int size = mergedResults.size();
+
+
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size);
+
+ }
+
+ logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
moreToReturn = size == selectSize;
+// if(selectSize == 1001 && mergedResults.size() == 1000){
+// moreToReturn = true;
+// }
+
//we have a first column to to check
if( size > 0) {
@@ -232,6 +360,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//The search has either told us to skip the first element, or it matches our last, therefore we disregard it
if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
+ logger.info("removing an entry");
mergedResults.remove( 0 );
}
@@ -240,10 +369,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
if(moreToReturn && mergedResults.size() > 0){
startColumn = mergedResults.get( mergedResults.size() - 1 );
+
}
+ logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+
+
+// if(mergedResults.size() == 0 && currentShardIterator.hasNext()){
+// //currentShard = currentShardIterator.next();
+//
+// }
+
currentColumnIterator = mergedResults.iterator();
+ //logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+ logger.info(
+ Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
+ "moreToReturn={}, currentShardIterator.hasNext()={}",
+ currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
}
@@ -328,7 +472,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final T returnedValue = columnParser.parseColumn( column );
//Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
- int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator );
+ int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
/**
* DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
@@ -350,29 +494,37 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//we've already seen it, no-op
if(searchIndex > -1){
+ logger.info("skipping column as it was already retrieved before");
continue;
}
- final int insertIndex = (searchIndex+1)*-1;
+// final int insertIndex = (searchIndex+1)*-1;
+//
+// //it's at the end of the list, don't bother inserting just to remove it
+// if(insertIndex >= maxSize){
+// logger.info("skipping column as it was at the end of the list");
+// continue;
+// }
- //it's at the end of the list, don't bother inserting just to remove it
- if(insertIndex >= maxSize){
- continue;
- }
+ resultsTracking.add(returnedValue);
- if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+ //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
- mergedResults.add( insertIndex, returnedValue );
+ //mergedResults.add( insertIndex, returnedValue );
+ mergedResults.add(returnedValue );
- //prune the mergedResults
- while ( mergedResults.size() > maxSize ) {
- if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
-
- //just remove from our tail until the size falls to the correct value
- mergedResults.remove(mergedResults.size()-1);
- }
+ //prune the mergedResults
+// while ( mergedResults.size() > maxSize ) {
+//
+// if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
+//
+// //just remove from our tail until the size falls to the correct value
+// mergedResults.remove(mergedResults.size()-1);
+// resultsTracking.remove(resultsTracking.size()-1);
+//
+// }
}
if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
@@ -381,7 +533,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
return mergedResults;
}
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
new file mode 100644
index 0000000..8a1bee8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.shard;
+
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+
+public class SmartShard<R, C> {
+
+ final ScopedRowKey<R> rowKey;
+ final C shardEnd;
+
+
+ public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+
+ this.rowKey = rowKey;
+ this.shardEnd = shardEnd;
+ }
+
+
+ public ScopedRowKey<R> getRowKey(){
+ return rowKey;
+ }
+
+ public C getShardEnd(){
+ return shardEnd;
+ }
+
+ @Override
+ public String toString(){
+
+ return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }";
+
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 472e0a2..92793cb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.graph.Edge;
+
public class Shard implements Comparable<Shard> {
@@ -30,12 +33,14 @@ public class Shard implements Comparable<Shard> {
private final long shardIndex;
private final long createdTime;
private final boolean compacted;
+ private Optional<Edge> shardEnd;
public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
this.shardIndex = shardIndex;
this.createdTime = createdTime;
this.compacted = compacted;
+ this.shardEnd = Optional.absent();
}
@@ -71,6 +76,14 @@ public class Shard implements Comparable<Shard> {
return shardIndex == MIN_SHARD.shardIndex;
}
+ public void setShardEnd(final Optional<Edge> shardEnd) {
+ this.shardEnd = shardEnd;
+ }
+
+ public Optional<Edge> getShardEnd() {
+ return shardEnd;
+ }
+
/**
* Compare the shards based on the timestamp first, then the created time second
@@ -149,10 +162,20 @@ public class Shard implements Comparable<Shard> {
@Override
public String toString() {
- return "Shard{" +
- "shardIndex=" + shardIndex +
- ", createdTime=" + createdTime +
- ", compacted=" + compacted +
- '}';
+
+ StringBuilder string = new StringBuilder();
+ string.append("Shard{ ");
+ string.append("shardIndex=").append(shardIndex);
+ string.append(", createdTime=").append(createdTime);
+ string.append(", compacted=").append(compacted);
+ string.append(", shardEndTimestamp=");
+ if(shardEnd.isPresent()){
+ string.append(shardEnd.get().getTimestamp());
+ }else{
+ string.append("null");
+ }
+ string.append(" }");
+
+ return string.toString();
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 4d02ba9..e0ba3ec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -1,16 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;
+import org.apache.http.cookie.SM;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
+import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -22,6 +40,10 @@ import com.google.common.base.Preconditions;
import com.netflix.astyanax.Serializer;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator.*;
/**
@@ -34,6 +56,9 @@ import com.netflix.astyanax.util.RangeBuilder;
*/
public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
+ private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
+
+
protected final Optional<T> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
@@ -52,6 +77,8 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
this.shards = shards;
this.last = last;
this.comparator = comparator;
+
+ //logger.info("initializing with shards: {}", shards);
}
@@ -59,6 +86,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
public List<ScopedRowKey<R>> getRowKeys() {
List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
+ //logger.info("shards: {}", shards);
for(Shard shard : shards){
@@ -72,6 +100,33 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
return rowKeys;
}
+ public List<SmartShard> getRowKeysWithShardEnd(){
+
+
+ final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size());
+
+ for(Shard shard : shards){
+
+ final ScopedRowKey< R> rowKey = ScopedRowKey
+ .fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
+
+ final C shardEnd;
+ if(shard.getShardEnd().isPresent()){
+ shardEnd = createColumn((T) shard.getShardEnd().get());
+
+ }else{
+ shardEnd = null;
+ }
+
+
+
+ rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd));
+ }
+
+ return rowKeysWithShardEnd;
+
+ }
+
@Override
public boolean skipFirst( final T first ) {
@@ -127,6 +182,29 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
}
+// public class SmartShard {
+//
+// final ScopedRowKey<R> rowKey;
+// final C shardEnd;
+//
+//
+// public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+//
+// this.rowKey = rowKey;
+// this.shardEnd = shardEnd;
+// }
+//
+//
+// public ScopedRowKey<R> getRowKey(){
+// return rowKey;
+// }
+//
+// public C getShardEnd(){
+// return shardEnd;
+// }
+//
+// }
+
/**
* Get the comparator
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index a79b91a..6f95cf5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -98,13 +98,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
else {
existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+ //logger.info("existing shards has something: {}", existingShards.hasNext());
/**
* We didn't get anything out of cassandra, so we need to create the minumum shard
*/
if ( existingShards == null || !existingShards.hasNext() ) {
-
+ //logger.info("writing min shard");
final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
try {
batch.execute();
@@ -117,6 +118,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
}
+ //logger.info("getShards existing shards: {}", existingShards);
return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
directedEdgeMeta );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 1a88ebb..3ff9d47 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -164,14 +164,19 @@ public class NodeShardCacheImpl implements NodeShardCache {
final CacheKey key = new CacheKey( scope, directedEdgeMeta );
CacheEntry entry;
- try {
- entry = this.graphs.get( key );
- }
- catch ( ExecutionException e ) {
- throw new GraphRuntimeException( "Unable to load shard key for graph", e );
- }
+// try {
+// entry = this.graphs.get( key );
+// }
+// catch ( ExecutionException e ) {
+// throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+// }
+
+ final Iterator<ShardEntryGroup> edges =
+ nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+
+ final CacheEntry cacheEntry = new CacheEntry( edges );
- Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
+ Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp );
if ( iterator == null ) {
return Collections.<ShardEntryGroup>emptyList().iterator();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index f0b0ac9..80b63ec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.graph.Edge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -169,6 +171,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
+ final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
/**
* As we move edges, we want to keep track of it
@@ -181,10 +184,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
+ MarkedEdge shardEnd = null;
+
while ( edges.hasNext() ) {
final MarkedEdge edge = edges.next();
final long edgeTimestamp = edge.getTimestamp();
+ shardEnd = edge;
/**
* The edge is within a different shard, break
@@ -202,6 +208,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
.deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
timestamp ) );
+
edgeCount++;
//if we're at our count, execute the mutation of writing the edges to the new row, then remove them
@@ -217,12 +224,21 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
}
}
+
+ Shard updatedShard = new Shard( sourceShard.getShardIndex(), sourceShard.getCreatedTime(), sourceShard.isCompacted() );
+ updatedShard.setShardEnd(Optional.fromNullable(shardEnd));
+ logger.info("updating with shard end: {}", shardEnd );
+ updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, updatedShard, edgeMeta));
+
}
+
+
try {
newRowBatch.execute();
deleteRowBatch.execute();
+ updateShardMetaBatch.execute();
}
catch ( Throwable t ) {
logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -232,6 +248,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if (logger.isTraceEnabled()) {
logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
}
+ logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+
resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
@@ -276,6 +294,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
try {
updateMark.execute();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index ce0953c..c3e0cc0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -67,6 +67,8 @@ import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.Serializer;
import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -77,6 +79,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Singleton
public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+ private static final Logger logger = LoggerFactory.getLogger( ShardedEdgeSerializationImpl.class );
+
+
protected final Keyspace keyspace;
protected final CassandraConfig cassandraConfig;
protected final GraphFig graphFig;
@@ -401,6 +406,10 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdgeType( search );
+ if(logger.isTraceEnabled()){
+ logger.info("getEdgesFromSource shards: {}", shards);
+ }
+
final Id sourceId = search.getNode();
final String type = search.getType();
final long maxTimestamp = search.getMaxTimestamp();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index d1000fb..af9d979 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -1,10 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
+import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,14 +127,19 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
*/
final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys();
+ final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd();
+
if (logger.isTraceEnabled()) {
logger.trace("Searching with row keys {}", rowKeys);
}
- currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
+ //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
+ currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd);
+
}
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 3ae3ff1..b131e95 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -203,7 +203,10 @@ public class GraphManagerShardConsistencyIT {
// power for writes
final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
- final int numWorkersPerInjector = numProcessors / numInjectors;
+ //final int numWorkersPerInjector = numProcessors / numInjectors;
+
+ final int numWorkersPerInjector = 1;
+
/**
@@ -268,7 +271,9 @@ public class GraphManagerShardConsistencyIT {
final List<Throwable> failures = new ArrayList<>();
- for(int i = 0; i < 2; i ++) {
+ Thread.sleep(5000);
+
+ for(int i = 0; i < 1; i ++) {
/**
@@ -656,7 +661,7 @@ public class GraphManagerShardConsistencyIT {
final long startTime = System.currentTimeMillis();
- for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
+ for ( long i = 1; i < writeLimit +1 && System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
Edge edge = generator.newEdge();
@@ -671,8 +676,8 @@ public class GraphManagerShardConsistencyIT {
writeCounter.incrementAndGet();
- if ( i % 1000 == 0 ) {
- logger.info( " Wrote: " + i );
+ if ( i % 100 == 0 ) {
+ logger.info( Thread.currentThread().getName()+" wrote: " + i );
}
}
@@ -718,7 +723,7 @@ public class GraphManagerShardConsistencyIT {
logger.info( "Completed reading {} edges", returnedEdgeCount );
if ( writeCount != returnedEdgeCount ) {
- logger.warn( "Unexpected edge count returned!!! Expected {} but was {}", writeCount,
+ logger.warn( Thread.currentThread().getName()+" - Unexpected edge count returned!!! Expected {} but was {}", writeCount,
returnedEdgeCount );
}
[05/20] usergrid git commit: Make the shard consistency tests a
little smarter. Update shard compaction to be a safer background process by
updating the edge writes to be atomic,
and the deletes delayed (ensures data will always be available for seeking
and we filter any dupes).
Posted by mr...@apache.org.
Make the shard consistency tests a little smarter. Update shard compaction to be a safer background process by updating the edge writes to be atomic, and the deletes delayed (ensures data will always be available for seeking and we filter any dupes).
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b112488d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b112488d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b112488d
Branch: refs/heads/release-2.1.1
Commit: b112488db1ef01ee8417e35e48e428c40d0aa206
Parents: 4bbebc5
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Mar 16 17:25:22 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Mar 16 17:25:22 2016 -0700
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 8 +-
.../shard/impl/ShardGroupCompactionImpl.java | 79 +++++++++++---
.../graph/GraphManagerShardConsistencyIT.java | 108 ++++++++++++-------
.../graph/src/test/resources/log4j.properties | 9 +-
4 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 10786f7..6c91aca 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -169,9 +169,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
- // reset the start column as we'll be seeking a new row, any duplicates will be filtered out
- startColumn = null;
-
advance();
}
@@ -312,8 +309,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
- // if a whole page is skipped, this is likely during a shard transition and we should assume there is more to read
- if( skipSize == selectSize || skipSize == selectSize - 1){
+ // if a whole page is skipped OR the result size equals the the difference of what's skipped,
+ // it is likely during a shard transition and we should assume there is more to read
+ if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
moreToReturn = true;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 80b63ec..f644380 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -23,20 +23,17 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.nio.charset.Charset;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import com.google.common.base.Optional;
+import com.netflix.astyanax.connectionpool.OperationResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +69,8 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import rx.Observable;
+import rx.schedulers.Schedulers;
/**
@@ -199,7 +198,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
break;
}
-
newRowBatch.mergeShallow( edgeMeta
.writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
timestamp ) );
@@ -216,8 +214,35 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if ( edgeCount % maxWorkSize == 0 ) {
try {
- newRowBatch.execute();
- deleteRowBatch.execute();
+
+ // write the edges into the new shard atomically so we know they all succeed
+ newRowBatch.withAtomicBatch(true).execute();
+
+ List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+ {{
+ add(deleteRowBatch);
+ }};
+
+ // fire the mutation in the background after 1 second delay
+ if(logger.isTraceEnabled()){
+ logger.trace("scheduling shard compaction delete");
+
+ }
+
+ // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+ Observable.from(deleteMutations)
+ .delay(1000, TimeUnit.MILLISECONDS)
+ .map(deleteRowBatchSingle -> {
+ try {
+ return deleteRowBatchSingle.execute();
+ } catch (ConnectionException e) {
+ logger.error("Unable to remove edges from old shards");
+ throw new RuntimeException("Unable to remove edges from old shards");
+ }
+ })
+ .subscribeOn(Schedulers.io())
+ .toBlocking().last();
+
}
catch ( Throwable t ) {
logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
@@ -236,9 +261,35 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
try {
- newRowBatch.execute();
- deleteRowBatch.execute();
- updateShardMetaBatch.execute();
+
+ // write the edges into the new shard atomically so we know they all succeed
+ newRowBatch.withAtomicBatch(true).execute();
+
+ List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+ {{
+ add(deleteRowBatch);
+ }};
+
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("scheduling shard compaction delete");
+ }
+
+ // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+ Observable.from(deleteMutations)
+ .delay(1000, TimeUnit.MILLISECONDS)
+ .map(deleteRowBatchSingle -> {
+ try {
+ return deleteRowBatchSingle.execute();
+ } catch (ConnectionException e) {
+ logger.error("Unable to remove edges from old shards");
+ throw new RuntimeException("Unable to remove edges from old shards");
+ }
+ })
+ .subscribeOn(Schedulers.io())
+ .toBlocking().last();
+
+ //updateShardMetaBatch.execute();
}
catch ( Throwable t ) {
logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -286,7 +337,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
shardRemovalRollup.execute();
}
catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
+ throw new RuntimeException( "Unable to connect to cassandra", e );
}
@@ -300,7 +351,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
updateMark.execute();
}
catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to casandra", e );
+ throw new RuntimeException( "Unable to connect to cassandra", e );
}
resultBuilder.withCompactedShard( compactedShard );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 82b0879..439553c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.usergrid.StressTest;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -78,7 +79,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
-
public class GraphManagerShardConsistencyIT {
private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
@@ -98,7 +98,10 @@ public class GraphManagerShardConsistencyIT {
protected Object originalShardDelta;
- protected ListeningExecutorService executor;
+ protected ListeningExecutorService writeExecutor;
+
+ protected ListeningExecutorService deleteExecutor;
+
@Before
@@ -112,7 +115,7 @@ public class GraphManagerShardConsistencyIT {
originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
- ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 5000 );
final long cacheTimeout = 2000;
@@ -145,28 +148,39 @@ public class GraphManagerShardConsistencyIT {
reporter.stop();
reporter.report();
- executor.shutdownNow();
+ if(writeExecutor != null){
+ writeExecutor.shutdownNow();
+
+ }
+ if(deleteExecutor != null){
+ deleteExecutor.shutdownNow();
+
+ }
+
}
- private void createExecutor( final int size ) {
- executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
+ private void createWriteExecutor( final int size ) {
+ writeExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
}
+ private void createDeleteExecutor( final int size ) {
+ deleteExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
+ }
@Test
public void writeThousandsSingleSource()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
- final Id sourceId = IdGenerator.createId( "source" );
- final String edgeType = "test";
+ final Id sourceId = IdGenerator.createId( "sourceWrite" );
+ final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString();
final EdgeGenerator generator = new EdgeGenerator() {
@Override
public Edge newEdge() {
- Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+ Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
return edge;
@@ -182,12 +196,12 @@ public class GraphManagerShardConsistencyIT {
};
- // final int numInjectors = 2;
+ //final int numInjectors = 2;
final int numInjectors = 1;
/**
- * create 3 injectors. This way all the caches are independent of one another. This is the same as
- * multiple nodes
+ * create injectors. This way all the caches are independent of one another. This is the same as
+ * multiple nodes if there are multiple injectors
*/
final List<Injector> injectors = createInjectors( numInjectors );
@@ -214,7 +228,7 @@ public class GraphManagerShardConsistencyIT {
final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
- createExecutor( numWorkersPerInjector );
+ createWriteExecutor( numWorkersPerInjector );
final AtomicLong writeCounter = new AtomicLong();
@@ -236,7 +250,7 @@ public class GraphManagerShardConsistencyIT {
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future =
- executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
futures.add( future );
}
@@ -260,20 +274,20 @@ public class GraphManagerShardConsistencyIT {
final long writeCount = writeCounter.get();
final long expectedShardCount = writeCount / shardSize;
- final Meter readMeter = registry.meter( "readThroughput" );
+ final Meter readMeter = registry.meter( "readThroughput-writeTest" );
final List<Throwable> failures = new ArrayList<>();
- //Thread.sleep(5000);
+ Thread.sleep(3000); // let's make sure everything is written
- for(int i = 0; i < 2; i ++) {
+ for(int i = 0; i < 1; i ++) {
/**
* Start reading continuously while we migrate data to ensure our view is always correct
*/
final ListenableFuture<Long> future =
- executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
//add the future
@@ -282,7 +296,7 @@ public class GraphManagerShardConsistencyIT {
@Override
public void onSuccess( @Nullable final Long result ) {
logger.info( "Successfully ran the read, re-running" );
- executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
}
@@ -360,7 +374,7 @@ public class GraphManagerShardConsistencyIT {
Thread.sleep(30000);
- executor.shutdownNow();
+ writeExecutor.shutdownNow();
}
@@ -390,20 +404,20 @@ public class GraphManagerShardConsistencyIT {
}
- @Test(timeout=120000)
+ @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async
@Category(StressTest.class)
public void writeThousandsDelete()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
- final Id sourceId = IdGenerator.createId( "source" );
- final String edgeType = "test";
+ final Id sourceId = IdGenerator.createId( "sourceDelete" );
+ final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString();
final EdgeGenerator generator = new EdgeGenerator() {
@Override
public Edge newEdge() {
- Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+ Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDelete" ) );
return edge;
@@ -413,18 +427,17 @@ public class GraphManagerShardConsistencyIT {
@Override
public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
return manager.loadEdgesFromSource(
- new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent(), false ) );
}
};
- // final int numInjectors = 2;
final int numInjectors = 1;
/**
- * create 3 injectors. This way all the caches are independent of one another. This is the same as
- * multiple nodes
+ * create injectors. This way all the caches are independent of one another. This is the same as
+ * multiple nodes if there are multiple injectors
*/
final List<Injector> injectors = createInjectors( numInjectors );
@@ -449,7 +462,7 @@ public class GraphManagerShardConsistencyIT {
final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
- createExecutor( numWorkersPerInjector );
+ createDeleteExecutor( numWorkersPerInjector );
final AtomicLong writeCounter = new AtomicLong();
@@ -472,7 +485,7 @@ public class GraphManagerShardConsistencyIT {
for ( int i = 0; i < numWorkersPerInjector; i++ ) {
Future<Boolean> future =
- executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+ deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
futures.add( future );
}
@@ -488,14 +501,14 @@ public class GraphManagerShardConsistencyIT {
//now get all our shards
final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
//now submit the readers.
final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
final long writeCount = writeCounter.get();
- final Meter readMeter = registry.meter( "readThroughput" );
+ final Meter readMeter = registry.meter( "readThroughput-deleteTest" );
//check our shard state
@@ -526,11 +539,28 @@ public class GraphManagerShardConsistencyIT {
long count = Long.MAX_VALUE;
+ Thread.sleep(3000); // let's make sure everything is written
+
+ long totalDeleted = 0;
+
while(count != 0) {
- //take 10000 then sleep
- count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+
+ logger.info("total deleted: {}", totalDeleted);
+ if(count != Long.MAX_VALUE) { // count starts with Long.MAX
+ logger.info("deleted {} entities, continuing until count is 0", count);
+ }
+ //take 1000 then sleep
+ count = generator.doSearch( manager ).take( 1000 )
+ .filter(markedEdge -> {
+
+ // if it's already been marked let's filter, move on as async deleteEdge()
+ logger.trace("edge already marked, may indicated a problem with gm.deleteEdge(): {}", markedEdge);
+ return !markedEdge.isDeleted();
+ })
+ .flatMap( edge -> manager.markEdge( edge ))
.flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+ totalDeleted += count;
Thread.sleep( 500 );
}
@@ -541,7 +571,7 @@ public class GraphManagerShardConsistencyIT {
/**
* Start reading continuously while we migrate data to ensure our view is always correct
*/
- final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+ final ListenableFuture<Long> future = deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
final List<Throwable> failures = new ArrayList<>();
@@ -552,7 +582,7 @@ public class GraphManagerShardConsistencyIT {
@Override
public void onSuccess( @Nullable final Long result ) {
logger.info( "Successfully ran the read, re-running" );
- executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ deleteExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
}
@@ -606,8 +636,8 @@ public class GraphManagerShardConsistencyIT {
}
- //we're done, 1 shard remains, we have a group, and it's our default shard
- if ( shardCount == 1 && group != null && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {
+ // we're done, 1 shard remains, we have a group, and it's our default shard
+ if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex() ) {
logger.info( "All compactions complete," );
break;
@@ -619,7 +649,7 @@ public class GraphManagerShardConsistencyIT {
//now that we have finished expanding s
- executor.shutdownNow();
+ deleteExecutor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 79401c3..5afc288 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,8 +37,9 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
#log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
-#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
[19/20] usergrid git commit: Update the cache invalidation to be
specific for the key and not invalidate the whole cache.
Posted by mr...@apache.org.
Update the cache invalidation to be specific for the key and not invalidate the whole cache.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/39351f1e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/39351f1e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/39351f1e
Branch: refs/heads/release-2.1.1
Commit: 39351f1eb59008348dc2b1812b2cf5159f693721
Parents: 317b182
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 22 20:14:22 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 22 20:14:22 2016 -0700
----------------------------------------------------------------------
.../graph/serialization/impl/shard/NodeShardCache.java | 2 +-
.../impl/shard/impl/NodeShardAllocationImpl.java | 2 +-
.../serialization/impl/shard/impl/NodeShardCacheImpl.java | 7 +++----
.../impl/shard/impl/ShardGroupCompactionImpl.java | 4 ++--
4 files changed, 7 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/39351f1e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 23c2c25..91c180f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -52,6 +52,6 @@ public interface NodeShardCache {
Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
- void invalidate();
+ void invalidate( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/39351f1e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index a6cf378..0c65912 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -252,7 +252,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
// invalidate the shard cache so we can be sure that all read shards are up to date
- nodeShardCache.invalidate();
+ nodeShardCache.invalidate(scope, directedEdgeMeta);
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to casandra", e );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/39351f1e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 5eaaaa0..bbc0431 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -72,8 +72,6 @@ import com.google.inject.Inject;
@Singleton
public class NodeShardCacheImpl implements NodeShardCache {
- private static final Logger logger = LoggerFactory.getLogger( NodeShardCacheImpl.class );
-
/**
* Only cache shards that have < 10k groups. This is an arbitrary amount, and may change with profiling and
* testing
@@ -186,9 +184,10 @@ public class NodeShardCacheImpl implements NodeShardCache {
}
@Override
- public void invalidate(){
+ public void invalidate( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta ){
- graphs.invalidateAll();
+ final CacheKey cacheKey = new CacheKey(scope, directedEdgeMeta);
+ graphs.invalidate(cacheKey);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/39351f1e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 7854c3b..b26ee46 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -341,7 +341,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
shardRemovalRollup.execute();
// invalidate the shard cache so we can be sure that all read shards are up to date
- nodeShardCache.invalidate();
+ nodeShardCache.invalidate(scope, edgeMeta);
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
@@ -359,7 +359,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
updateMark.execute();
// invalidate the shard cache so we can be sure that all read shards are up to date
- nodeShardCache.invalidate();
+ nodeShardCache.invalidate(scope, edgeMeta);
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
[04/20] usergrid git commit: Clean up the logging,
ensure the order of shard iterator within MultiRowColumnIterator is
correct. Restore NodeShardCache logic.
Posted by mr...@apache.org.
Clean up the logging, ensure the order of shard iterator within MultiRowColumnIterator is correct. Restore NodeShardCache logic.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4bbebc5f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4bbebc5f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4bbebc5f
Branch: refs/heads/release-2.1.1
Commit: 4bbebc5fd759efe59bae612c9f47e36589750982
Parents: 92fae0d
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 15 21:59:21 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 15 21:59:21 2016 -0700
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 201 ++++++-------------
.../impl/shard/impl/EdgeSearcher.java | 36 +---
.../impl/shard/impl/NodeShardCacheImpl.java | 19 +-
.../shard/impl/ShardEntryGroupIterator.java | 12 ++
.../impl/shard/impl/ShardsColumnIterator.java | 10 +-
.../graph/GraphManagerShardConsistencyIT.java | 54 ++---
.../graph/src/test/resources/log4j.properties | 4 +
7 files changed, 111 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index c384899..10786f7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.persistence.core.astyanax;
import java.util.*;
-import org.apache.avro.generic.GenericData;
import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +80,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
private List<T> resultsTracking;
+ private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
+
+ private boolean ascending = false;
+
/**
* Remove after finding bug
@@ -114,14 +117,15 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
this.moreToReturn = true;
this.resultsTracking = new ArrayList<>();
- // seenResults = new HashMap<>( pageSize * 10 );
}
+ // temporarily use a new constructor for specific searches until we update each caller of this class
public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
final Collection<R> rowKeys, final int pageSize,
- final List<SmartShard> rowKeysWithShardEnd) {
+ final List<SmartShard> rowKeysWithShardEnd,
+ final boolean ascending) {
this.cf = cf;
this.pageSize = pageSize;
this.columnParser = columnParser;
@@ -133,54 +137,45 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
this.moreToReturn = true;
this.rowKeysWithShardEnd = rowKeysWithShardEnd;
this.resultsTracking = new ArrayList<>();
+ this.ascending = ascending;
-
- // seenResults = new HashMap<>( pageSize * 10 );
}
@Override
public boolean hasNext() {
- if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){
- if(currentShardIterator.hasNext()) {
+ // if column iterator is null, initialize with first call to advance()
+ // advance if we know more columns exist in the current shard but we've exhausted this page fetch from c*
+ if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
+ advance();
+ }
+
+ // when there are no more columns, nothing reported to return, but more shards available, go to the next shard
+ if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
+ !moreToReturn && currentShardIterator.hasNext()){
if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - advancing shard iterator");
- logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
- logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
- logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ logger.trace("Advancing shard iterator");
+ logger.trace("Shard before advance: {}", currentShard);
}
+ // advance to the next shard
currentShard = currentShardIterator.next();
if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ logger.trace("Shard after advance: {}", currentShard);
}
+ // reset the start column as we'll be seeking a new row, any duplicates will be filtered out
startColumn = null;
advance();
- }
- }
-
- if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
- if(currentColumnIterator != null) {
- if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
-
- }
- }
- if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - going into advance()");
-
- }
-
- advance();
}
+
return currentColumnIterator.hasNext();
}
@@ -214,24 +209,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final boolean skipFirstColumn = startColumn != null;
-
-
final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
- //final int selectSize = pageSize;
-
final RangeBuilder rangeBuilder = new RangeBuilder();
if(currentShardIterator == null){
+
+ // flip the order of our shards if ascending
+ if(ascending){
+ Collections.reverse(rowKeysWithShardEnd);
+ }
+
currentShardIterator = rowKeysWithShardEnd.iterator();
}
if(currentShard == null){
- Collections.reverse(rowKeysWithShardEnd); // ranges are ascending
if(logger.isTraceEnabled()){
logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
@@ -266,7 +262,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
rangeBuilder.setLimit( selectSize );
- if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" );
+ if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
/**
* Get our list of slices
@@ -285,65 +281,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
-// List<RowSliceQuery<R, C>> queries = new ArrayList<>();
-//
-// rowKeys.forEach( rowkey -> {
-//
-// queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
-// .withColumnRange( rangeBuilder.build() ));
-//
-// });
-//
-//
-// final List<Rows<R,C>> combinedResults = new ArrayList<>();
-//
-// queries.forEach(query ->{
-//
-// try {
-// combinedResults.add(query.execute().getResult());
-// }
-// catch ( ConnectionException e ) {
-// throw new RuntimeException( "Unable to connect to casandra", e );
-// }
-//
-// });
-
-
-
-
- //now aggregate them together
-
- //this is an optimization. It's faster to see if we only have values for one row,
- // then return the iterator of those columns than
- //do a merge if only one row has data.
-
-
final List<T> mergedResults;
- mergedResults = mergeResults( result, selectSize );
-
-// if ( containsSingleRowOnly( result ) ) {
-// mergedResults = singleRowResult( result );
-// }
-// else {
-// mergedResults = mergeResults( result, selectSize );
-// }
+ skipSize = 0;
+ mergedResults = processResults( result, selectSize );
+ if(logger.isTraceEnabled()){
+ logger.trace("skipped amount: {}", skipSize);
+ }
-// final List<T> mergedResults = new ArrayList<>();
-//
-// combinedResults.forEach(rows -> {
-//
-// if ( containsSingleRowOnly( rows ) ) {
-// mergedResults.addAll(singleRowResult( rows ));
-// }
-// else {
-// mergedResults.addAll(mergeResults( rows, selectSize ));
-// }
-//
-// });
final int size = mergedResults.size();
@@ -363,6 +311,12 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
moreToReturn = true;
}
+
+ // if a whole page is skipped, this is likely during a shard transition and we should assume there is more to read
+ if( skipSize == selectSize || skipSize == selectSize - 1){
+ moreToReturn = true;
+ }
+
//we have a first column to to check
if( size > 0) {
@@ -380,21 +334,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
+ // set the start column for the next query
if(moreToReturn && mergedResults.size() > 0){
startColumn = mergedResults.get( mergedResults.size() - 1 );
}
- if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
- logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
- }
-
-
+ currentColumnIterator = mergedResults.iterator();
- currentColumnIterator = mergedResults.iterator();
+ //force an advance of this iterator when there are still shards to read but result set on current shard is 0
+ if(size == 0 && currentShardIterator.hasNext()){
+ hasNext();
+ }
if(logger.isTraceEnabled()){
logger.trace(
@@ -404,7 +357,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
- if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
}
@@ -464,20 +416,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
/**
- * Multiple rows are present, merge them into a single result set
+ * Process the result set and filter any duplicates that may have already been seen in previous shards. During
+ * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for
+ * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
+ *
* @param result
* @return
*/
- private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
-
- if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns. Merging" );
-
+ private List<T> processResults(final Rows<R, C> result, final int maxSize ) {
final List<T> mergedResults = new ArrayList<>(maxSize);
-
-
-
for ( final R key : result.getKeys() ) {
final ColumnList<C> columns = result.getRow( key ).getColumns();
@@ -486,62 +435,24 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final T returnedValue = columnParser.parseColumn( column );
- //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
+ // use an O(log n) search, same as a tree, but with fast access to indexes for later operations
int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
- /**
- * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
- * you'll
- * need to enable this
- */
- //
- // if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) {
- // throw new RuntimeException( String.format(
- // "Cassandra returned 2 unique columns,
- // but your comparator marked them as equal. This " +
- // "indicates a bug in your comparator. Previous value was %s and
- // current value is " +
- // "%s",
- // previous, returnedValue ) );
- // }
- //
- // previous = returnedValue;
-
- //we've already seen it, no-op
+
+ //we've already seen the column, filter it out as we might be in a shard transition or our start column
if(searchIndex > -1){
if(logger.isTraceEnabled()){
logger.trace("skipping column as it was already retrieved before");
}
+ skipSize++;
continue;
}
-// final int insertIndex = (searchIndex+1)*-1;
-//
-// //it's at the end of the list, don't bother inserting just to remove it
-// if(insertIndex >= maxSize){
-// logger.info("skipping column as it was at the end of the list");
-// continue;
-// }
resultsTracking.add(returnedValue);
-
- //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
-
- //mergedResults.add( insertIndex, returnedValue );
mergedResults.add(returnedValue );
-
- //prune the mergedResults
-// while ( mergedResults.size() > maxSize ) {
-//
-// if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
-//
-// //just remove from our tail until the size falls to the correct value
-// mergedResults.remove(mergedResults.size()-1);
-// resultsTracking.remove(resultsTracking.size()-1);
-//
-// }
}
if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index e0ba3ec..2f5817d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -56,9 +56,6 @@ import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterat
*/
public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
- private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
-
-
protected final Optional<T> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
@@ -78,7 +75,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
this.last = last;
this.comparator = comparator;
- //logger.info("initializing with shards: {}", shards);
}
@@ -86,7 +82,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
public List<ScopedRowKey<R>> getRowKeys() {
List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
- //logger.info("shards: {}", shards);
for(Shard shard : shards){
@@ -175,37 +170,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
}
private void setRangeOptions(final RangeBuilder rangeBuilder){
- //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag
+
+ //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag
final boolean reversed = order == SearchByEdgeType.Order.ASCENDING;
rangeBuilder.setReversed( reversed );
}
-// public class SmartShard {
-//
-// final ScopedRowKey<R> rowKey;
-// final C shardEnd;
-//
-//
-// public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
-//
-// this.rowKey = rowKey;
-// this.shardEnd = shardEnd;
-// }
-//
-//
-// public ScopedRowKey<R> getRowKey(){
-// return rowKey;
-// }
-//
-// public C getShardEnd(){
-// return shardEnd;
-// }
-//
-// }
-
-
/**
* Get the comparator
* @return
@@ -214,6 +186,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
return comparator;
}
+ public SearchByEdgeType.Order getOrder(){
+ return order;
+ }
+
/**
* Get the column's serializer
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 3ff9d47..1a88ebb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -164,19 +164,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
final CacheKey key = new CacheKey( scope, directedEdgeMeta );
CacheEntry entry;
-// try {
-// entry = this.graphs.get( key );
-// }
-// catch ( ExecutionException e ) {
-// throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-// }
-
- final Iterator<ShardEntryGroup> edges =
- nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
-
- final CacheEntry cacheEntry = new CacheEntry( edges );
+ try {
+ entry = this.graphs.get( key );
+ }
+ catch ( ExecutionException e ) {
+ throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+ }
- Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp );
+ Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
if ( iterator == null ) {
return Collections.<ShardEntryGroup>emptyList().iterator();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index f1b5108..b64bb58 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -14,6 +14,8 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup
import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.schedulers.Schedulers;
@@ -23,6 +25,9 @@ import rx.schedulers.Schedulers;
*/
public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
+ private static final Logger logger = LoggerFactory.getLogger( ShardEntryGroupIterator.class );
+
+
private final ShardGroupCompaction shardGroupCompaction;
private final PushbackIterator<Shard> sourceIterator;
@@ -106,11 +111,18 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
//we can't add this one to the entries, it doesn't fit within the delta, allocate a new one and break
if ( next.addShard( shard ) ) {
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("adding shard: {}", shard);
+ }
continue;
}
sourceIterator.pushback( shard );
+ if(logger.isTraceEnabled()) {
+ logger.trace("unable to add shard: {}, pushing back and stopping", shard);
+ }
break;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index af9d979..e609d33 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.*;
import org.apache.usergrid.persistence.core.shard.SmartShard;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,15 +130,14 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd();
+ final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING;
+
if (logger.isTraceEnabled()) {
logger.trace("Searching with row keys {}", rowKeys);
}
- //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
- currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd);
-
-
-
+ currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher,
+ searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2602e88..82b0879 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -1,22 +1,20 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.usergrid.persistence.graph;
@@ -205,8 +203,6 @@ public class GraphManagerShardConsistencyIT {
final int numWorkersPerInjector = numProcessors / numInjectors;
- //final int numWorkersPerInjector = 1;
-
/**
@@ -218,16 +214,13 @@ public class GraphManagerShardConsistencyIT {
final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
- final long expectedShardCount = numberOfEdges / shardSize;
-
-
createExecutor( numWorkersPerInjector );
final AtomicLong writeCounter = new AtomicLong();
//min stop time the min delta + 1 cache cycle timeout
- final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+ final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 60000;
logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
@@ -266,14 +259,14 @@ public class GraphManagerShardConsistencyIT {
final long writeCount = writeCounter.get();
+ final long expectedShardCount = writeCount / shardSize;
final Meter readMeter = registry.meter( "readThroughput" );
final List<Throwable> failures = new ArrayList<>();
+ //Thread.sleep(5000);
- Thread.sleep(5000);
-
- for(int i = 0; i < 1; i ++) {
+ for(int i = 0; i < 2; i ++) {
/**
@@ -351,14 +344,9 @@ public class GraphManagerShardConsistencyIT {
//we're done
if ( compactedCount >= expectedShardCount ) {
- logger.info( "All compactions complete, sleeping" );
-
- // final Object mutex = new Object();
- //
- // synchronized ( mutex ){
- //
- // mutex.wait();
- // }
+
+ logger.info( "All compactions complete, sleeping. Compacted shard count={}, expected shard count={}",
+ compactedCount, expectedShardCount );
break;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 608ee03..79401c3 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,4 +37,8 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
#log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO
[14/20] usergrid git commit: Update variable reference for async
event creation time.
Posted by mr...@apache.org.
Update variable reference for async event creation time.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/aef26b9b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/aef26b9b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/aef26b9b
Branch: refs/heads/release-2.1.1
Commit: aef26b9bc0d3af7131aec16b49fe27f4af081e41
Parents: 4d828e1
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 21 11:40:21 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 21 11:40:21 2016 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AsyncEventServiceImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/aef26b9b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index cc1246f..1b32b48 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -354,13 +354,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
if(logger.isDebugEnabled()){
logger.debug(e.getMessage());
}
- return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
} catch (Exception e) {
// if the event fails to process, log and return empty message result so it doesn't get ack'd
logger.error("{}. Failed to process message: {}", e.getMessage(), message.getStringBody().trim() );
- return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
+ return new IndexEventResult(Optional.absent(), Optional.absent(), thisEvent.getCreationTime());
}
});
[18/20] usergrid git commit: Increase test readers to 2.
Posted by mr...@apache.org.
Increase test readers to 2.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/317b1827
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/317b1827
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/317b1827
Branch: refs/heads/release-2.1.1
Commit: 317b1827491bd481eba6839076c702f142178b41
Parents: 9771968
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 22 18:32:37 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 22 18:32:37 2016 -0700
----------------------------------------------------------------------
.../usergrid/persistence/graph/GraphManagerShardConsistencyIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/317b1827/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 652c8d6..0d6a27e 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -283,7 +283,7 @@ public class GraphManagerShardConsistencyIT {
logger.info("Sleeping {}ms before reading to ensure all compactions have completed", POST_WRITE_SLEEP);
Thread.sleep(POST_WRITE_SLEEP); // let's make sure everything is written
- for(int i = 0; i < 1; i ++) {
+ for(int i = 0; i < 2; i ++) {
/**
[07/20] usergrid git commit: Throttle the compactions and auditing
such that the 'type' can only be compacted one at a time (source or target
node shard(s))
Posted by mr...@apache.org.
Throttle the compactions and auditing such that the 'type' can only be compacted one at a time (source or target node shard(s))
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58ae197e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58ae197e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58ae197e
Branch: refs/heads/release-2.1.1
Commit: 58ae197ea581f271de644760531a9cd45287c7c9
Parents: 4e407ff
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Mar 18 14:31:30 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Mar 18 14:31:30 2016 -0700
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 2 +-
.../shard/impl/ShardGroupCompactionImpl.java | 54 ++++++++++++++++----
.../graph/GraphManagerShardConsistencyIT.java | 25 ++++-----
3 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index d8b9097..6049c1f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -375,7 +375,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
// if a whole page is skipped OR the result size equals the difference of what's skipped,
// it is likely during a shard transition and we should assume there is more to read
- if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
+ if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
moreToReturn = true;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 1890d53..8728c6c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Optional;
import com.netflix.astyanax.connectionpool.OperationResult;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
@@ -200,37 +201,61 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
edgeCount++;
- shardEnd = edge;
+
// if we're at our count, execute the mutation of writing the edges to the new row, then remove them
// from the old rows
if ( edgeCount % maxWorkSize == 0 ) {
+
+
try {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
+ // set the shardEnd after the write is known to be successful
+ shardEnd = edge;
+
+ // Update the shard end after each batch so any reads during transition stay as close to current as possible
+ sourceShard.setShardEnd(
+ Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+ );
+
+ logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
+ updateShardMetaBatch.mergeShallow(
+ edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+
+
+
// on purpose block this thread before deleting the old edges to be sure there are no gaps
// duplicates are filtered on graph seeking so this is OK
Thread.sleep(1000);
+ logger.info("Deleting batch of {} from old shard", maxWorkSize);
deleteRowBatch.execute();
+
}
catch ( Throwable t ) {
logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
}
+ }else {
+
+ shardEnd = edge;
+
}
+
+
+
}
- if (shardEnd != null){
+ if (shardEnd != null && edgeCount > 0){
sourceShard.setShardEnd(
Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
);
-
- logger.info("Updating shard {} with shardEnd: {}", sourceShard, shardEnd );
+ logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, shardEnd );
updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
}
@@ -247,9 +272,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
// on purpose block this thread before deleting the old edges to be sure there are no gaps
// duplicates are filtered on graph seeking so this is OK
Thread.sleep(1000);
+
+ logger.info("Deleting remaining edges from old shard");
deleteRowBatch.execute();
+ // now update with our shard end
updateShardMetaBatch.execute();
+
}
catch ( Throwable t ) {
logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -438,6 +467,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
* It's already compacting, don't do anything
*/
if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ logger.info("the group is already compacting");
return AuditResult.COMPACTING;
}
@@ -477,8 +507,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
ShardEntryGroup group ) {
final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
-
final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
+ //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(), edgeMeta, group);
+ //logger.info("checking hash value of: {}, already started: {}", hash, returned );
/**
* Someone already put the value
@@ -509,12 +540,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
/**
* Hash our data into a consistent long
*/
+ @Override
protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
final ShardEntryGroup shardEntryGroup ) {
final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );
- //add our compaction target to the hash
+ // add the compaction target to the hash
final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
hasher.putLong( compactionTarget.getShardIndex() );
@@ -541,14 +573,16 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
addToHash( hasher, scope.getApplication() );
- /**
- * add our edge meta data
- */
+
+ /** Commenting out the full meta of the hash so we allocate/compact shards in a more controlled fashion
+
for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
addToHash( hasher, nodeMeta.getId() );
hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
}
+ **/
+
/**
* Add our edge type
@@ -557,8 +591,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
hasher.putString( type, CHARSET );
}
- //add our compaction target to the hash
-
return hasher;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 8fd7cea..9e6996d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -102,6 +102,8 @@ public class GraphManagerShardConsistencyIT {
protected ListeningExecutorService deleteExecutor;
+ protected int TARGET_NUM_SHARDS = 6;
+
@Before
@@ -172,15 +174,15 @@ public class GraphManagerShardConsistencyIT {
public void writeThousandsSingleSource()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
- final Id sourceId = IdGenerator.createId( "sourceWrite" );
- final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString();
+ final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() );
+ final String edgeType = "testWrite";
final EdgeGenerator generator = new EdgeGenerator() {
@Override
public Edge newEdge() {
- Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
+ Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) );
return edge;
@@ -196,7 +198,7 @@ public class GraphManagerShardConsistencyIT {
};
- final int numInjectors = 2;
+ final int numInjectors = 1;
/**
* create injectors. This way all the caches are independent of one another. This is the same as
@@ -218,10 +220,7 @@ public class GraphManagerShardConsistencyIT {
- /**
- * Do 4x shard size so we should have approximately 4 shards
- */
- final long numberOfEdges = shardSize * 4;
+ final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
@@ -233,7 +232,7 @@ public class GraphManagerShardConsistencyIT {
//min stop time the min delta + 1 cache cycle timeout
- final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 60000;
+ final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 120000;
logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
@@ -279,7 +278,7 @@ public class GraphManagerShardConsistencyIT {
final List<Throwable> failures = new ArrayList<>();
Thread.sleep(3000); // let's make sure everything is written
- for(int i = 0; i < 2; i ++) {
+ for(int i = 0; i < 1; i ++) {
/**
@@ -452,11 +451,7 @@ public class GraphManagerShardConsistencyIT {
final int numWorkersPerInjector = numProcessors / numInjectors;
-
- /**
- * Do 4x shard size so we should have approximately 4 shards
- */
- final long numberOfEdges = shardSize * 4;
+ final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
[09/20] usergrid git commit: Clean up some comments and unused code.
Posted by mr...@apache.org.
Clean up some comments and unused code.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bd776723
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bd776723
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bd776723
Branch: refs/heads/release-2.1.1
Commit: bd7767233193ca8c9cde164dee3b1475aefcc440
Parents: bec5093
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Mar 20 18:44:41 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Mar 20 18:44:41 2016 -0700
----------------------------------------------------------------------
.../astyanax/MultiRowShardColumnIterator.java | 35 +++-----------------
.../impl/shard/impl/ShardsColumnIterator.java | 20 ++---------
2 files changed, 8 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bd776723/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index bfc04c4..b13d0f5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -57,19 +57,14 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
private final Comparator<T> comparator;
- private final Collection<R> rowKeys;
-
private final Keyspace keyspace;
private final ConsistencyLevel consistencyLevel;
-
private T startColumn;
-
private boolean moreToReturn;
-
private Iterator<T> currentColumnIterator;
private Iterator<SmartShard> currentShardIterator;
@@ -78,43 +73,23 @@ public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
private SmartShard currentShard;
- private List<T> resultsTracking;
+ private List<T> resultsTracking; // use for de-duping results that are possible during shard transition
private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
private boolean ascending = false;
- /**
- * Remove after finding bug
- */
-
-
- // private int advanceCount;
- //
- // private final HashMap<T, SeekPosition> seenResults;
-
- /**
- * Complete Remove
- */
-
-
- /**
- * Create the iterator
- */
- // temporarily use a new constructor for specific searches until we update each caller of this class
public MultiRowShardColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
- final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
- final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
- final Collection<R> rowKeys, final int pageSize,
- final List<SmartShard> rowKeysWithShardEnd,
- final boolean ascending) {
+ final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+ final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+ final int pageSize, final List<SmartShard> rowKeysWithShardEnd,
+ final boolean ascending) {
this.cf = cf;
this.pageSize = pageSize;
this.columnParser = columnParser;
this.columnSearch = columnSearch;
this.comparator = comparator;
- this.rowKeys = rowKeys;
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.moreToReturn = true;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bd776723/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index e2dd549..0c90103 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -111,34 +111,20 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
logger.trace("Starting shards column iterator");
}
-
- /**
- * If the edge is present, we need to being seeking from this
- */
-
final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
- //set the range into the search
+ // set the range into the search
searcher.buildRange( rangeBuilder );
-
- /**
- * Get our list of slices
- */
- final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys();
-
+ // get the row keys and their corresponding 'shardEnd' that we will seek from
final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd();
final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING;
- if (logger.isTraceEnabled()) {
- logger.trace("Searching with row keys {}", rowKeys);
- }
-
currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher,
- searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending);
+ searcher.getComparator(), pageSize, rowKeysWithShardEnd, ascending);
}
[03/20] usergrid git commit: USERGRID-1266: check permissions at REST
layer to avoid incorrect response codes also block user token exchange via
GET /org/app/users/{user}/token?access_token={usertoken}
Posted by mr...@apache.org.
USERGRID-1266: check permissions at REST layer to avoid incorrect response codes
also block user token exchange via GET /org/app/users/{user}/token?access_token={usertoken}
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/10e2be7b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/10e2be7b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/10e2be7b
Branch: refs/heads/release-2.1.1
Commit: 10e2be7b405e1031abf74082efa8dce2ca9cb1fc
Parents: 8435783
Author: Mike Dunker <md...@apigee.com>
Authored: Tue Mar 15 12:30:25 2016 -0700
Committer: Mike Dunker <md...@apigee.com>
Committed: Tue Mar 15 12:30:25 2016 -0700
----------------------------------------------------------------------
.../rest/applications/ServiceResource.java | 24 ++++++++--------
.../notifiers/NotifierResource.java | 3 +-
.../notifiers/NotifiersResource.java | 3 +-
.../rest/applications/users/UserResource.java | 26 ++++++++++++++----
.../rest/applications/users/UsersResource.java | 3 +-
.../ServiceResourceNotFoundExceptionMapper.java | 9 +-----
.../security/SecuredResourceFilterFactory.java | 29 ++++++++++++++------
.../applications/ApplicationResourceIT.java | 2 +-
.../collection/users/PermissionsResourceIT.java | 4 +--
9 files changed, 62 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
index a600bf1..4c92fef 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/ServiceResource.java
@@ -31,6 +31,7 @@ import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.RootResource;
import org.apache.usergrid.rest.applications.assets.AssetsResource;
+import org.apache.usergrid.rest.security.annotations.CheckPermissionsForPath;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
import org.apache.usergrid.security.oauth.AccessInfo;
import org.apache.usergrid.services.*;
@@ -39,7 +40,6 @@ import org.apache.usergrid.services.assets.data.AwsSdkS3BinaryStore;
import org.apache.usergrid.services.assets.data.BinaryStore;
import org.apache.usergrid.services.assets.data.LocalFileBinaryStore;
import org.apache.usergrid.services.exceptions.AwsPropertiesNotFoundException;
-import org.apache.usergrid.utils.InflectionUtils;
import org.apache.usergrid.utils.JsonUtils;
import org.glassfish.jersey.media.multipart.BodyPart;
import org.glassfish.jersey.media.multipart.BodyPartEntity;
@@ -292,8 +292,7 @@ public class ServiceResource extends AbstractContextResource {
boolean collectionGet = false;
if ( action == ServiceAction.GET ) {
- collectionGet = (getServiceParameters().size() == 1 && InflectionUtils
- .isPlural(getServiceParameters().get(0)));
+ collectionGet = getServiceParameters().size() == 1;
}
addQueryParams( getServiceParameters(), ui );
ServiceRequest r = services.newRequest( action, tree, getServiceParameters(), payload,
@@ -330,9 +329,9 @@ public class ServiceResource extends AbstractContextResource {
}
+ @CheckPermissionsForPath
@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.TEXT_HTML, "application/javascript"})
- @RequireApplicationAccess
@JSONP
public ApiResponse executeGet( @Context UriInfo ui,
@QueryParam("callback") @DefaultValue("callback") String callback )
@@ -430,8 +429,8 @@ public class ServiceResource extends AbstractContextResource {
}
+ @CheckPermissionsForPath
@POST
- @RequireApplicationAccess
@Consumes(MediaType.APPLICATION_JSON)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@@ -465,8 +464,8 @@ public class ServiceResource extends AbstractContextResource {
+ @CheckPermissionsForPath
@PUT
- @RequireApplicationAccess
@Consumes(MediaType.APPLICATION_JSON)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@@ -485,8 +484,8 @@ public class ServiceResource extends AbstractContextResource {
}
+ @CheckPermissionsForPath
@DELETE
- @RequireApplicationAccess
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse executeDelete(
@@ -605,8 +604,8 @@ public class ServiceResource extends AbstractContextResource {
/** ************** the following is file attachment (Asset) support ********************* */
+ @CheckPermissionsForPath
@POST
- @RequireApplicationAccess
@Consumes(MediaType.MULTIPART_FORM_DATA)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@@ -621,8 +620,8 @@ public class ServiceResource extends AbstractContextResource {
}
+ @CheckPermissionsForPath
@PUT
- @RequireApplicationAccess
@Consumes(MediaType.MULTIPART_FORM_DATA)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@@ -709,16 +708,16 @@ public class ServiceResource extends AbstractContextResource {
}
+ @CheckPermissionsForPath
@PUT
- @RequireApplicationAccess
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
public Response uploadDataStreamPut( @Context UriInfo ui, InputStream uploadedInputStream ) throws Exception {
return uploadDataStream( ui, uploadedInputStream );
}
+ @CheckPermissionsForPath
@POST
- @RequireApplicationAccess
@Consumes(MediaType.APPLICATION_OCTET_STREAM)
public Response uploadDataStream( @Context UriInfo ui, InputStream uploadedInputStream ) throws Exception {
@@ -752,9 +751,8 @@ public class ServiceResource extends AbstractContextResource {
return Response.status( 200 ).build();
}
-
+ @CheckPermissionsForPath
@GET
- @RequireApplicationAccess
@Produces(MediaType.WILDCARD)
public Response executeStreamGet( @Context UriInfo ui, @PathParam("entityId") PathSegment entityId,
@HeaderParam("range") String rangeHeader,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifierResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifierResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifierResource.java
index 2049983..22fbe0b 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifierResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifierResource.java
@@ -21,6 +21,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.applications.ServiceResource;
+import org.apache.usergrid.rest.security.annotations.CheckPermissionsForPath;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
import org.apache.usergrid.rest.utils.CertificateUtils;
import org.apache.usergrid.services.ServiceAction;
@@ -55,8 +56,8 @@ public class NotifierResource extends ServiceResource {
}
/* Multipart PUT update with uploaded p12Certificate */
+ @CheckPermissionsForPath
@PUT
- @RequireApplicationAccess
@Consumes(MediaType.MULTIPART_FORM_DATA)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifiersResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifiersResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifiersResource.java
index b92e775..c99be8e 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifiersResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/notifiers/NotifiersResource.java
@@ -23,6 +23,7 @@ import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.rest.AbstractContextResource;
import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.applications.ServiceResource;
+import org.apache.usergrid.rest.security.annotations.CheckPermissionsForPath;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
import org.apache.usergrid.rest.utils.CertificateUtils;
import org.apache.usergrid.services.ServiceAction;
@@ -101,8 +102,8 @@ public class NotifiersResource extends ServiceResource {
}
/* Multipart POST create with uploaded p12Certificate */
+ @CheckPermissionsForPath
@POST
- @RequireApplicationAccess
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Override
@JSONP
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
index 751113f..373bf09 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UserResource.java
@@ -36,6 +36,7 @@ import javax.ws.rs.core.PathSegment;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
+import org.apache.usergrid.rest.security.annotations.CheckPermissionsForPath;
import org.glassfish.jersey.server.mvc.Viewable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,8 +104,8 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@PUT
- @RequireApplicationAccess
@Consumes(MediaType.APPLICATION_JSON)
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
@@ -123,7 +124,7 @@ public class UserResource extends ServiceResource {
return super.executePutWithMap( ui, json, callback );
}
-
+ // no access token needed
@PUT
@Path("password")
@JSONP
@@ -253,6 +254,7 @@ public class UserResource extends ServiceResource {
}
+ // no access token needed
@POST
@Path("password")
@JSONP
@@ -264,6 +266,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Path("deactivate")
@JSONP
@@ -282,6 +285,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@GET
@Path("sendpin")
@JSONP
@@ -308,6 +312,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Path("sendpin")
@JSONP
@@ -319,9 +324,9 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@GET
@Path("setpin")
- @RequireApplicationAccess
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse setPin( @Context UriInfo ui, @QueryParam("pin") String pin,
@@ -346,10 +351,10 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Path("setpin")
@Consumes("application/x-www-form-urlencoded")
- @RequireApplicationAccess
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse postPin( @Context UriInfo ui, @FormParam("pin") String pin,
@@ -374,10 +379,10 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Path("setpin")
@Consumes(MediaType.APPLICATION_JSON)
- @RequireApplicationAccess
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse jsonPin( @Context UriInfo ui, JsonNode json,
@@ -402,6 +407,7 @@ public class UserResource extends ServiceResource {
}
+ // no access token needed
@GET
@Path("resetpw")
@Produces(MediaType.TEXT_HTML)
@@ -429,6 +435,7 @@ public class UserResource extends ServiceResource {
}
+ // no access token needed, reset token required
@POST
@Path("resetpw")
@Consumes("application/x-www-form-urlencoded")
@@ -526,6 +533,7 @@ public class UserResource extends ServiceResource {
}
+ // no access token needed, activation token required
@GET
@Path("activate")
@Produces(MediaType.TEXT_HTML)
@@ -547,6 +555,7 @@ public class UserResource extends ServiceResource {
}
+ // no access token needed, confirmation token required
@GET
@Path("confirm")
@Produces(MediaType.TEXT_HTML)
@@ -614,6 +623,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@PUT
@Path("revoketokens")
@JSONP
@@ -625,6 +635,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Path("revoketoken")
@JSONP
@@ -646,6 +657,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@PUT
@Path("revoketoken")
@JSONP
@@ -657,6 +669,7 @@ public class UserResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@GET
@Path("token")
@RequireApplicationAccess
@@ -669,7 +682,8 @@ public class UserResource extends ServiceResource {
try {
- if ( isApplicationUser() && !getUserUuid().equals( getSubjectUserId() ) ) {
+ // don't allow application user tokens to be exchanged for new tokens (possibly increasing ttl)
+ if ( isApplicationUser() ) {
OAuthResponse res = OAuthResponse.errorResponse( SC_FORBIDDEN ).buildJSONMessage();
return Response.status( res.getResponseStatus() ).type( jsonMediaType( callback ) )
.entity( wrapWithCallback( res.getBody(), callback ) ).build();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UsersResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UsersResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UsersResource.java
index 76614f8..34c6915 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UsersResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/users/UsersResource.java
@@ -30,6 +30,7 @@ import org.apache.usergrid.rest.ApiResponse;
import org.apache.usergrid.rest.RootResource;
import org.apache.usergrid.rest.applications.ServiceResource;
import org.apache.usergrid.rest.exceptions.RedirectionException;
+import org.apache.usergrid.rest.security.annotations.CheckPermissionsForPath;
import org.apache.usergrid.rest.security.annotations.RequireApplicationAccess;
import org.glassfish.jersey.server.mvc.Viewable;
import org.slf4j.Logger;
@@ -203,9 +204,9 @@ public class UsersResource extends ServiceResource {
}
+ @CheckPermissionsForPath
@POST
@Override
- @RequireApplicationAccess
@JSONP
@Produces({MediaType.APPLICATION_JSON, "application/javascript"})
public ApiResponse executePost( @Context UriInfo ui, String body,
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/ServiceResourceNotFoundExceptionMapper.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/ServiceResourceNotFoundExceptionMapper.java b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/ServiceResourceNotFoundExceptionMapper.java
index 64eb5d9..4d92811 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/ServiceResourceNotFoundExceptionMapper.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/exceptions/ServiceResourceNotFoundExceptionMapper.java
@@ -20,11 +20,9 @@ package org.apache.usergrid.rest.exceptions;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;
-import org.apache.usergrid.security.shiro.utils.SubjectUtils;
import org.apache.usergrid.services.exceptions.ServiceResourceNotFoundException;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
-import static javax.ws.rs.core.Response.Status.UNAUTHORIZED;
@Provider
@@ -32,11 +30,6 @@ public class ServiceResourceNotFoundExceptionMapper extends AbstractExceptionMap
@Override
public Response toResponse( ServiceResourceNotFoundException e ) {
- if ( SubjectUtils.getSubjectUserId() == null ) {
- return toResponse( UNAUTHORIZED, e );
- }
- else {
- return toResponse( NOT_FOUND, e );
- }
+ return toResponse( NOT_FOUND, e );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java b/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
index 0514dca..4c7d26a 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/security/SecuredResourceFilterFactory.java
@@ -123,7 +123,6 @@ public class SecuredResourceFilterFactory implements DynamicFeature {
featureContext.register( ApplicationFilter.class );
}
else if ( am.isAnnotationPresent( RequireOrganizationAccess.class ) ) {
-
featureContext.register( OrganizationFilter.class );
}
else if ( am.isAnnotationPresent( RequireSystemAccess.class ) ) {
@@ -414,12 +413,32 @@ public class SecuredResourceFilterFactory implements DynamicFeature {
"---- Checked permissions for path --------------------------------------------\n" + "Requested path: {} \n"
+ "Requested action: {} \n" + "Requested permission: {} \n" + "Permitted: {} \n";
- ApplicationInfo application;
+ ApplicationInfo application = null;
try {
application = management.getApplicationInfo( getApplicationIdentifier() );
EntityManager em = emf.getEntityManager( application.getId() );
+
+ if ( SubjectUtils.isAnonymous() ) {
+ Map<String, String> roles = null;
+ try {
+ roles = em.getRoles();
+ if (logger.isTraceEnabled()) {
+ logger.trace("found roles {}", roles);
+ }
+ }
+ catch ( Exception e ) {
+ logger.error( "Unable to retrieve roles", e );
+ }
+ if ( ( roles != null ) && roles.containsKey( "guest" ) ) {
+ loginApplicationGuest( application );
+ }
+ else {
+ throw mappableSecurityException( "unauthorized", "No application guest access authorized" );
+ }
+ }
+
Subject currentUser = SubjectUtils.getSubject();
if ( currentUser == null ) {
@@ -435,12 +454,6 @@ public class SecuredResourceFilterFactory implements DynamicFeature {
logger.debug( PATH_MSG, path, operation, perm, permitted );
}
- if(!permitted){
- // throwing this so we can raise a proper mapped REST exception
- throw new Exception("Subject not permitted");
- }
-
-
SubjectUtils.checkPermission( perm );
Subject subject = SubjectUtils.getSubject();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
index e822f66..2dd5090 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/ApplicationResourceIT.java
@@ -517,7 +517,7 @@ public class ApplicationResourceIT extends AbstractRestIT {
//Set the default TTL of the application to a date far in the future
final MapUtils.HashMapBuilder<String, String> map =
new MapUtils.HashMapBuilder<String, String>().map( "accesstokenttl", "31536000000" );
- this.app().getTarget( false )
+ this.app().getTarget(true, clientSetup.getSuperuserToken())
.queryParam( "access_token", token )
.request()
.accept( MediaType.APPLICATION_JSON )
http://git-wip-us.apache.org/repos/asf/usergrid/blob/10e2be7b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
index 340b75f..973de55 100644
--- a/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/applications/collection/users/PermissionsResourceIT.java
@@ -193,7 +193,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
}
@Test
- public void getNonExistantEntityReturns404() throws Exception {
+ public void getNonExistentEntityReturns404() throws Exception {
// Call a get on a existing entity with no access token and check if we get a 401
try {
@@ -409,7 +409,7 @@ public class PermissionsResourceIT extends AbstractRestIT {
"get,put,post:/reviews/**" );
// allow access to all user's connections excluding delete
addPermission( "reviewer",
- "get,put,post:/users/${user}/**" );
+ "get,put,post:/users/me/**" );
// allow access to the review relationship excluding delete
addPermission( "reviewer",
"get,put,post:/books/*/review/*" );
[11/20] usergrid git commit: Fix info log statement to be trace.
Posted by mr...@apache.org.
Fix info log statement to be trace.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1e2b0827
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1e2b0827
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1e2b0827
Branch: refs/heads/release-2.1.1
Commit: 1e2b082704b1fa1ff3ae4fbdf5afa69902f4aa26
Parents: 11d8a0e
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Mar 20 22:52:31 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Mar 20 22:52:31 2016 -0700
----------------------------------------------------------------------
.../impl/shard/impl/ShardGroupCompactionImpl.java | 7 ++++++-
1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1e2b0827/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index e63db46..c83be6b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -486,7 +486,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
* It's already compacting, don't do anything
*/
if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- logger.info("the group is already compacting");
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Already compacting, won't compact group: {}", group);
+ }
+
+
return AuditResult.COMPACTING;
}
[13/20] usergrid git commit: Add additional filter for index
processing to ensure things do not get ack'd that should not get ack'd.
Posted by mr...@apache.org.
Add additional filter for index processing to ensure things do not get ack'd that should not get ack'd.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4d828e1c
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4d828e1c
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4d828e1c
Branch: refs/heads/release-2.1.1
Commit: 4d828e1cdba69430b06337988e91490eaf0cfd1b
Parents: 4c41f1b
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 21 11:12:34 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 21 11:12:34 2016 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AsyncEventServiceImpl.java | 1 +
.../usergrid/persistence/index/impl/IndexOperationMessage.java | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d828e1c/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 4f8b7ce..cc1246f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -756,6 +756,7 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// filter out messages that are not present, they were not processed and put into the results
.filter( result -> result.getQueueMessage().isPresent() )
+ .filter( result -> result.getIndexOperationMessage().isPresent() )
.map(indexEventResult -> {
//record the cycle time
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4d828e1c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
index 7d19ce3..f544967 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexOperationMessage.java
@@ -54,7 +54,7 @@ public class IndexOperationMessage implements Serializable {
public void addIndexRequest( final IndexOperation indexRequest ) {
- indexRequests.add( indexRequest );
+ this.indexRequests.add( indexRequest );
}
[12/20] usergrid git commit: Remove unused import.
Posted by mr...@apache.org.
Remove unused import.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4c41f1bd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4c41f1bd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4c41f1bd
Branch: refs/heads/release-2.1.1
Commit: 4c41f1bdc878190076deb875c7c5fd202487a0d0
Parents: 1e2b082
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Mar 20 22:56:10 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Mar 20 22:56:10 2016 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AsyncEventServiceImpl.java | 1 -
.../serialization/impl/shard/impl/ShardGroupCompactionImpl.java | 2 +-
2 files changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c41f1bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 7e368c7..4f8b7ce 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.avro.generic.GenericData;
import org.apache.usergrid.persistence.index.impl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4c41f1bd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index c83be6b..b88c52c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -490,7 +490,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if(logger.isTraceEnabled()) {
logger.trace("Already compacting, won't compact group: {}", group);
}
-
+
return AuditResult.COMPACTING;
}
[15/20] usergrid git commit: Upgrade to ES 1.7.3 Java client and
remove incorrect filter on async events.
Posted by mr...@apache.org.
Upgrade to ES 1.7.3 Java client and remove incorrect filter on async events.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/903fd186
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/903fd186
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/903fd186
Branch: refs/heads/release-2.1.1
Commit: 903fd1864465819ea055dab8d41e6d336f582491
Parents: aef26b9
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 21 12:51:49 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 21 12:51:49 2016 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AsyncEventServiceImpl.java | 9 ++++++++-
stack/corepersistence/pom.xml | 2 +-
2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/903fd186/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 1b32b48..3b74daf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -741,6 +741,14 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Submit results to index and return the queue messages to be ack'd
+ *
+ *
+ *
+ *
+ *
+ *
+ *
+ *
* @param indexEventResults
* @return
*/
@@ -756,7 +764,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
// filter out messages that are not present, they were not processed and put into the results
.filter( result -> result.getQueueMessage().isPresent() )
- .filter( result -> result.getIndexOperationMessage().isPresent() )
.map(indexEventResult -> {
//record the cycle time
http://git-wip-us.apache.org/repos/asf/usergrid/blob/903fd186/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 0d8609b..f2bbbcd 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -80,7 +80,7 @@ limitations under the License.
<commons.collections.version>3.2.1</commons.collections.version>
<commons.io.version>2.4</commons.io.version>
<commons.lang.version>3.1</commons.lang.version>
- <elasticsearch.version>1.4.4</elasticsearch.version>
+ <elasticsearch.version>1.7.3</elasticsearch.version>
<fasterxml-uuid.version>3.1.3</fasterxml-uuid.version>
<guava.version>18.0</guava.version>
<guice.version>4.0-beta5</guice.version>
[20/20] usergrid git commit: Merge commit 'refs/pull/493/head' of
github.com:apache/usergrid into release-2.1.1
Posted by mr...@apache.org.
Merge commit 'refs/pull/493/head' of github.com:apache/usergrid into release-2.1.1
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/e64fa350
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/e64fa350
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/e64fa350
Branch: refs/heads/release-2.1.1
Commit: e64fa3503069f41b66e3942a946b14d7222e1525
Parents: 10e2be7 39351f1
Author: Michael Russo <mr...@apigee.com>
Authored: Wed Mar 23 10:32:30 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Wed Mar 23 10:32:30 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 18 +-
.../usergrid/corepersistence/index/RxTest.java | 129 ++++++
.../persistence/core/astyanax/ColumnSearch.java | 7 +-
.../core/astyanax/MultiRowColumnIterator.java | 37 +-
.../astyanax/MultiRowShardColumnIterator.java | 430 +++++++++++++++++++
.../persistence/core/shard/SmartShard.java | 52 +++
.../astyanax/MultiRowColumnIteratorTest.java | 14 +-
.../impl/EdgeMetadataSerializationV2Impl.java | 4 +-
.../impl/shard/NodeShardCache.java | 7 +-
.../graph/serialization/impl/shard/Shard.java | 33 +-
.../impl/shard/impl/EdgeSearcher.java | 86 +++-
.../shard/impl/EdgeShardSerializationImpl.java | 57 ++-
.../shard/impl/NodeShardAllocationImpl.java | 38 +-
.../impl/shard/impl/NodeShardCacheImpl.java | 14 +-
.../shard/impl/ShardEntryGroupIterator.java | 12 +
.../shard/impl/ShardGroupCompactionImpl.java | 179 ++++++--
.../impl/ShardedEdgeSerializationImpl.java | 9 +
.../impl/shard/impl/ShardsColumnIterator.java | 49 ++-
.../shard/impl/serialize/ShardSerializer.java | 99 +++++
.../graph/GraphManagerShardConsistencyIT.java | 215 ++++++----
.../impl/shard/EdgeShardSerializationTest.java | 12 +-
.../impl/shard/NodeShardAllocationTest.java | 44 +-
.../impl/shard/ShardGroupCompactionTest.java | 4 +-
.../graph/src/test/resources/log4j.properties | 6 +
stack/corepersistence/pom.xml | 2 +-
.../index/impl/IndexOperationMessage.java | 2 +-
26 files changed, 1303 insertions(+), 256 deletions(-)
----------------------------------------------------------------------
[06/20] usergrid git commit: Add 'smart' shard seeking into the multi
row column iterator. This leverages a stored 'end' edge marked on a shard at
the end of compacting and balancing a shard's (row's) edges (columns).
Posted by mr...@apache.org.
Add 'smart' shard seeking into the multi row column iterator. This leverages a stored 'end' edge marked on a shard at the end of compacting and balancing a shard's (row's) edges (columns).
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e407ff6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e407ff6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e407ff6
Branch: refs/heads/release-2.1.1
Commit: 4e407ff690f220ee04c535a5ce91ca5a3a07ad1d
Parents: b112488
Author: Michael Russo <mr...@apigee.com>
Authored: Thu Mar 17 22:12:51 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Thu Mar 17 22:12:51 2016 -0700
----------------------------------------------------------------------
.../persistence/core/astyanax/ColumnSearch.java | 7 +-
.../core/astyanax/MultiRowColumnIterator.java | 146 ++++++++++---------
.../persistence/core/shard/SmartShard.java | 8 +-
.../astyanax/MultiRowColumnIteratorTest.java | 14 +-
.../impl/EdgeMetadataSerializationV2Impl.java | 4 +-
.../graph/serialization/impl/shard/Shard.java | 8 +-
.../impl/shard/impl/EdgeSearcher.java | 38 ++---
.../shard/impl/EdgeShardSerializationImpl.java | 44 +++++-
.../shard/impl/NodeShardAllocationImpl.java | 13 +-
.../shard/impl/ShardGroupCompactionImpl.java | 88 +++--------
.../impl/ShardedEdgeSerializationImpl.java | 2 +-
.../shard/impl/serialize/ShardSerializer.java | 99 +++++++++++++
.../graph/GraphManagerShardConsistencyIT.java | 42 +++---
.../graph/src/test/resources/log4j.properties | 1 +
14 files changed, 315 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
index 112f4aa..43654ad 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
@@ -33,17 +33,18 @@ public interface ColumnSearch<T> {
* Set the start value supplied and the user supplied end value (if present)
*
* @param value The value to set in the start
+ * @param end The value to set as the end of the range, or null for no end
*/
- public void buildRange( final RangeBuilder rangeBuilder, final T value );
+ void buildRange(final RangeBuilder rangeBuilder, final T start, T end);
/**
* Set the range builder with the user supplied start and finish
*/
- public void buildRange( final RangeBuilder rangeBuilder );
+ void buildRange( final RangeBuilder rangeBuilder );
/**
* Return true if we should skip the first result
* @return
*/
- public boolean skipFirst(final T first);
+ boolean skipFirst(final T first);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 6c91aca..d8b9097 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -227,14 +227,14 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
if(currentShard == null){
if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+ logger.trace("currentShard: {}", currentShard);
}
currentShard = currentShardIterator.next();
if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
- logger.trace(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+ logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
+ logger.trace("initializing iterator with shard: {}", currentShard);
}
@@ -242,21 +242,85 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
+ // initial request, build the range with no start and no end
+ if ( startColumn == null && currentShard.getShardEnd() == null ){
+ columnSearch.buildRange( rangeBuilder );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("initial search (no start or shard end)");
+ }
- //set the range into the search
- if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - startColumn={}", startColumn);
}
+ // if there's only a startColumn, always use it as the range start
+ else if ( startColumn != null && currentShard.getShardEnd() == null ){
+
+ columnSearch.buildRange( rangeBuilder, startColumn, null );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search (no shard end) with start: {}", startColumn);
+ }
- if ( startColumn == null ) {
- columnSearch.buildRange( rangeBuilder );
}
- else {
- columnSearch.buildRange( rangeBuilder, startColumn );
+ // if there's only a shardEnd, set the start/end based on the search order
+ else if ( startColumn == null && currentShard.getShardEnd() != null ){
+
+ T shardEnd = (T) currentShard.getShardEnd();
+
+ // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
+ if(!ascending) {
+
+ columnSearch.buildRange(rangeBuilder, shardEnd, null);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search descending with start: {}", shardEnd);
+ }
+
+ }
+ // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
+ else{
+
+ columnSearch.buildRange( rangeBuilder, null, shardEnd );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search ascending with end: {}", shardEnd);
+ }
+
+ }
+
}
+ // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
+ else if ( startColumn != null && currentShard.getShardEnd() != null) {
+
+ T shardEnd = (T) currentShard.getShardEnd();
+
+
+ // if the search is not ascending, set the start to be the older edge
+ if(!ascending){
+
+ T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
+ columnSearch.buildRange( rangeBuilder, searchStart, null);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search descending with start: {} in shard", searchStart, currentShard);
+ }
+
+ }
+ // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
+ else{
+
+ columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
+ }
+
+ }
+
+ }
+
rangeBuilder.setLimit( selectSize );
if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
@@ -296,8 +360,8 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
if(logger.isTraceEnabled()){
- logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size);
- logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+ logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
+ logger.trace("selectSize={}, size={}, ", selectSize, size);
}
@@ -348,8 +412,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
if(logger.isTraceEnabled()){
- logger.trace(
- Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
+ logger.trace("currentColumnIterator.hasNext()={}, " +
"moreToReturn={}, currentShardIterator.hasNext()={}",
currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
}
@@ -359,61 +422,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
/**
- * Return true if we have < 2 rows with columns, false otherwise
- */
- private boolean containsSingleRowOnly( final Rows<R, C> result ) {
-
- int count = 0;
-
- for ( R key : result.getKeys() ) {
- if ( result.getRow( key ).getColumns().size() > 0 ) {
- count++;
-
- //we have more than 1 row with values, return them
- if ( count > 1 ) {
- return false;
- }
- }
- }
-
- return true;
- }
-
-
- /**
- * A single row is present, only parse the single row
- * @param result
- * @return
- */
- private List<T> singleRowResult( final Rows<R, C> result ) {
-
- if (logger.isTraceEnabled()) logger.trace( "Only a single row has columns. Parsing directly" );
-
- for ( R key : result.getKeys() ) {
- final ColumnList<C> columnList = result.getRow( key ).getColumns();
-
- final int size = columnList.size();
-
- if ( size > 0 ) {
-
- final List<T> results = new ArrayList<>(size);
-
- for(Column<C> column: columnList){
- results.add(columnParser.parseColumn( column ));
- }
-
- return results;
-
-
- }
- }
-
- //we didn't have any results, just return nothing
- return Collections.<T>emptyList();
- }
-
-
- /**
* Process the result set and filter any duplicates that may have already been seen in previous shards. During
* a shard transition, there could be the same columns in multiple shards (rows). This will also allow for
* filtering the startColumn (the seek starting point) when paging a row in Cassandra.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
index 8a1bee8..b60cb59 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -20,13 +20,13 @@ package org.apache.usergrid.persistence.core.shard;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-public class SmartShard<R, C> {
+public class SmartShard<R, T> {
final ScopedRowKey<R> rowKey;
- final C shardEnd;
+ final T shardEnd;
- public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+ public SmartShard(final ScopedRowKey<R> rowKey, final T shardEnd){
this.rowKey = rowKey;
this.shardEnd = shardEnd;
@@ -37,7 +37,7 @@ public class SmartShard<R, C> {
return rowKey;
}
- public C getShardEnd(){
+ public T getShardEnd(){
return shardEnd;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 9f5741b..b6ee7fe 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,7 +52,6 @@ import com.netflix.astyanax.util.RangeBuilder;
import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
import static org.junit.Assert.assertEquals;
@@ -156,7 +154,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
}
@@ -201,7 +199,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
buildRange( rangeBuilder );
}
@@ -276,7 +274,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
}
@@ -325,7 +323,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
buildRange( rangeBuilder );
}
@@ -414,7 +412,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
}
@@ -459,7 +457,7 @@ public class MultiRowColumnIteratorTest {
final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
rangeBuilder.setStart( value );
buildRange( rangeBuilder );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 2af62a8..9b0257f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -489,7 +489,7 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
//resume from the last if specified. Also set the range
return new ColumnSearch<String>() {
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final String value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final String value, String end) {
rangeBuilder.setLimit( graphFig.getScanPageSize() );
@@ -517,7 +517,7 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
@Override
public void buildRange( final RangeBuilder rangeBuilder ) {
- buildRange( rangeBuilder, null );
+ buildRange( rangeBuilder, null, null);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 92793cb..6394703 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -33,7 +33,7 @@ public class Shard implements Comparable<Shard> {
private final long shardIndex;
private final long createdTime;
private final boolean compacted;
- private Optional<Edge> shardEnd;
+ private Optional<DirectedEdge> shardEnd;
public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
@@ -76,11 +76,11 @@ public class Shard implements Comparable<Shard> {
return shardIndex == MIN_SHARD.shardIndex;
}
- public void setShardEnd(final Optional<Edge> shardEnd) {
+ public void setShardEnd(final Optional<DirectedEdge> shardEnd) {
this.shardEnd = shardEnd;
}
- public Optional<Edge> getShardEnd() {
+ public Optional<DirectedEdge> getShardEnd() {
return shardEnd;
}
@@ -170,7 +170,7 @@ public class Shard implements Comparable<Shard> {
string.append(", compacted=").append(compacted);
string.append(", shardEndTimestamp=");
if(shardEnd.isPresent()){
- string.append(shardEnd.get().getTimestamp());
+ string.append(shardEnd.get().timestamp);
}else{
string.append("null");
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 2f5817d..6d8ddd4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -19,20 +19,14 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.http.cookie.SM;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
-import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.shard.SmartShard;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import com.google.common.base.Optional;
@@ -43,8 +37,6 @@ import com.netflix.astyanax.util.RangeBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator.*;
-
/**
* Searcher to be used when performing the search. Performs I/O transformation as well as parsing for the iterator. If
@@ -56,6 +48,9 @@ import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterat
*/
public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
+ private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
+
+
protected final Optional<T> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
@@ -84,7 +79,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
for(Shard shard : shards){
-
final ScopedRowKey< R> rowKey = ScopedRowKey
.fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
@@ -105,16 +99,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
final ScopedRowKey< R> rowKey = ScopedRowKey
.fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
- final C shardEnd;
- if(shard.getShardEnd().isPresent()){
- shardEnd = createColumn((T) shard.getShardEnd().get());
+ final T shardEnd;
+ if(shard.getShardEnd().isPresent()){
+ shardEnd = createEdge((C) shard.getShardEnd().get(), false); // convert DirectedEdge to Edge
}else{
shardEnd = null;
}
-
-
rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd));
}
@@ -142,11 +134,23 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
@Override
- public void buildRange( final RangeBuilder rangeBuilder, final T value ) {
+ public void buildRange(final RangeBuilder rangeBuilder, final T start, T end) {
- C edge = createColumn( value );
+ if ( start != null){
- rangeBuilder.setStart( edge, getSerializer() );
+ C startEdge = createColumn( start );
+ rangeBuilder.setStart( startEdge, getSerializer() );
+ }else{
+
+ setTimeScan( rangeBuilder );
+ }
+
+ if( end != null){
+
+ C endEdge = createColumn( end );
+ rangeBuilder.setEnd( endEdge, getSerializer() );
+
+ }
setRangeOptions( rangeBuilder );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 120a15c..d22f472 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEd
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
@@ -53,11 +54,16 @@ import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Singleton
public class EdgeShardSerializationImpl implements EdgeShardSerialization {
+ private static final Logger logger = LoggerFactory.getLogger( EdgeShardSerializationImpl.class );
+
+
/**
* Edge shards
*/
@@ -67,6 +73,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
+ private static final ShardSerializer SHARD_SERIALIZER = ShardSerializer.INSTANCE;
protected final Keyspace keyspace;
@@ -101,7 +108,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final MutationBatch batch = keyspace.prepareMutationBatch();
batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
- .putColumn( shard.getShardIndex(), shard.isCompacted() );
+ .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard));
return batch;
}
@@ -180,9 +187,42 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
private static class ShardColumnParser implements ColumnParser<Long, Shard> {
+ /** Example CQL schema for this table
+ *
+ * CREATE TABLE "Usergrid_Applications"."Edge_Shards" (
+ * key blob,
+ * column1 bigint,
+ * value blob,
+ * PRIMARY KEY (key, column1)
+ * ) WITH COMPACT STORAGE
+ * AND CLUSTERING ORDER BY (column1 DESC)
+ *
+ *
+ *
+ */
+
+
@Override
public Shard parseColumn( final Column<Long> column ) {
- return new Shard( column.getName(), column.getTimestamp(), column.getBooleanValue() );
+
+ // A custom serializer was introduced to handle parsing multiple column formats without re-writing the data.
+ // The column can be stored as a legacy, single boolean, value OR a new, composite, value which contains
+ // every item in the shard. If the legacy value is seen, the deserialized shard has Long.MIN_VALUE for index and
+ // createdTime so it can be identified later and handled.
+
+
+ Shard shard = column.getValue(SHARD_SERIALIZER);
+
+ if (shard.getShardIndex() == Long.MIN_VALUE && shard.getCreatedTime() == Long.MIN_VALUE){
+
+ // this was deserialized as a legacy column format, use the column name and timestamp for the shard
+ return new Shard(column.getName(), column.getTimestamp(), shard.isCompacted());
+
+ } else {
+
+ return shard;
+ }
+
}
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6f95cf5..6b190a1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -105,7 +105,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
if ( existingShards == null || !existingShards.hasNext() ) {
- //logger.info("writing min shard");
final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
try {
batch.execute();
@@ -160,7 +159,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
if ( shard.getCreatedTime() >= minTime ) {
- if (logger.isTraceEnabled()) logger.trace( "Shard entry group {} and shard {} is before the minimum created time of {}. Not allocating.does not have 1 entry, not allocating", shardEntryGroup, shard, minTime );
+ if (logger.isTraceEnabled()) logger.trace( "Shard entry group {} and shard {} is before the minimum created time of {}. Not allocating", shardEntryGroup, shard, minTime );
return false;
}
@@ -196,7 +195,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
final Iterator<MarkedEdge> edges = directedEdgeMeta
- .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singletonList(shard),0,
SearchByEdgeType.Order.ASCENDING );
@@ -217,7 +216,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* element will suffice.
*/
-
+ long edgeCount = 0;
for ( long i = 1; edges.hasNext(); i++ ) {
//we hit a pivot shard, set it since it could be the last one we encounter
if ( i % shardSize == 0 ) {
@@ -226,6 +225,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
else {
edges.next();
}
+ edgeCount++;
}
@@ -233,7 +233,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* Sanity check in case we audit before we have a full shard
*/
if ( marked == null ) {
- if (logger.isTraceEnabled()) logger.trace( "Shard {} in shard group {} not full, not splitting", shard, shardEntryGroup );
+ if (logger.isTraceEnabled()){
+ logger.trace( "Shard {} in shard group {} not full, " +
+ "not splitting. Edge count: {}", shard, shardEntryGroup, edgeCount );
+ }
return false;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index f644380..1890d53 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Optional;
import com.netflix.astyanax.connectionpool.OperationResult;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,15 +44,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -189,7 +181,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final MarkedEdge edge = edges.next();
final long edgeTimestamp = edge.getTimestamp();
- shardEnd = edge;
+
/**
* The edge is within a different shard, break
@@ -208,9 +200,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
edgeCount++;
+ shardEnd = edge;
- //if we're at our count, execute the mutation of writing the edges to the new row, then remove them
- //from the old rows
+ // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
+ // from the old rows
if ( edgeCount % maxWorkSize == 0 ) {
try {
@@ -218,30 +211,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
- List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
- {{
- add(deleteRowBatch);
- }};
-
- // fire the mutation in the background after 1 second delay
- if(logger.isTraceEnabled()){
- logger.trace("scheduling shard compaction delete");
-
- }
-
- // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
- Observable.from(deleteMutations)
- .delay(1000, TimeUnit.MILLISECONDS)
- .map(deleteRowBatchSingle -> {
- try {
- return deleteRowBatchSingle.execute();
- } catch (ConnectionException e) {
- logger.error("Unable to remove edges from old shards");
- throw new RuntimeException("Unable to remove edges from old shards");
- }
- })
- .subscribeOn(Schedulers.io())
- .toBlocking().last();
+ // intentionally block this thread before deleting the old edges to be sure there are no gaps;
+ // duplicates are filtered on graph seeking so this is OK
+ Thread.sleep(1000);
+ deleteRowBatch.execute();
}
catch ( Throwable t ) {
@@ -250,10 +223,16 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
}
- Shard updatedShard = new Shard( sourceShard.getShardIndex(), sourceShard.getCreatedTime(), sourceShard.isCompacted() );
- updatedShard.setShardEnd(Optional.fromNullable(shardEnd));
- logger.info("updating with shard end: {}", shardEnd );
- updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, updatedShard, edgeMeta));
+ if (shardEnd != null){
+
+ sourceShard.setShardEnd(
+ Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+ );
+
+
+ logger.info("Updating shard {} with shardEnd: {}", sourceShard, shardEnd );
+ updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+ }
}
@@ -265,31 +244,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
- List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
- {{
- add(deleteRowBatch);
- }};
-
-
- if(logger.isTraceEnabled()) {
- logger.trace("scheduling shard compaction delete");
- }
-
- // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
- Observable.from(deleteMutations)
- .delay(1000, TimeUnit.MILLISECONDS)
- .map(deleteRowBatchSingle -> {
- try {
- return deleteRowBatchSingle.execute();
- } catch (ConnectionException e) {
- logger.error("Unable to remove edges from old shards");
- throw new RuntimeException("Unable to remove edges from old shards");
- }
- })
- .subscribeOn(Schedulers.io())
- .toBlocking().last();
+ // on purpose block this thread before deleting the old edges to be sure there are no gaps
+ // duplicates are filtered on graph seeking so this is OK
+ Thread.sleep(1000);
+ deleteRowBatch.execute();
- //updateShardMetaBatch.execute();
+ updateShardMetaBatch.execute();
}
catch ( Throwable t ) {
logger.error( "Unable to move edges to target shard {}", targetShard );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index c3e0cc0..c7028aa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -407,7 +407,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
GraphValidation.validateSearchByEdgeType( search );
if(logger.isTraceEnabled()){
- logger.info("getEdgesFromSource shards: {}", shards);
+ logger.trace("getEdgesFromSource shards: {}", shards);
}
final Id sourceId = search.getNode();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
new file mode 100644
index 0000000..58276fe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.BooleanSerializer;
+import com.netflix.astyanax.serializers.ByteSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+
+import java.nio.ByteBuffer;
+
+
+public class ShardSerializer extends AbstractSerializer<Shard> {
+
+ private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+ private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+ private static final EdgeSerializer EDGE_SERIALIZER = EdgeSerializer.INSTANCE;
+ private static final ByteSerializer BYTE_SERIALIZER = ByteSerializer.get();
+
+
+ public static final ShardSerializer INSTANCE = new ShardSerializer();
+
+
+ @Override
+ public ByteBuffer toByteBuffer(final Shard shard ) {
+
+ DynamicComposite composite = new DynamicComposite();
+
+ composite.addComponent( (byte) 2 , BYTE_SERIALIZER);
+ composite.addComponent( shard.getShardIndex(), LONG_SERIALIZER);
+ composite.addComponent( shard.getCreatedTime(), LONG_SERIALIZER);
+
+ if(shard.getShardEnd().isPresent()) {
+ composite.addComponent(shard.getShardEnd().get(), EDGE_SERIALIZER);
+ }else{
+ composite.addComponent(null, EDGE_SERIALIZER);
+ }
+
+ composite.addComponent( shard.isCompacted(), BOOLEAN_SERIALIZER);
+
+ return composite.serialize();
+ }
+
+
+ @Override
+ public Shard fromByteBuffer( final ByteBuffer byteBuffer ) {
+ DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+ Preconditions.checkArgument( composite.size() == 1 || composite.size() == 5,
+ "Composite should have 1 or 5 elements" );
+
+ // this is the legacy column format, return a shard with identifiable values so the column name and timestamp
+ // can be used
+ if( composite.size() == 1){
+
+ final boolean isCompacted = composite.get( 0, BOOLEAN_SERIALIZER);
+ return new Shard(Long.MIN_VALUE, Long.MIN_VALUE, isCompacted);
+
+ }
+ // This is the new format which contains all the information about a Shard. Include a byte version of 2 if it's
+ // needed later for any reason.
+ else{
+
+ final byte version = composite.get(0, BYTE_SERIALIZER);
+ final long shardIndex = composite.get( 1, LONG_SERIALIZER );
+ final long shardCreated = composite.get( 2, LONG_SERIALIZER );
+ final DirectedEdge shardEnd = composite.get( 3, EDGE_SERIALIZER);
+ final boolean isCompacted = composite.get( 4, BOOLEAN_SERIALIZER);
+
+
+ final Shard shard = new Shard(shardIndex, shardCreated, isCompacted);
+ shard.setShardEnd(Optional.fromNullable(shardEnd));
+ return shard;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 439553c..8fd7cea 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -115,10 +115,10 @@ public class GraphManagerShardConsistencyIT {
originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
- ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 5000 );
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
- final long cacheTimeout = 2000;
+ final long cacheTimeout = 1000;
//set our cache timeout to the above value
ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
@@ -128,7 +128,7 @@ public class GraphManagerShardConsistencyIT {
ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
- //get the system property of the UUID to use. If one is not set, use the defualt
+ // get the system property of the UUID to use. If one is not set, use the default
String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
@@ -196,8 +196,7 @@ public class GraphManagerShardConsistencyIT {
};
- //final int numInjectors = 2;
- final int numInjectors = 1;
+ final int numInjectors = 2;
/**
* create injectors. This way all the caches are independent of one another. This is the same as
@@ -280,7 +279,7 @@ public class GraphManagerShardConsistencyIT {
final List<Throwable> failures = new ArrayList<>();
Thread.sleep(3000); // let's make sure everything is written
- for(int i = 0; i < 1; i ++) {
+ for(int i = 0; i < 2; i ++) {
/**
@@ -312,7 +311,7 @@ public class GraphManagerShardConsistencyIT {
int compactedCount;
- //now start our readers
+ // now start the compaction watcher
while ( true ) {
@@ -336,10 +335,10 @@ public class GraphManagerShardConsistencyIT {
fail( builder.toString() );
}
- //reset our count. Ultimately we'll have 4 groups once our compaction completes
+ // reset our count. Ultimately we'll have 4 groups once our compaction completes
compactedCount = 0;
- //we have to get it from the cache, because this will trigger the compaction process
+ // we have to get it from the cache, because this will trigger the compaction process
final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
final Set<ShardEntryGroup> shardEntryGroups = new HashSet<>();
@@ -433,7 +432,7 @@ public class GraphManagerShardConsistencyIT {
};
- final int numInjectors = 1;
+ final int numInjectors = 2;
/**
* create injectors. This way all the caches are independent of one another. This is the same as
@@ -498,12 +497,11 @@ public class GraphManagerShardConsistencyIT {
future.get();
}
- //now get all our shards
+ // now get all our shards
final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
- //now submit the readers.
final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
@@ -527,7 +525,7 @@ public class GraphManagerShardConsistencyIT {
}
- logger.info( "found {} shard groups", shardCount );
+ logger.info( "Found {} shard groups", shardCount );
//now mark and delete all the edges
@@ -543,6 +541,7 @@ public class GraphManagerShardConsistencyIT {
long totalDeleted = 0;
+ // now do the deletes
while(count != 0) {
logger.info("total deleted: {}", totalDeleted);
@@ -565,7 +564,7 @@ public class GraphManagerShardConsistencyIT {
}
- //now loop until with a reader until our shards are gone
+ // loop with a reader until our shards are gone
/**
@@ -582,7 +581,7 @@ public class GraphManagerShardConsistencyIT {
@Override
public void onSuccess( @Nullable final Long result ) {
logger.info( "Successfully ran the read, re-running" );
- deleteExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+ deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
}
@@ -593,9 +592,9 @@ public class GraphManagerShardConsistencyIT {
}
} );
+ Thread.sleep(3000); // let the edge readers start
- //now start our readers
-
+ // now loop and check the shard count
while ( true ) {
if ( !failures.isEmpty() ) {
@@ -647,9 +646,12 @@ public class GraphManagerShardConsistencyIT {
Thread.sleep( 2000 );
}
- //now that we have finished expanding s
+ future.cancel(true); // stop the read future
+ //now that we have finished deleting and shards are removed, shutdown
deleteExecutor.shutdownNow();
+
+ Thread.sleep( 3000 ); // sleep before the next test
}
@@ -695,7 +697,7 @@ public class GraphManagerShardConsistencyIT {
if ( i % 100 == 0 ) {
- logger.info( Thread.currentThread().getName()+" wrote: " + i );
+ logger.info( "wrote: " + i );
}
}
@@ -741,7 +743,7 @@ public class GraphManagerShardConsistencyIT {
logger.info( "Completed reading {} edges", returnedEdgeCount );
if ( writeCount != returnedEdgeCount ) {
- logger.warn( Thread.currentThread().getName()+" - Unexpected edge count returned!!! Expected {} but was {}", writeCount,
+ logger.warn( "Unexpected edge count returned!!! Expected {} but was {}", writeCount,
returnedEdgeCount );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 5afc288..e7f7524 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -42,4 +42,5 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE
[08/20] usergrid git commit: Reset original multi-row column iterator
and use a new class for the smart iterating over edge shards. Change to
system time for 'last write wins' in cassandra instead of a shard's
'createdTime'.
Posted by mr...@apache.org.
Reset original multi-row column iterator and use a new class for the smart iterating over edge shards. Change to system time for 'last write wins' in cassandra instead of a shard's 'createdTime'.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bec50939
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bec50939
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bec50939
Branch: refs/heads/release-2.1.1
Commit: bec5093978175c87b7d76f66c8a503f062275ead
Parents: 58ae197
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Mar 20 17:49:10 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Mar 20 17:49:10 2016 -0700
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 346 ++++++--------
.../astyanax/MultiRowShardColumnIterator.java | 455 +++++++++++++++++++
.../shard/impl/EdgeShardSerializationImpl.java | 14 +-
.../shard/impl/ShardGroupCompactionImpl.java | 99 ++--
.../impl/shard/impl/ShardsColumnIterator.java | 3 +-
.../graph/GraphManagerShardConsistencyIT.java | 3 +-
.../impl/shard/EdgeShardSerializationTest.java | 12 +-
7 files changed, 665 insertions(+), 267 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 6049c1f..c071d53 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -20,9 +20,14 @@
package org.apache.usergrid.persistence.core.astyanax;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
-import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,18 +77,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
private Iterator<T> currentColumnIterator;
- private Iterator<SmartShard> currentShardIterator;
-
- private List<SmartShard> rowKeysWithShardEnd;
-
- private SmartShard currentShard;
-
- private List<T> resultsTracking;
-
- private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
-
- private boolean ascending = false;
-
/**
* Remove after finding bug
@@ -115,63 +108,18 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.moreToReturn = true;
- this.resultsTracking = new ArrayList<>();
-
- }
-
- // temporarily use a new constructor for specific searches until we update each caller of this class
- public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
- final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
- final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
- final Collection<R> rowKeys, final int pageSize,
- final List<SmartShard> rowKeysWithShardEnd,
- final boolean ascending) {
- this.cf = cf;
- this.pageSize = pageSize;
- this.columnParser = columnParser;
- this.columnSearch = columnSearch;
- this.comparator = comparator;
- this.rowKeys = rowKeys;
- this.keyspace = keyspace;
- this.consistencyLevel = consistencyLevel;
- this.moreToReturn = true;
- this.rowKeysWithShardEnd = rowKeysWithShardEnd;
- this.resultsTracking = new ArrayList<>();
- this.ascending = ascending;
+ // seenResults = new HashMap<>( pageSize * 10 );
}
@Override
public boolean hasNext() {
- // if column iterator is null, initialize with first call to advance()
- // advance if we know there more columns exist in the current shard but we've exhausted this page fetch from c*
if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
advance();
}
- // when there are no more columns, nothing reported to return, but more shards available, go to the next shard
- if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
- !moreToReturn && currentShardIterator.hasNext()){
-
- if(logger.isTraceEnabled()){
- logger.trace("Advancing shard iterator");
- logger.trace("Shard before advance: {}", currentShard);
- }
-
-
- // advance to the next shard
- currentShard = currentShardIterator.next();
-
- if(logger.isTraceEnabled()){
- logger.trace("Shard after advance: {}", currentShard);
-
- }
-
- advance();
-
- }
return currentColumnIterator.hasNext();
}
@@ -198,6 +146,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
public void advance() {
+
if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
/**
@@ -206,130 +155,32 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final boolean skipFirstColumn = startColumn != null;
- final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
-
- final RangeBuilder rangeBuilder = new RangeBuilder();
-
-
- if(currentShardIterator == null){
-
- // flip the order of our shards if ascending
- if(ascending){
- Collections.reverse(rowKeysWithShardEnd);
- }
-
- currentShardIterator = rowKeysWithShardEnd.iterator();
-
- }
-
- if(currentShard == null){
-
- if(logger.isTraceEnabled()){
- logger.trace("currentShard: {}", currentShard);
- }
-
- currentShard = currentShardIterator.next();
-
- if(logger.isTraceEnabled()){
- logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
- logger.trace("initializing iterator with shard: {}", currentShard);
- }
-
-
- }
+ final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
+ final RangeBuilder rangeBuilder = new RangeBuilder();
- // initial request, build the range with no start and no end
- if ( startColumn == null && currentShard.getShardEnd() == null ){
+ //set the range into the search
+ if ( startColumn == null ) {
columnSearch.buildRange( rangeBuilder );
-
- if(logger.isTraceEnabled()){
- logger.trace("initial search (no start or shard end)");
- }
-
}
- // if there's only a startColumn set the range start startColumn always
- else if ( startColumn != null && currentShard.getShardEnd() == null ){
-
+ else {
columnSearch.buildRange( rangeBuilder, startColumn, null );
-
- if(logger.isTraceEnabled()){
- logger.trace("search (no shard end) with start: {}", startColumn);
- }
-
- }
- // if there's only a shardEnd, set the start/end according based on the search order
- else if ( startColumn == null && currentShard.getShardEnd() != null ){
-
- T shardEnd = (T) currentShard.getShardEnd();
-
- // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
- if(!ascending) {
-
- columnSearch.buildRange(rangeBuilder, shardEnd, null);
-
- if(logger.isTraceEnabled()){
- logger.trace("search descending with start: {}", shardEnd);
- }
-
- }
- // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
- else{
-
- columnSearch.buildRange( rangeBuilder, null, shardEnd );
-
- if(logger.isTraceEnabled()){
- logger.trace("search ascending with end: {}", shardEnd);
- }
-
- }
-
}
- // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
- else if ( startColumn != null && currentShard.getShardEnd() != null) {
- T shardEnd = (T) currentShard.getShardEnd();
-
-
- // if the search is not ascending, set the start to be the older edge
- if(!ascending){
-
- T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
- columnSearch.buildRange( rangeBuilder, searchStart, null);
-
- if(logger.isTraceEnabled()){
- logger.trace("search descending with start: {} in shard", searchStart, currentShard);
- }
-
- }
- // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
- else{
-
- columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
-
- if(logger.isTraceEnabled()){
- logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
- }
-
-
-
- }
-
- }
rangeBuilder.setLimit( selectSize );
- if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
+ if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" );
/**
* Get our list of slices
*/
final RowSliceQuery<R, C> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
.withColumnRange( rangeBuilder.build() );
final Rows<R, C> result;
@@ -341,43 +192,36 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
+ //now aggregate them together
+ //this is an optimization. When only one row has data, it's faster to detect that
+ // and return the iterator of that row's columns directly than
+ //to do a merge.
- final List<T> mergedResults;
-
- skipSize = 0;
- mergedResults = processResults( result, selectSize );
+ final List<T> mergedResults;
- if(logger.isTraceEnabled()){
- logger.trace("skipped amount: {}", skipSize);
+ if ( containsSingleRowOnly( result ) ) {
+ mergedResults = singleRowResult( result );
+ }
+ else {
+ mergedResults = mergeResults( result, selectSize );
}
- final int size = mergedResults.size();
+ //we've parsed everything; truncate to the first pageSize, since that's all we can ensure is correct without
+ //another trip back to cassandra
- if(logger.isTraceEnabled()){
- logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
- logger.trace("selectSize={}, size={}, ", selectSize, size);
+ //discard our first element (maybe)
- }
-
- moreToReturn = size == selectSize;
-
- if(selectSize == 1001 && mergedResults.size() == 1000){
- moreToReturn = true;
- }
+ final int size = mergedResults.size();
- // if a whole page is skipped OR the result size equals the the difference of what's skipped,
- // it is likely during a shard transition and we should assume there is more to read
- if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
- moreToReturn = true;
- }
+ moreToReturn = size == selectSize;
//we have a first column to to check
if( size > 0) {
@@ -386,53 +230,93 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//The search has either told us to skip the first element, or it matches our last, therefore we disregard it
if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
- if(logger.isTraceEnabled()){
- logger.trace("removing an entry");
-
- }
mergedResults.remove( 0 );
}
}
- // set the start column for the enxt query
if(moreToReturn && mergedResults.size() > 0){
startColumn = mergedResults.get( mergedResults.size() - 1 );
-
}
currentColumnIterator = mergedResults.iterator();
+ if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
+ }
+
+
+ /**
+ * Return true if we have < 2 rows with columns, false otherwise
+ */
+ private boolean containsSingleRowOnly( final Rows<R, C> result ) {
- //force an advance of this iterator when there are still shards to read but result set on current shard is 0
- if(size == 0 && currentShardIterator.hasNext()){
- hasNext();
+ int count = 0;
+
+ for ( R key : result.getKeys() ) {
+ if ( result.getRow( key ).getColumns().size() > 0 ) {
+ count++;
+
+ //we have more than 1 row with values, return them
+ if ( count > 1 ) {
+ return false;
+ }
+ }
}
- if(logger.isTraceEnabled()){
- logger.trace("currentColumnIterator.hasNext()={}, " +
- "moreToReturn={}, currentShardIterator.hasNext()={}",
- currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
- }
+ return true;
+ }
+
+
+ /**
+ * A single row is present, only parse the single row
+ * @param result
+ * @return
+ */
+ private List<T> singleRowResult( final Rows<R, C> result ) {
+
+ if (logger.isTraceEnabled()) logger.trace( "Only a single row has columns. Parsing directly" );
+
+ for ( R key : result.getKeys() ) {
+ final ColumnList<C> columnList = result.getRow( key ).getColumns();
+
+ final int size = columnList.size();
+
+ if ( size > 0 ) {
+
+ final List<T> results = new ArrayList<>(size);
+
+ for(Column<C> column: columnList){
+ results.add(columnParser.parseColumn( column ));
+ }
+
+ return results;
+ }
+ }
+
+ //we didn't have any results, just return nothing
+ return Collections.<T>emptyList();
}
/**
- * Process the result set and filter any duplicates that may have already been seen in previous shards. During
- * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for
- * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
- *
+ * Multiple rows are present, merge them into a single result set
* @param result
* @return
*/
- private List<T> processResults(final Rows<R, C> result, final int maxSize ) {
+ private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
+
+ if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns. Merging" );
+
final List<T> mergedResults = new ArrayList<>(maxSize);
+
+
+
for ( final R key : result.getKeys() ) {
final ColumnList<C> columns = result.getRow( key ).getColumns();
@@ -441,24 +325,52 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
final T returnedValue = columnParser.parseColumn( column );
- // use an O(log n) search, same as a tree, but with fast access to indexes for later operations
- int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
+ //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
+ int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator );
+
+ /**
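+ * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
+ * you'll need to enable (uncomment) the check below, which fails fast when the comparator
+ * marks two unique columns returned by Cassandra as equal.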
+ */
+ //
+ // if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) {
+ // throw new RuntimeException( String.format(
+ // "Cassandra returned 2 unique columns,
+ // but your comparator marked them as equal. This " +
+ // "indicates a bug in your comparator. Previous value was %s and
+ // current value is " +
+ // "%s",
+ // previous, returnedValue ) );
+ // }
+ //
+ // previous = returnedValue;
+
+ //we've already seen it, no-op
+ if(searchIndex > -1){
+ continue;
+ }
+ final int insertIndex = (searchIndex+1)*-1;
- //we've already seen the column, filter it out as we might be in a shard transition or our start column
- if(searchIndex > -1){
- if(logger.isTraceEnabled()){
- logger.trace("skipping column as it was already retrieved before");
- }
- skipSize++;
+ //it's at the end of the list, don't bother inserting just to remove it
+ if(insertIndex >= maxSize){
continue;
}
+ if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+
+ mergedResults.add( insertIndex, returnedValue );
- resultsTracking.add(returnedValue);
- mergedResults.add(returnedValue );
+ //prune the mergedResults
+ while ( mergedResults.size() > maxSize ) {
+ if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
+
+ //just remove from our tail until the size falls to the correct value
+ mergedResults.remove(mergedResults.size()-1);
+ }
}
if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
@@ -467,6 +379,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
return mergedResults;
}
-}
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
new file mode 100644
index 0000000..bfc04c4
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.*;
+
+import org.apache.usergrid.persistence.core.shard.SmartShard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Iterator that walks a list of {@link SmartShard}s one shard at a time, paging through the columns of each shard's
+ * row key and filtering out values that processResults has already accepted (tracked in resultsTracking).
+ */
+public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+
+ private final int pageSize;
+
+ private final ColumnFamily<R, C> cf;
+
+
+ private final ColumnParser<C, T> columnParser;
+
+ private final ColumnSearch<T> columnSearch;
+
+ private final Comparator<T> comparator;
+
+
+ private final Collection<R> rowKeys;
+
+ private final Keyspace keyspace;
+
+ private final ConsistencyLevel consistencyLevel;
+
+ // last value of the previous page; the next query seeks from it (null until advance() sets it)
+ private T startColumn;
+
+ // set by advance(); true when the last page fetch suggests there may be more columns to read
+ private boolean moreToReturn;
+
+
+ private Iterator<T> currentColumnIterator;
+
+ private Iterator<SmartShard> currentShardIterator;
+
+ private List<SmartShard> rowKeysWithShardEnd;
+
+ private SmartShard currentShard;
+ // values already accepted by processResults; consulted there to filter duplicates
+ private List<T> resultsTracking;
+
+ private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
+
+ private boolean ascending = false;
+
+
+ /**
+ * Remove after finding bug
+ */
+
+
+ // private int advanceCount;
+ //
+ // private final HashMap<T, SeekPosition> seenResults;
+
+ /**
+ * Complete Remove
+ */
+
+
+ /**
+ * Create the iterator. When ascending is true, rowKeysWithShardEnd is reversed in place on the first advance().
+ */
+ // temporarily use a new constructor for specific searches until we update each caller of this class
+ public MultiRowShardColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
+ final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+ final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+ final Collection<R> rowKeys, final int pageSize,
+ final List<SmartShard> rowKeysWithShardEnd,
+ final boolean ascending) {
+ this.cf = cf;
+ this.pageSize = pageSize;
+ this.columnParser = columnParser;
+ this.columnSearch = columnSearch;
+ this.comparator = comparator;
+ this.rowKeys = rowKeys;
+ this.keyspace = keyspace;
+ this.consistencyLevel = consistencyLevel;
+ this.moreToReturn = true;
+ this.rowKeysWithShardEnd = rowKeysWithShardEnd;
+ this.resultsTracking = new ArrayList<>();
+ this.ascending = ascending;
+
+ }
+
+
+ @Override
+ public boolean hasNext() {
+
+ // if column iterator is null, initialize with first call to advance()
+ // advance if more columns may exist in the current shard but we've exhausted this page fetch from c*
+ if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
+ advance();
+ }
+
+ // when there are no more columns, nothing reported to return, but more shards available, go to the next shard
+ if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
+ !moreToReturn && currentShardIterator.hasNext()){
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Advancing shard iterator");
+ logger.trace("Shard before advance: {}", currentShard);
+ }
+
+
+ // advance to the next shard
+ currentShard = currentShardIterator.next();
+
+ if(logger.isTraceEnabled()){
+ logger.trace("Shard after advance: {}", currentShard);
+
+ }
+
+ advance();
+
+ }
+
+ return currentColumnIterator.hasNext();
+ }
+
+
+ @Override
+ public T next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No new element exists" );
+ }
+
+ final T next = currentColumnIterator.next();
+
+
+ return next;
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported this is a read only iterator" );
+ }
+
+
+ public void advance() {
+
+ if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
+
+ /**
+ * If a start column is present, we need to begin seeking from it (one extra column is selected in that case)
+ */
+
+ final boolean skipFirstColumn = startColumn != null;
+
+ final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
+
+ final RangeBuilder rangeBuilder = new RangeBuilder();
+
+
+
+
+ if(currentShardIterator == null){
+
+ // flip the order of our shards if ascending
+ if(ascending){
+ Collections.reverse(rowKeysWithShardEnd);
+ }
+
+ currentShardIterator = rowKeysWithShardEnd.iterator();
+
+ }
+
+ if(currentShard == null){
+
+ if(logger.isTraceEnabled()){
+ logger.trace("currentShard: {}", currentShard);
+ }
+
+ currentShard = currentShardIterator.next();
+
+ if(logger.isTraceEnabled()){
+ logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
+ logger.trace("initializing iterator with shard: {}", currentShard);
+ }
+
+
+ }
+
+
+
+ // initial request, build the range with no start and no end
+ if ( startColumn == null && currentShard.getShardEnd() == null ){
+
+ columnSearch.buildRange( rangeBuilder );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("initial search (no start or shard end)");
+ }
+
+ }
+ // if there's only a startColumn, always use it as the range start
+ else if ( startColumn != null && currentShard.getShardEnd() == null ){
+
+ columnSearch.buildRange( rangeBuilder, startColumn, null );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search (no shard end) with start: {}", startColumn);
+ }
+
+ }
+ // if there's only a shardEnd, set the start/end according to the search order
+ else if ( startColumn == null && currentShard.getShardEnd() != null ){
+
+ T shardEnd = (T) currentShard.getShardEnd();
+
+ // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
+ if(!ascending) {
+
+ columnSearch.buildRange(rangeBuilder, shardEnd, null);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search descending with start: {}", shardEnd);
+ }
+
+ }
+ // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
+ else{
+
+ columnSearch.buildRange( rangeBuilder, null, shardEnd );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search ascending with end: {}", shardEnd);
+ }
+
+ }
+
+ }
+ // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
+ else if ( startColumn != null && currentShard.getShardEnd() != null) {
+
+ T shardEnd = (T) currentShard.getShardEnd();
+
+
+ // if the search is not ascending, start from whichever of shardEnd/startColumn the comparator ranks greater
+ if(!ascending){
+
+ T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
+ columnSearch.buildRange( rangeBuilder, searchStart, null);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search descending with start: {} in shard", searchStart, currentShard);
+ }
+
+ }
+ // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
+ else{
+
+ columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
+
+ if(logger.isTraceEnabled()){
+ logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
+ }
+
+
+
+ }
+
+ }
+
+ rangeBuilder.setLimit( selectSize );
+
+ if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
+
+ /**
+ * Get our list of slices
+ */
+ final RowSliceQuery<R, C> query =
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+ .withColumnRange( rangeBuilder.build() );
+
+ final Rows<R, C> result;
+ try {
+ result = query.execute().getResult();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to connect to casandra", e );
+ }
+
+
+
+
+ final List<T> mergedResults;
+
+ skipSize = 0;
+
+ mergedResults = processResults( result, selectSize );
+
+ if(logger.isTraceEnabled()){
+ logger.trace("skipped amount: {}", skipSize);
+ }
+
+
+
+ final int size = mergedResults.size();
+
+
+
+ if(logger.isTraceEnabled()){
+ logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
+ logger.trace("selectSize={}, size={}, ", selectSize, size);
+
+
+ }
+
+ moreToReturn = size == selectSize;
+ // NOTE(review): 1001/1000 look like hard-coded pageSize + 1 / pageSize values -- confirm against the page size
+ if(selectSize == 1001 && mergedResults.size() == 1000){
+ moreToReturn = true;
+ }
+
+
+ // if a whole page is skipped OR the result size equals the difference of what's skipped,
+ // it is likely during a shard transition and we should assume there is more to read
+ if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
+ moreToReturn = true;
+ }
+
+ //we have a first column to check
+ if( size > 0) {
+
+ final T firstResult = mergedResults.get( 0 );
+
+ //The search has either told us to skip the first element, or it matches our last, therefore we disregard it
+ if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
+ if(logger.isTraceEnabled()){
+ logger.trace("removing an entry");
+
+ }
+ mergedResults.remove( 0 );
+ }
+
+ }
+
+
+ // set the start column for the next query
+ if(moreToReturn && mergedResults.size() > 0){
+ startColumn = mergedResults.get( mergedResults.size() - 1 );
+
+ }
+
+
+ currentColumnIterator = mergedResults.iterator();
+
+
+ //force an advance of this iterator when there are still shards to read but result set on current shard is 0
+ if(size == 0 && currentShardIterator.hasNext()){
+ hasNext();
+ }
+
+ if(logger.isTraceEnabled()){
+ logger.trace("currentColumnIterator.hasNext()={}, " +
+ "moreToReturn={}, currentShardIterator.hasNext()={}",
+ currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
+ }
+
+
+ }
+
+
+ /**
+ * Process the result set and filter any duplicates that may have already been seen in previous shards. During
+ * a shard transition, there could be the same columns in multiple shards (rows). This will also allow for
+ * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
+ * @param result the rows returned by the query; every column of every row is parsed with columnParser
+ * @param maxSize used only as the initial capacity of the returned list
+ * @return the parsed values not found in resultsTracking, in the order read; skipSize counts the filtered ones
+ */
+ private List<T> processResults(final Rows<R, C> result, final int maxSize ) {
+
+ final List<T> mergedResults = new ArrayList<>(maxSize);
+
+ for ( final R key : result.getKeys() ) {
+ final ColumnList<C> columns = result.getRow( key ).getColumns();
+
+
+ for (final Column<C> column :columns ) {
+
+ final T returnedValue = columnParser.parseColumn( column );
+
+ // use an O(log n) search, same as a tree, but with fast access to indexes for later operations
+ int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
+ // NOTE(review): binarySearch needs resultsTracking sorted per comparator; values are appended below -- confirm
+
+ //we've already seen the column, filter it out as we might be in a shard transition or our start column
+ if(searchIndex > -1){
+ if(logger.isTraceEnabled()){
+ logger.trace("skipping column as it was already retrieved before");
+ }
+ skipSize++;
+ continue;
+ }
+
+
+ resultsTracking.add(returnedValue);
+ mergedResults.add(returnedValue );
+
+
+ }
+
+ if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
+
+ }
+ return mergedResults;
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index d22f472..5eeeae0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -107,8 +107,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
- .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard));
+ // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta
+ long batchTimestamp = System.currentTimeMillis();
+
+ batch.withTimestamp( batchTimestamp ).withRow( EDGE_SHARDS, rowKey )
+ .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard)).setTimestamp(batchTimestamp);
return batch;
}
@@ -163,8 +166,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final MutationBatch batch = keyspace.prepareMutationBatch();
- batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey )
- .deleteColumn( shard.getShardIndex() );
+ // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta
+ long batchTimestamp = System.currentTimeMillis();
+
+ batch.withTimestamp(batchTimestamp).withRow( EDGE_SHARDS, rowKey )
+ .deleteColumn( shard.getShardIndex() ).setTimestamp(batchTimestamp);
return batch;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 8728c6c..e63db46 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -161,17 +161,22 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final int maxWorkSize = graphFig.getScanPageSize();
- final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
- final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
- final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
+
/**
* As we move edges, we want to keep track of it
*/
- long edgeCount = 0;
+ long totalEdgeCount = 0;
for ( Shard sourceShard : sourceShards ) {
+
+ final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
+ final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
+ final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
+
+ long edgeCount = 0;
+
Iterator<MarkedEdge> edges = edgeMeta
.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
@@ -183,6 +188,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final long edgeTimestamp = edge.getTimestamp();
+ shardEnd = edge;
/**
* The edge is within a different shard, break
@@ -203,6 +209,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
edgeCount++;
+
// if we're at our count, execute the mutation of writing the edges to the new row, then remove them
// from the old rows
if ( edgeCount % maxWorkSize == 0 ) {
@@ -214,15 +221,15 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
// write the edges into the new shard atomically so we know they all succeed
newRowBatch.withAtomicBatch(true).execute();
- // set the shardEnd after the write is known to be successful
- shardEnd = edge;
// Update the shard end after each batch so any reads during transition stay as close to current
sourceShard.setShardEnd(
Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
);
- logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
+ if(logger.isTraceEnabled()) {
+ logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
+ }
updateShardMetaBatch.mergeShallow(
edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
@@ -231,74 +238,85 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
// on purpose block this thread before deleting the old edges to be sure there are no gaps
// duplicates are filtered on graph seeking so this is OK
Thread.sleep(1000);
- logger.info("Deleting batch of {} from old shard", maxWorkSize);
- deleteRowBatch.execute();
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Deleting batch of {} from old shard", maxWorkSize);
+ }
+ deleteRowBatch.withAtomicBatch(true).execute();
+
+ updateShardMetaBatch.execute();
}
catch ( Throwable t ) {
logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
}
- }else {
-
- shardEnd = edge;
+ totalEdgeCount += edgeCount;
+ edgeCount = 0;
}
}
- if (shardEnd != null && edgeCount > 0){
+ totalEdgeCount += edgeCount;
- sourceShard.setShardEnd(
- Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
- );
+ try {
- logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, shardEnd );
- updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
- }
+ // write the edges into the new shard atomically so we know they all succeed
+ newRowBatch.withAtomicBatch(true).execute();
- }
+ // on purpose block this thread before deleting the old edges to be sure there are no gaps
+ // duplicates are filtered on graph seeking so this is OK
+ Thread.sleep(1000);
+ if(logger.isTraceEnabled()) {
+ logger.trace("Deleting remaining {} edges from old shard", edgeCount);
+ }
+ deleteRowBatch.withAtomicBatch(true).execute();
+ if (shardEnd != null){
+ sourceShard.setShardEnd(
+ Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+ );
- try {
+ if(logger.isTraceEnabled()) {
+ logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd);
+ }
+ updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+ updateShardMetaBatch.execute();
+ }
- // write the edges into the new shard atomically so we know they all succeed
- newRowBatch.withAtomicBatch(true).execute();
- // on purpose block this thread before deleting the old edges to be sure there are no gaps
- // duplicates are filtered on graph seeking so this is OK
- Thread.sleep(1000);
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to move edges to target shard {}", targetShard );
+ }
- logger.info("Deleting remaining edges from old shard");
- deleteRowBatch.execute();
- // now update with our shard end
- updateShardMetaBatch.execute();
}
- catch ( Throwable t ) {
- logger.error( "Unable to move edges to target shard {}", targetShard );
- }
+
+
if (logger.isTraceEnabled()) {
- logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+ logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
}
- logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+
+ logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
- resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
+ resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
/**
* We didn't move anything this pass, mark the shard as compacted. If we move something,
* it means that we missed it on the first pass
* or someone is still not writing to the target shard only.
*/
- if ( edgeCount == 0 ) {
+ if ( totalEdgeCount == 0 ) {
//now that we've marked our target as compacted, we can successfully remove any shards that are not
@@ -329,11 +347,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
-
- logger.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", targetShard );
-
//Overwrite our shard index with a newly created one that has been marked as compacted
Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+ compactedShard.setShardEnd(targetShard.getShardEnd());
+
+ logger.info( "Shard has been fully compacted. Marking shard {} as compacted in Cassandra", compactedShard );
+
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
try {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index e609d33..e2dd549 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.*;
+import org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator;
import org.apache.usergrid.persistence.core.shard.SmartShard;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.slf4j.Logger;
@@ -136,7 +137,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
logger.trace("Searching with row keys {}", rowKeys);
}
- currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher,
+ currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf, consistencyLevel, searcher, searcher,
searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending);
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 9e6996d..e7027f4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -82,6 +82,7 @@ import static org.junit.Assert.fail;
public class GraphManagerShardConsistencyIT {
private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
+
private static final MetricRegistry registry = new MetricRegistry();
private static final Meter writeMeter = registry.meter( "writeThroughput" );
@@ -102,7 +103,7 @@ public class GraphManagerShardConsistencyIT {
protected ListeningExecutorService deleteExecutor;
- protected int TARGET_NUM_SHARDS = 6;
+ protected int TARGET_NUM_SHARDS = 5;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 1f8bfa9..145aa03 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -209,7 +209,7 @@ public class EdgeShardSerializationTest {
}
@Test
- public void sameShardIndexRemoval() throws ConnectionException {
+ public void testShardDelete() throws ConnectionException {
final Id now = IdGenerator.createId( "test" );
@@ -217,11 +217,15 @@ public class EdgeShardSerializationTest {
final Shard shard1 = new Shard( 1000L, timestamp, false );
final Shard shard2 = new Shard( shard1.getShardIndex(), timestamp * 2, true );
+ final Shard shard3 = new Shard( shard2.getShardIndex() * 2, timestamp * 3, true );
+
final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" );
MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta );
batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) );
+ batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard3, sourceEdgeMeta ) );
+
batch.execute();
@@ -229,16 +233,16 @@ public class EdgeShardSerializationTest {
edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
// Latest timestamp comes first
- assertEquals( shard2, results.next() );
+ assertEquals( shard3, results.next() );
// This should now not remove anything
- edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute();
+ edgeShardSerialization.removeShardMeta( scope, shard3, sourceEdgeMeta ).execute();
// Get iterator again
results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
- // We should still have shard3 stored
+ // We should still have shard2 stored
assertEquals( shard2, results.next() );
[10/20] usergrid git commit: Add a timeout to one of the graph shard
tests so it can't run forever in the event that it is failing.
Posted by mr...@apache.org.
Add a timeout to one of the graph shard tests so it can't run forever in the event that it is failing.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/11d8a0ed
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/11d8a0ed
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/11d8a0ed
Branch: refs/heads/release-2.1.1
Commit: 11d8a0edebd63b05314c711b457cc03e4a477599
Parents: bd77672
Author: Michael Russo <mr...@apigee.com>
Authored: Sun Mar 20 19:18:44 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sun Mar 20 19:18:44 2016 -0700
----------------------------------------------------------------------
.../persistence/graph/GraphManagerShardConsistencyIT.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/11d8a0ed/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index e7027f4..1ce23b9 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -171,7 +171,7 @@ public class GraphManagerShardConsistencyIT {
deleteExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
}
- @Test
+ @Test(timeout=300000) // set a timeout so this doesn't run forever in the event that it is failing
public void writeThousandsSingleSource()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
@@ -279,7 +279,7 @@ public class GraphManagerShardConsistencyIT {
final List<Throwable> failures = new ArrayList<>();
Thread.sleep(3000); // let's make sure everything is written
- for(int i = 0; i < 1; i ++) {
+ for(int i = 0; i < 2; i ++) {
/**
@@ -642,8 +642,6 @@ public class GraphManagerShardConsistencyIT {
Thread.sleep( 2000 );
}
- future.cancel(true); // stop the read future
-
//now that we have finished deleting and shards are removed, shutdown
deleteExecutor.shutdownNow();
[17/20] usergrid git commit: Fix issue where new shards were not
picked up after new shards are allocated.
Posted by mr...@apache.org.
Fix issue where new shards were not picked up after new shards are allocated.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/97719684
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/97719684
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/97719684
Branch: refs/heads/release-2.1.1
Commit: 9771968408658e8ce4c14db305bc10fc173d65d8
Parents: 85cd12d
Author: Michael Russo <mr...@apigee.com>
Authored: Tue Mar 22 18:16:44 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Tue Mar 22 18:16:44 2016 -0700
----------------------------------------------------------------------
.../astyanax/MultiRowShardColumnIterator.java | 2 +-
.../impl/shard/NodeShardCache.java | 7 +++-
.../shard/impl/EdgeShardSerializationImpl.java | 1 +
.../shard/impl/NodeShardAllocationImpl.java | 23 +++++-----
.../impl/shard/impl/NodeShardCacheImpl.java | 11 +++++
.../shard/impl/ShardGroupCompactionImpl.java | 23 +++++-----
.../graph/GraphManagerShardConsistencyIT.java | 28 +++++++++----
.../impl/shard/NodeShardAllocationTest.java | 44 ++++++++++++++++----
.../impl/shard/ShardGroupCompactionTest.java | 4 +-
.../graph/src/test/resources/log4j.properties | 4 +-
10 files changed, 103 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
index b13d0f5..86e3b4d 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -43,7 +43,7 @@ import com.netflix.astyanax.util.RangeBuilder;
*/
public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
- private static final Logger logger = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+ private static final Logger logger = LoggerFactory.getLogger( MultiRowShardColumnIterator.class );
private final int pageSize;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 173b89d..23c2c25 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -38,7 +38,7 @@ public interface NodeShardCache {
* @param timestamp The time to select the slice for.
* @param directedEdgeMeta The directed edge meta data
*/
- public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
+ ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
/**
@@ -49,6 +49,9 @@ public interface NodeShardCache {
* @param directedEdgeMeta The directed edge meta data
* @return
*/
- public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
+ Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
+
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 5eeeae0..76a0922 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6b190a1..a6cf378 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.Collections;
import java.util.Iterator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,14 +34,6 @@ import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
@@ -65,19 +58,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private final TimeService timeService;
private final GraphFig graphFig;
private final ShardGroupCompaction shardGroupCompaction;
+ private final NodeShardCache nodeShardCache;
@Inject
public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
final EdgeColumnFamilies edgeColumnFamilies,
final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
- final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
+ final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction,
+ final NodeShardCache nodeShardCache) {
this.edgeShardSerialization = edgeShardSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
this.timeService = timeService;
this.graphFig = graphFig;
this.shardGroupCompaction = shardGroupCompaction;
+ this.nodeShardCache = nodeShardCache;
}
@@ -101,7 +97,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
//logger.info("existing shards has something: {}", existingShards.hasNext());
/**
- * We didn't get anything out of cassandra, so we need to create the minumum shard
+ * We didn't get anything out of cassandra, so we need to create the minimum shard
*/
if ( existingShards == null || !existingShards.hasNext() ) {
@@ -250,6 +246,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
try {
batch.execute();
+
+ if(logger.isTraceEnabled()) {
+ logger.trace("Clearing shard cache");
+ }
+
+ // invalidate the shard cache so we can be sure that all read shards are up to date
+ nodeShardCache.invalidate();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to casandra", e );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 1a88ebb..5eaaaa0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -34,6 +34,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +69,7 @@ import com.google.inject.Inject;
* Simple implementation of the shard. Uses a local Guava shard with a timeout. If a value is not present in the
* shard, it will need to be searched via cassandra.
*/
+@Singleton
public class NodeShardCacheImpl implements NodeShardCache {
private static final Logger logger = LoggerFactory.getLogger( NodeShardCacheImpl.class );
@@ -171,6 +173,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
throw new GraphRuntimeException( "Unable to load shard key for graph", e );
}
+ // do this if wanting to bypass the cache for getting the read shards
+ //entry = new CacheEntry(nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta ));
+
Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
if ( iterator == null ) {
@@ -180,6 +185,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
return iterator;
}
+ @Override
+ public void invalidate(){
+
+ graphs.invalidateAll();
+
+ }
/**
* This is a race condition. We could re-init the shard while another thread is reading it. This is fine, the read
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index b88c52c..7854c3b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -27,15 +27,11 @@ import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import com.google.common.base.Optional;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,8 +58,6 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import rx.Observable;
-import rx.schedulers.Schedulers;
/**
@@ -94,6 +88,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
private final Random random;
private final ShardCompactionTaskTracker shardCompactionTaskTracker;
private final ShardAuditTaskTracker shardAuditTaskTracker;
+ private final NodeShardCache nodeShardCache;
@Inject
@@ -102,7 +97,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
final EdgeShardSerialization edgeShardSerialization,
- final AsyncTaskExecutor asyncTaskExecutor) {
+ final AsyncTaskExecutor asyncTaskExecutor,
+ final NodeShardCache nodeShardCache ) {
this.timeService = timeService;
this.countAudits = new AtomicLong();
@@ -119,6 +115,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.taskExecutor = asyncTaskExecutor.getExecutorService();
+ this.nodeShardCache = nodeShardCache;
}
@@ -319,7 +316,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
if ( totalEdgeCount == 0 ) {
- //now that we've marked our target as compacted, we can successfully remove any shards that are not
+ // now that we've marked our target as compacted, we can successfully remove any shards that are not
// compacted themselves in the sources
final MutationBatch shardRemovalRollup = keyspace.prepareMutationBatch();
@@ -342,6 +339,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
try {
shardRemovalRollup.execute();
+
+ // invalidate the shard cache so we can be sure that all read shards are up to date
+ nodeShardCache.invalidate();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
@@ -357,6 +357,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
try {
updateMark.execute();
+
+ // invalidate the shard cache so we can be sure that all read shards are up to date
+ nodeShardCache.invalidate();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to cassandra", e );
@@ -598,15 +601,11 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
addToHash( hasher, scope.getApplication() );
- /** Commenting the full meta from the hash so we allocate/compact shards in a more controlled fashion
-
for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
addToHash( hasher, nodeMeta.getId() );
hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
}
- **/
-
/**
* Add our edge type
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 1ce23b9..652c8d6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -105,6 +105,8 @@ public class GraphManagerShardConsistencyIT {
protected int TARGET_NUM_SHARDS = 5;
+ protected int POST_WRITE_SLEEP = 2000;
+
@Before
@@ -175,7 +177,7 @@ public class GraphManagerShardConsistencyIT {
public void writeThousandsSingleSource()
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
- final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() );
+ final Id sourceId = IdGenerator.createId( "sourceWrite" );
final String edgeType = "testWrite";
final EdgeGenerator generator = new EdgeGenerator() {
@@ -183,7 +185,7 @@ public class GraphManagerShardConsistencyIT {
@Override
public Edge newEdge() {
- Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) );
+ Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
return edge;
@@ -199,7 +201,7 @@ public class GraphManagerShardConsistencyIT {
};
- final int numInjectors = 1;
+ final int numInjectors = 2;
/**
* create injectors. This way all the caches are independent of one another. This is the same as
@@ -277,9 +279,11 @@ public class GraphManagerShardConsistencyIT {
final List<Throwable> failures = new ArrayList<>();
- Thread.sleep(3000); // let's make sure everything is written
- for(int i = 0; i < 2; i ++) {
+ logger.info("Sleeping {}ms before reading to ensure all compactions have completed", POST_WRITE_SLEEP);
+ Thread.sleep(POST_WRITE_SLEEP); // let's make sure everything is written
+
+ for(int i = 0; i < 1; i ++) {
/**
@@ -303,6 +307,16 @@ public class GraphManagerShardConsistencyIT {
public void onFailure( final Throwable t ) {
failures.add( t );
logger.error( "Failed test!", t );
+
+ final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+ while ( groups.hasNext() ) {
+
+ logger.info( "Shard entry group: {}", groups.next() );
+
+ }
+
+
}
} );
}
@@ -409,7 +423,7 @@ public class GraphManagerShardConsistencyIT {
throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
final Id sourceId = IdGenerator.createId( "sourceDelete" );
- final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString();
+ final String deleteEdgeType = "testDelete";
final EdgeGenerator generator = new EdgeGenerator() {
@@ -432,7 +446,7 @@ public class GraphManagerShardConsistencyIT {
};
- final int numInjectors = 2;
+ final int numInjectors = 3;
/**
* create injectors. This way all the caches are independent of one another. This is the same as
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 6671dec..00406c0 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -101,10 +101,12 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final long timeservicetime = System.currentTimeMillis();
@@ -131,10 +133,13 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache);
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -172,9 +177,12 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -216,9 +224,12 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -326,9 +337,12 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -412,9 +426,12 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -475,10 +492,13 @@ public class NodeShardAllocationTest {
final TimeService timeService = mock( TimeService.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -613,9 +633,12 @@ public class NodeShardAllocationTest {
final MutationBatch batch = mock( MutationBatch.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
final Id nodeId = IdGenerator.createId( "test" );
final String type = "type";
@@ -704,9 +727,12 @@ public class NodeShardAllocationTest {
when( graphFig.getShardMinDelta() ).thenReturn( tooSmallDelta );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class);
+
+
NodeShardAllocation approximation =
new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
- timeService, graphFig, shardGroupCompaction );
+ timeService, graphFig, shardGroupCompaction, nodeShardCache );
/**
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 65f19ff..666e30a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -99,6 +99,8 @@ public class ShardGroupCompactionTest {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ final NodeShardCache nodeShardCache = mock( NodeShardCache.class );
+
final long delta = 10000;
@@ -116,7 +118,7 @@ public class ShardGroupCompactionTest {
ShardGroupCompactionImpl compaction =
new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
- edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor );
+ edgeColumnFamilies, keyspace, edgeShardSerialization, asyncTaskExecutor, nodeShardCache );
DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( IdGenerator.createId( "source" ), "test" );
http://git-wip-us.apache.org/repos/asf/usergrid/blob/97719684/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index e7f7524..5c6b045 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,10 +37,10 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
#log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
-#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardEntryGroupIterator=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE
[16/20] usergrid git commit: Fix bad comment.
Posted by mr...@apache.org.
Fix bad comment.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/85cd12d1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/85cd12d1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/85cd12d1
Branch: refs/heads/release-2.1.1
Commit: 85cd12d106bde2398c1132ef2d4a439166bb2035
Parents: 903fd18
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 21 13:00:29 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 21 13:00:29 2016 -0700
----------------------------------------------------------------------
.../corepersistence/asyncevents/AsyncEventServiceImpl.java | 9 ---------
1 file changed, 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/85cd12d1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 3b74daf..1349011 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -742,15 +742,6 @@ public class AsyncEventServiceImpl implements AsyncEventService {
/**
* Submit results to index and return the queue messages to be ack'd
*
- *
- *
- *
- *
- *
- *
- *
- * @param indexEventResults
- * @return
*/
private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
[02/20] usergrid git commit: Set verbose logging to trace.
Posted by mr...@apache.org.
Set verbose logging to trace.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/92fae0df
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/92fae0df
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/92fae0df
Branch: refs/heads/release-2.1.1
Commit: 92fae0df4d99e6e5d14811f6ba3680487a0b71b9
Parents: 8c725f1
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Mar 14 17:27:16 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Mar 14 17:27:16 2016 -0700
----------------------------------------------------------------------
.../core/astyanax/MultiRowColumnIterator.java | 109 +++++++++++--------
.../graph/GraphManagerShardConsistencyIT.java | 6 +-
2 files changed, 66 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/92fae0df/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 9971fba..c384899 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -141,18 +141,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
@Override
public boolean hasNext() {
- //logger.info(Thread.currentThread().getName()+" - calling hasNext()");
+
if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){
if(currentShardIterator.hasNext()) {
- logger.info(Thread.currentThread().getName()+" - advancing shard iterator");
- //logger.info(Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
- logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
- //Collections.reverse(rowKeysWithShardEnd);
- logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
- logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - advancing shard iterator");
+ logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+ logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+ logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ }
+
+
currentShard = currentShardIterator.next();
- logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+
+ }
+
startColumn = null;
advance();
@@ -161,11 +168,16 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
if(currentColumnIterator != null) {
- logger.info(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+
+ }
}
- logger.info(Thread.currentThread().getName()+" - moreToReturn={}", moreToReturn);
- logger.info(Thread.currentThread().getName()+" - going into advance()");
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - going into advance()");
+
+ }
advance();
}
@@ -194,7 +206,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
public void advance() {
- logger.info( "Advancing multi row column iterator" );
if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
/**
@@ -221,10 +232,18 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
if(currentShard == null){
Collections.reverse(rowKeysWithShardEnd); // ranges are ascending
- logger.info(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+ }
+
currentShard = currentShardIterator.next();
- logger.info(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
- logger.info(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
+ logger.trace(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+ }
+
}
@@ -233,7 +252,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//set the range into the search
- logger.info(Thread.currentThread().getName()+" - startColumn={}", startColumn);
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - startColumn={}", startColumn);
+ }
+
if ( startColumn == null ) {
columnSearch.buildRange( rangeBuilder );
}
@@ -249,7 +271,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
/**
* Get our list of slices
*/
- //logger.info("shard: {}, end: {}",currentShard.getRowKey().getKey(), currentShard.getShardEnd());
final RowSliceQuery<R, C> query =
keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
.withColumnRange( rangeBuilder.build() );
@@ -325,33 +346,22 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
// });
-
-
-
-
-
- //we've parsed everything truncate to the first pageSize, it's all we can ensure is correct without another
- //trip back to cassandra
-
- //discard our first element (maybe)
-
-
-
final int size = mergedResults.size();
if(logger.isTraceEnabled()){
logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size);
+ logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+
}
- logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
moreToReturn = size == selectSize;
-// if(selectSize == 1001 && mergedResults.size() == 1000){
-// moreToReturn = true;
-// }
+ if(selectSize == 1001 && mergedResults.size() == 1000){
+ moreToReturn = true;
+ }
//we have a first column to to check
if( size > 0) {
@@ -360,7 +370,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//The search has either told us to skip the first element, or it matches our last, therefore we disregard it
if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
- logger.info("removing an entry");
+ if(logger.isTraceEnabled()){
+ logger.trace("removing an entry");
+
+ }
mergedResults.remove( 0 );
}
@@ -372,22 +385,24 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
}
- logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
- logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+ if(logger.isTraceEnabled()){
+ logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+ logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+ }
+
-// if(mergedResults.size() == 0 && currentShardIterator.hasNext()){
-// //currentShard = currentShardIterator.next();
-//
-// }
currentColumnIterator = mergedResults.iterator();
- //logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
- logger.info(
- Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
- "moreToReturn={}, currentShardIterator.hasNext()={}",
- currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
+
+ if(logger.isTraceEnabled()){
+ logger.trace(
+ Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
+ "moreToReturn={}, currentShardIterator.hasNext()={}",
+ currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
+ }
+
if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
}
@@ -494,7 +509,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
//we've already seen it, no-op
if(searchIndex > -1){
- logger.info("skipping column as it was already retrieved before");
+ if(logger.isTraceEnabled()){
+ logger.trace("skipping column as it was already retrieved before");
+ }
continue;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/92fae0df/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index b131e95..2602e88 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -114,7 +114,7 @@ public class GraphManagerShardConsistencyIT {
originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
- ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 500 );
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
final long cacheTimeout = 2000;
@@ -203,9 +203,9 @@ public class GraphManagerShardConsistencyIT {
// power for writes
final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
- //final int numWorkersPerInjector = numProcessors / numInjectors;
+ final int numWorkersPerInjector = numProcessors / numInjectors;
- final int numWorkersPerInjector = 1;
+ //final int numWorkersPerInjector = 1;