You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/28 05:36:55 UTC
[4/5] Updated OrderedMerge to use a faster implementation at runtime.
After initialization,
it's an O(1) emit operation as long as our producers are fast enough.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index d434db7..0a6ecfa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -35,8 +35,15 @@ public interface GraphFig extends GuicyFig {
public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+ /**
+ * The size of the shards. This is approximate, and should be set lower than what you would like your max to be
+ */
public static final String SHARD_SIZE = "usergrid.graph.shard.size";
+
+ /**
+ * Number of shards we can cache.
+ */
public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
@@ -45,17 +52,33 @@ public interface GraphFig extends GuicyFig {
*/
public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+ /**
+ * Number of worker threads to refresh the cache
+ */
public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
- public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+ /**
+ * The size of the worker queue for shard auditing
+ */
+ public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
/**
- * The minimum amount of time than can occur (in millis) between shard allocation. Must be at least 2x the cache timeout.
+ * The number of workers for shard auditing
+ */
+ public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+
+
+ public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+
+
+ /**
+ * The minimum amount of time that can occur (in millis) between shard allocation and compaction. Must be at least 2x the cache
+ * timeout. Set to 2.5x the cache timeout to be safe
*
- * Note that you should also pad this for node clock drift. A good value for this would be 2x the shard cache timeout + 30 seconds,
- * assuming you have NTP and allow a max drift of 30 seconds
+ * Note that you should also pad this for node clock drift. A good value for this would be 2x the shard cache
+ * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
*/
public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
@@ -67,26 +90,23 @@ public interface GraphFig extends GuicyFig {
public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
-
-
@Default("1000")
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
-
@Default("5")
@Key(REPAIR_CONCURRENT_SIZE)
int getRepairConcurrentSize();
@Default( ".10" )
- @Key( SHARD_REPAIR_CHANCE )
+ @Key( SHARD_REPAIR_CHANCE )
double getShardRepairChance();
- @Default("500000")
- @Key(SHARD_SIZE)
+ @Default( "500000" )
+ @Key( SHARD_SIZE )
long getShardSize();
@@ -95,31 +115,40 @@ public interface GraphFig extends GuicyFig {
long getShardCacheTimeout();
@Default("60000")
- @Key( SHARD_MIN_DELTA )
+ @Key(SHARD_MIN_DELTA)
long getShardMinDelta();
- @Default( "250000" )
- @Key( SHARD_CACHE_SIZE )
+ @Default("250000")
+ @Key(SHARD_CACHE_SIZE)
long getShardCacheSize();
- @Default( "2" )
- @Key( SHARD_CACHE_REFRESH_WORKERS )
+ @Default("2")
+ @Key(SHARD_CACHE_REFRESH_WORKERS)
int getShardCacheRefreshWorkerCount();
- @Default( "10000" )
- @Key( COUNTER_WRITE_FLUSH_COUNT )
+ @Default( "10" )
+ @Key( SHARD_AUDIT_WORKERS )
+ int getShardAuditWorkerCount();
+
+ @Default( "1000" )
+ @Key( SHARD_AUDIT_QUEUE_SIZE )
+ int getShardAuditWorkerQueueSize();
+
+
+ @Default("10000")
+ @Key(COUNTER_WRITE_FLUSH_COUNT)
long getCounterFlushCount();
- @Default( "30000" )
- @Key( COUNTER_WRITE_FLUSH_INTERVAL )
+ @Default("30000")
+ @Key(COUNTER_WRITE_FLUSH_INTERVAL)
long getCounterFlushInterval();
- @Default( "1000" )
- @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE )
+ @Default("1000")
+ @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE)
int getCounterFlushQueueSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 7e589f2..114440f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -65,4 +65,10 @@ public interface SearchByEdge {
*/
Optional<Edge> last();
+ /**
+ * Get the sort order
+ * @return
+ */
+ SearchByEdgeType.Order getOrder();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 29cc3f5..749130b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -59,4 +59,20 @@ public interface SearchByEdgeType {
*/
Optional<Edge> last();
+ /**
+ * Get the direction we're seeking
+ * @return
+ */
+ Order getOrder();
+
+
+ /**
+ * Options for ordering. By default, we want to perform descending for common use cases and read speed. This is how our data
+ * is optimized in Cassandra
+ */
+ public enum Order {
+ DESCENDING,
+ ASCENDING
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 64f0fbb..f0e954b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -48,6 +48,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
@@ -55,6 +56,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.Node
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -119,6 +121,8 @@ public class GraphModule extends AbstractModule {
bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
+ bind( ShardGroupCompaction.class).to( ShardGroupCompactionImpl.class);
+
/**
* Bind our implementation
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
deleted file mode 100644
index 20bc637..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
-@BindingAnnotation
-public @interface MergedProxy {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 12192fc..9fcb816 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -70,7 +70,6 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
return false;
}
-
return true;
}
@@ -90,3 +89,4 @@ public class SimpleMarkedEdge extends SimpleEdge implements MarkedEdge {
"} " + super.toString();
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index e8971f6..d40efc0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -23,10 +23,12 @@ package org.apache.usergrid.persistence.graph.impl;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
@@ -40,6 +42,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
private final String type;
private final long maxTimestamp;
private final Optional<Edge> last;
+ private final SearchByEdgeType.Order order;
/**
@@ -50,17 +53,20 @@ public class SimpleSearchByEdge implements SearchByEdge {
* @param maxTimestamp The maximum timestamp to seek from
* @param last The value to start seeking from. Must be >= this value
*/
- public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final Edge last ) {
+ public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Edge last ) {
+
ValidationUtils.verifyIdentity(sourceNode);
ValidationUtils.verifyIdentity(targetNode);
ValidationUtils.verifyString( type, "type" );
GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+ Preconditions.checkNotNull(order, "order must not be null");
this.sourceNode = sourceNode;
this.targetNode = targetNode;
this.type = type;
this.maxTimestamp = maxTimestamp;
+ this.order = order;
this.last = Optional.fromNullable(last);
}
@@ -93,4 +99,10 @@ public class SimpleSearchByEdge implements SearchByEdge {
public Optional<Edge> last() {
return last;
}
+
+
+ @Override
+ public SearchByEdgeType.Order getOrder() {
+ return order;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9e7dcde..6bc8b1b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
@@ -39,6 +40,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
private final String type;
private final long maxTimestamp;
private final Optional<Edge> last;
+ private final Order order;
/**
@@ -46,9 +48,14 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
* @param node The node to search from
* @param type The edge type
* @param maxTimestamp The maximum timestamp to return
+ * @param order The sort order. Descending is most efficient
* @param last The value to start seeking from. Must be >= this value
+ * @param order
*/
- public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Edge last ) {
+ public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last
+ ) {
+
+ Preconditions.checkNotNull( order, "order is required");
ValidationUtils.verifyIdentity(node);
ValidationUtils.verifyString( type, "type" );
GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
@@ -57,6 +64,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
this.node = node;
this.type = type;
this.maxTimestamp = maxTimestamp;
+ this.order = order;
this.last = Optional.fromNullable(last);
}
@@ -86,6 +94,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
@Override
+ public Order getOrder() {
+ return order;
+ }
+
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
index 4249ae7..4b73347 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
@@ -44,8 +44,8 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear
* @param last The value to start seeking from. Must be >= this value
*/
- public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final String idType, final Edge last ) {
- super( node, type, maxTimestamp, last );
+ public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType, final Edge last ) {
+ super( node, type, maxTimestamp, order, last );
ValidationUtils.verifyString( idType, "idType" );
this.idType = idType;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 5be3541..0137ba4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -32,14 +32,13 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -116,7 +115,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
final SimpleSearchByEdge search =
new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
- edge.getTimestamp(), null );
+ edge.getTimestamp(), SearchByEdgeType.Order.DESCENDING, null );
return serialization.getEdgeVersions( scope, search );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 055b867..7e09eca 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -33,7 +33,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -46,7 +46,6 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -286,7 +285,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTargetBySourceType( scope,
- new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, subType, null ) );
+ new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, SearchByEdgeType.Order.DESCENDING, subType, null ) );
}
} );
}
@@ -332,7 +331,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope,
- new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, subType, null ) );
+ new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, SearchByEdgeType.Order.DESCENDING, subType, null ) );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index 962da21..2be6c55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -33,8 +33,8 @@ import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -46,7 +46,6 @@ import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action0;
@@ -160,7 +159,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesToTarget( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) );
+ new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
}
} );
}
@@ -177,7 +176,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
@Override
protected Iterator<MarkedEdge> getIterator() {
return storageSerialization.getEdgesFromSource( scope,
- new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) );
+ new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 92f2548..6bb467f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
@@ -38,6 +39,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.netflix.astyanax.MutationBatch;
+
/**
* A bean to define directed edge meta data. This is used to encapsulate the meta data around a source or target node,
@@ -151,14 +154,52 @@ public abstract class DirectedEdgeMeta {
}
+ @Override
+ public String toString() {
+ return "DirectedEdgeMeta{" +
+ "nodes=" + Arrays.toString( nodes ) +
+ ", types=" + Arrays.toString( types ) +
+ '}';
+ }
+
+
/**
* Given the edge serialization, load all shard in the shard group
*/
public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
final ApplicationScope scope, final Collection<Shard> shards,
- final long maxValue );
+ final long maxValue, final SearchByEdgeType.Order order );
+
+
+ /**
+ * Write the edge for this meta data to the target shard
+ * @param shardedEdgeSerialization
+ * @param edgeColumnFamilies
+ * @param scope
+ * @param targetShard
+ * @param edge
+ * @param timestamp The timestamp on the operation
+ * @return
+ */
+ public abstract MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp );
+
+ /**
+ * Delete the edge for this meta data from the shard
+ * @param shardedEdgeSerialization
+ * @param edgeColumnFamilies
+ * @param scope
+ * @param sourceShard
+ * @param edge
+ * @param timestamp The timestamp on the operation
+ * @return
+ */
+ public abstract MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp );
/**
@@ -225,19 +266,39 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final Collection<Shard> shards,
- final long maxValue ) {
+ final ApplicationScope scope, final Collection<Shard> shards,
+ final long maxValue, final SearchByEdgeType.Order order ) {
final Id sourceId = nodes[0].id;
final String edgeType = types[0];
- final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
+ final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, order, null);
return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards );
}
@Override
+ public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .writeEdgeFromSource( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+ this, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .deleteEdgeFromSource( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+ this, timestamp );
+ }
+
+
+ @Override
public MetaType getType() {
return MetaType.SOURCE;
}
@@ -264,19 +325,39 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
- final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final Collection<Shard>shards,
- final long maxValue ) {
-//
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final Collection<Shard> shards,
+ final long maxValue, final SearchByEdgeType.Order order ) {
+ //
final Id sourceId = nodes[0].id;
final String edgeType = types[0];
final String targetType = types[1];
final SearchByIdType search =
- new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
+ new SimpleSearchByIdType( sourceId, edgeType, maxValue, order, targetType, null );
+
+ return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards );
+ }
- return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards);
+
+
+
+ @Override
+ public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization.writeEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, edge,
+ Collections.singleton( targetShard ), this, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization.deleteEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, edge,
+ Collections.singleton( sourceShard ), this, timestamp );
}
@@ -304,20 +385,40 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final Collection<Shard> shards,
- final long maxValue ) {
+ final ApplicationScope scope, final Collection<Shard> shards,
+ final long maxValue, final SearchByEdgeType.Order order ) {
final Id targetId = nodes[0].id;
final String edgeType = types[0];
- final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
+ final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, order, null);
return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards );
}
@Override
+ public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .writeEdgeToTarget( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+ this, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .deleteEdgeToTarget( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+ this, timestamp );
+ }
+
+
+ @Override
public MetaType getType() {
return MetaType.TARGET;
}
@@ -339,11 +440,13 @@ public abstract class DirectedEdgeMeta {
private static DirectedEdgeMeta fromTargetNodeSourceType( final NodeMeta[] nodes, final String[] types ) {
return new DirectedEdgeMeta( nodes, types ) {
+
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
final ApplicationScope scope, final Collection<Shard> shards,
- final long maxValue ) {
+ final long maxValue, final SearchByEdgeType.Order order ) {
+
final Id targetId = nodes[0].id;
final String edgeType = types[0];
@@ -351,9 +454,27 @@ public abstract class DirectedEdgeMeta {
final SearchByIdType search =
- new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
+ new SimpleSearchByIdType( targetId, edgeType, maxValue, order, sourceType, null );
+
+ return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards );
+ }
+
- return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards);
+ @Override
+ public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization.writeEdgeToTargetWithSourceType( edgeColumnFamilies, scope, edge,
+ Collections.singleton( targetShard ), this, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization.deleteEdgeToTargetWithSourceType( edgeColumnFamilies, scope, edge,
+ Collections.singleton( sourceShard ), this, timestamp );
}
@@ -385,18 +506,37 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final Collection<Shard> shards,
- final long maxValue ) {
+ final ApplicationScope scope, final Collection<Shard> shards,
+ final long maxValue, final SearchByEdgeType.Order order ) {
final Id sourceId = nodes[0].id;
final Id targetId = nodes[1].id;
final String edgeType = types[0];
final SimpleSearchByEdge search =
- new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
+ new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, order, null );
+
+ return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards );
+ }
- return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards);
+ @Override
+ public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .writeEdgeVersions( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+ this, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+ final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+ return shardedEdgeSerialization
+ .deleteEdgeVersions( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+ this, timestamp );
}
@@ -408,16 +548,15 @@ public abstract class DirectedEdgeMeta {
}
-
/**
* Create a directed edge from the stored meta data
+ *
* @param metaType The meta type stored
* @param nodes The metadata of the nodes
* @param types The types in the meta data
- *
- *
*/
- public static DirectedEdgeMeta fromStorage( final MetaType metaType, final NodeMeta[] nodes, final String[] types ) {
+ public static DirectedEdgeMeta fromStorage( final MetaType metaType, final NodeMeta[] nodes,
+ final String[] types ) {
switch ( metaType ) {
case SOURCE:
return fromSourceNode( nodes, types );
@@ -428,7 +567,7 @@ public abstract class DirectedEdgeMeta {
case TARGETSOURCE:
return fromTargetNodeSourceType( nodes, types );
case VERSIONS:
- return fromEdge(nodes, types);
+ return fromEdge( nodes, types );
default:
throw new UnsupportedOperationException( "No supported meta type found" );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
index 9bd9937..74f7ffc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -42,4 +42,14 @@ public class RowKey {
this.edgeType = edgeType;
this.shardId = shardId;
}
+
+
+ @Override
+ public String toString() {
+ return "RowKey{" +
+ "nodeId=" + nodeId +
+ ", edgeType='" + edgeType + '\'' +
+ ", shardId=" + shardId +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
index 6e69bbf..3368c40 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -57,5 +57,10 @@ public class RowKeyType extends RowKey {
}
-
+ @Override
+ public String toString() {
+ return "RowKeyType{" +
+ "idType='" + idType + '\'' +
+ "} " + super.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 38fe51c..9ca6cbe 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -137,6 +137,7 @@ public class Shard implements Comparable<Shard> {
return "Shard{" +
"shardIndex=" + shardIndex +
", createdTime=" + createdTime +
- "} ";
+ ", compacted=" + compacted +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 70569fd..11bf7a4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -20,10 +20,15 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.base.Preconditions;
@@ -36,6 +41,7 @@ import com.google.common.base.Preconditions;
*/
public class ShardEntryGroup {
+ private static final Logger LOG = LoggerFactory.getLogger( ShardEntryGroup.class );
private List<Shard> shards;
@@ -45,6 +51,8 @@ public class ShardEntryGroup {
private Shard compactionTarget;
+ private Shard rootShard;
+
/**
* The max delta we accept in milliseconds for create time to be considered a member of this group
@@ -85,10 +93,13 @@ public class ShardEntryGroup {
//shard is not compacted, or it's predecessor isn't, we should include it in this group
if ( !minShard.isCompacted() ) {
addShardInternal( shard );
+
return true;
}
+
+
return false;
}
@@ -124,10 +135,26 @@ public class ShardEntryGroup {
* Get the entries that we should read from.
*/
public Collection<Shard> getReadShards() {
- return shards;
+
+
+ final Shard staticShard = getRootShard();
+ final Shard compactionTarget = getCompactionTarget();
+
+
+
+ if(compactionTarget != null){
+ LOG.debug( "Returning shards {} and {} as read shards", compactionTarget, staticShard );
+ return Arrays.asList( compactionTarget, staticShard );
+ }
+
+
+ LOG.debug( "Returning shards {} read shard", staticShard );
+ return Collections.singleton( staticShard );
}
+
+
/**
* Get the entries, with the max shard time being first. We write to all shards until they're migrated
*/
@@ -138,11 +165,22 @@ public class ShardEntryGroup {
* adding data to other shards
*/
if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
- return Collections.singleton( getCompactionTarget() );
+
+ final Shard compactionTarget = getCompactionTarget();
+
+ LOG.debug( "Returning shard {} as write shard", compactionTarget);
+
+ return Collections.singleton( compactionTarget );
+
}
+ final Shard staticShard = getRootShard();
+
+
+ LOG.debug( "Returning shard {} as write shard", staticShard);
+
+ return Collections.singleton( staticShard );
- return shards;
}
@@ -155,6 +193,24 @@ public class ShardEntryGroup {
/**
+ * Get the root shard of this group: the last entry in shards, cached in rootShard once it reports isCompacted().
+ * @return the root shard, or null while the last shard is not compacted. NOTE(review): getReadShards/getWriteShards wrap this value directly - confirm null is handled
+ */
+ private Shard getRootShard(){
+ if(rootShard != null){
+ return rootShard;
+ }
+
+ final Shard rootCandidate = shards.get( shards.size() -1 );
+
+ if(rootCandidate.isCompacted()){
+ rootShard = rootCandidate;
+ }
+
+ return rootShard;
+ }
+
+ /**
* Get the shard all compactions should write to. Null indicates we cannot find a shard that could be used as a
* compaction target. Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
* first to ensure all criteria are met before initiating compaction
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index bf4d3c9..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,38 +22,32 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Set;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import rx.Observable;
+import com.google.common.util.concurrent.ListenableFuture;
/**
* Defines tasks for running compaction
- *
- *
*/
public interface ShardGroupCompaction {
-
- /**
- * Execute the compaction task. Will return the number of edges that have
- * @param group The shard entry group to compact
- * @return The shards that were compacted
- */
- public Set<Shard> compact(final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group);
-
/**
- * Possibly audit the shard entry group. This is asynchronous and returns immediately
- * @param group
- * @return
+ * Possibly audit the shard entry group. This is asynchronous and returns the future that will
+ * report the operations performed (if any) upon completion.
+ *
+ * @return A ListenableFuture that completes with the AuditResult of the evaluation
*/
- public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
- final ShardEntryGroup group );
+ public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group );
- public enum AuditResult{
+ public enum AuditResult {
/**
* We didn't check this shard
*/
@@ -68,11 +62,10 @@ public interface ShardGroupCompaction {
*/
CHECKED_CREATED,
- /**
+ COMPACTED, /**
* The shard group is already compacting
*/
COMPACTING
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index ed3daaf..749416c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -39,10 +39,9 @@ import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
@@ -51,7 +50,6 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
@@ -127,7 +125,10 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
}
//column not found, return 0
catch ( RuntimeException re ) {
- if(re.getCause().getCause() instanceof NotFoundException) {
+
+ final Throwable cause = re.getCause();
+
+ if(cause != null && cause.getCause() instanceof NotFoundException) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
deleted file mode 100644
index 90b264c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-/**
- * Class to perform serialization for row keys from edges
- */
-
-public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
-
- //add the row id to the composite
- ID_SER.toComposite( builder, key.sourceId );
- builder.addString( key.edgeType );
- ID_SER.toComposite( builder, key.targetId );
- builder.addLong( key.shardId );
- }
-
-
- @Override
- public EdgeRowKey fromComposite( final CompositeParser composite ) {
-
- final Id sourceId = ID_SER.fromComposite( composite );
- final String edgeType = composite.readString();
- final Id targetId = ID_SER.fromComposite( composite );
- final long shard = composite.readLong();
-
- return new EdgeRowKey( sourceId, edgeType, targetId, shard );
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 27862d0..413c2a3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -4,16 +4,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
-import java.util.Set;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -30,23 +30,26 @@ import com.netflix.astyanax.util.RangeBuilder;
* @param <C> The column type
* @param <T> The parsed return type
*/
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
- protected final Optional<Edge> last;
+ protected final Optional<T> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
protected final Collection<Shard> shards;
+ protected final SearchByEdgeType.Order order;
+ protected final Comparator<T> comparator;
- protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Collection<Shard> shards ) {
+ protected EdgeSearcher( final ApplicationScope scope, final Collection<Shard> shards, final SearchByEdgeType.Order order, final Comparator<T> comparator, final long maxTimestamp, final Optional<T> last) {
Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards");
this.scope = scope;
this.maxTimestamp = maxTimestamp;
- this.last = last;
+ this.order = order;
this.shards = shards;
+ this.last = last;
+ this.comparator = comparator;
}
@@ -69,20 +72,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
- /**
- * Set the range on a search
- */
- public void setRange( final RangeBuilder builder ) {
-
- //set our start range since it was supplied to us
- if ( last.isPresent() ) {
- C sourceEdge = getStartColumn( last.get() );
-
-
- builder.setStart( sourceEdge, getSerializer() );
- }
-
- }
public boolean hasPage() {
@@ -98,6 +87,50 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
}
+ @Override
+ public void buildRange( final RangeBuilder rangeBuilder, final T value ) {
+
+ C edge = createColumn( value );
+
+ rangeBuilder.setStart( edge, getSerializer() );
+
+ setRangeOptions( rangeBuilder );
+ }
+
+
+ @Override
+ public void buildRange( final RangeBuilder rangeBuilder ) {
+
+ //set our start range since it was supplied to us
+ if ( last.isPresent() ) {
+ C sourceEdge = createColumn( last.get() );
+
+
+ rangeBuilder.setStart( sourceEdge, getSerializer() );
+ }
+
+
+ setRangeOptions(rangeBuilder);
+
+
+ }
+
+ private void setRangeOptions(final RangeBuilder rangeBuilder){
+ //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag
+ final boolean reversed = order == SearchByEdgeType.Order.ASCENDING;
+
+ rangeBuilder.setReversed( reversed );
+ }
+
+
+ /**
+ * Get the comparator this searcher was constructed with.
+ * @return the comparator passed to the constructor
+ */
+ public Comparator<T> getComparator() {
+ return comparator;
+ }
+
/**
* Get the column's serializer
@@ -116,7 +149,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
/**
* Set the start column to begin searching from. The last is provided
*/
- protected abstract C getStartColumn( final Edge last );
+ protected abstract C createColumn( final T last );
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
deleted file mode 100644
index d93f679..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.nio.ByteBuffer;
-
-import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.model.DynamicComposite;
-import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.LongSerializer;
-
-
-/**
- * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization. Only the target
- * Id and version.
- */
-public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
-
- private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
- private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
-
-
- @Override
- public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
-
- DynamicComposite composite = new DynamicComposite();
-
- composite.addComponent( edge.timestamp, LONG_SERIALIZER );
-
- ID_COL_SERIALIZER.toComposite( composite, edge.id );
-
- return composite.serialize();
- }
-
-
- @Override
- public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
- DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
-
- Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
-
-
- //return the version
- final long timestamp = composite.get( 0, LONG_SERIALIZER );
-
-
- //parse our id
- final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
-
-
- return new DirectedEdge( id, timestamp );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
deleted file mode 100644
index 0451d68..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
- public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
-
-
- final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
-
- //add the stored value
- builder.addInteger( meta.getType().getStorageValue() );
-
- final int length = nodeMeta.length;
-
- builder.addInteger( length );
-
-
- for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
- ID_SER.toComposite( builder, node.getId() );
- builder.addInteger( node.getNodeType().getStorageValue() );
- }
-
- final String[] edgeTypes = meta.getTypes();
-
- builder.addInteger( edgeTypes.length );
-
- for ( String type : edgeTypes ) {
- builder.addString( type );
- }
- }
-
-
- @Override
- public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
-
-
- final int storageType = composite.readInteger();
-
- final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
-
- final int idLength = composite.readInteger();
-
- final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
-
-
- for ( int i = 0; i < idLength; i++ ) {
- final Id sourceId = ID_SER.fromComposite( composite );
-
- final NodeType type = NodeType.get( composite.readInteger() );
-
- nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
- }
-
-
- final int length = composite.readInteger();
-
- String[] types = new String[length];
-
- for ( int i = 0; i < length; i++ ) {
- types[i] = composite.readString();
- }
-
- return DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 8233d0d..3b1f37a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -40,6 +40,7 @@ import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index b919ad7..832d9b0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
@@ -40,16 +41,15 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardA
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import com.fasterxml.uuid.impl.UUIDUtil;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.util.TimeUUIDUtils;
/**
@@ -68,6 +68,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private final NodeShardApproximation nodeShardApproximation;
private final TimeService timeService;
private final GraphFig graphFig;
+ private final ShardGroupCompaction shardGroupCompaction;
@Inject
@@ -75,13 +76,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final EdgeColumnFamilies edgeColumnFamilies,
final ShardedEdgeSerialization shardedEdgeSerialization,
final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
- final GraphFig graphFig ) {
+ final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
this.edgeShardSerialization = edgeShardSerialization;
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
this.nodeShardApproximation = nodeShardApproximation;
this.timeService = timeService;
this.graphFig = graphFig;
+ this.shardGroupCompaction = shardGroupCompaction;
}
@@ -113,7 +115,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
- return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta() );
+ return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
+ directedEdgeMeta );
}
@@ -158,18 +161,31 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
+ final long shardSize = graphFig.getShardSize();
- if ( count < graphFig.getShardSize() ) {
+
+ if ( count < shardSize ) {
return false;
}
+ /**
+ * We want to allocate a new shard as close to the max value as possible. This way if we're filling up a shard rapidly, we split it near the head of the values.
+ * Further checks to this group will result in more splits, similar to creating a tree-like structure and splitting each node.
+ *
+ * This means that the lower shard can be re-split later if it is still too large. We do the division to truncate
+ * to a split point below our current max; this is approximately the pivot we would ultimately reach if we split from the
+ * lower bound and moved forward. Doing this will stop the current shard from expanding and avoid a point where we cannot
+ * ultimately compact to the correct shard size.
+ */
+
/**
* Allocate the shard
*/
final Iterator<MarkedEdge> edges = directedEdgeMeta
- .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), Long.MAX_VALUE );
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+ SearchByEdgeType.Order.ASCENDING );
if ( !edges.hasNext() ) {
@@ -178,14 +194,41 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return false;
}
- //we have a next, allocate it based on the max
- MarkedEdge marked = edges.next();
+ MarkedEdge marked = null;
+
+ /**
+ * Advance to the pivot point we should use. Once it's compacted, we can split again.
+ * We walk every edge in the group and remember each edge whose position is a multiple of the shard size;
+ * the last such edge is the pivot we use. If the group holds fewer edges than the shard size, no pivot
+ * is found and the sanity check after the loop rejects the allocation.
+ */
+
+
+ for(long i = 1; edges.hasNext(); i++){
+ //we hit a pivot edge, remember it since it could be the last one we encounter
+ if(i% shardSize == 0){
+ marked = edges.next();
+ }
+ else{
+ edges.next();
+ }
+ }
+
+
+ /**
+ * Sanity check in case our counters become severely out of sync with our edge state in Cassandra.
+ */
+ if(marked == null){
+ LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+ return false;
+ }
final long createTimestamp = timeService.getCurrentTime();
final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false );
+ LOG.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta );
final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
@@ -220,22 +263,30 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
+
+ //TODO: TN this is broken....
//The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
- final long uuidDelta = graphFig.getShardCacheTimeout() * 10000;
+ final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+
+ final long timeNow = timeService.getCurrentTime();
- final long timeNow = UUIDGenerator.newTimeUUID().timestamp();
+ boolean isNew = true;
for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
- final long uuidTime = node.getId().getUuid().timestamp();
+ //short circuit
+ if(!isNew){
+ return false;
+ }
- final long uuidTimeDelta = uuidTime + uuidDelta;
+ final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
- if ( uuidTimeDelta < timeNow ) {
- return true;
- }
+ final long newExpirationTimeout = uuidTime + timeoutDelta;
+
+ //our expiration is after our current time, treat it as new
+ isNew = isNew && newExpirationTimeout > timeNow;
}
- return false;
+ return isNew;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d444eec..36a7bc1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -299,13 +299,13 @@ public class NodeShardCacheImpl implements NodeShardCache {
/**
- * Get all shards <= this one in decending order
+ * Get all shards <= this one in descending order
*/
public Iterator<ShardEntryGroup> getShards( final Long maxShard ) {
final Long firstKey = shards.floorKey( maxShard );
- return shards.headMap( firstKey, true ).descendingMap().values().iterator();
+ return Collections.unmodifiableCollection( shards.headMap( firstKey, true ).descendingMap().values()).iterator();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
deleted file mode 100644
index 1edaf21..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-/**
- * Class to perform serialization for row keys from edges
- */
-public class RowSerializer implements CompositeFieldSerializer<RowKey> {
-
- private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
- @Override
- public void toComposite( final CompositeBuilder builder, final RowKey key ) {
-
- //add the row id to the composite
- ID_SER.toComposite( builder, key.nodeId );
-
- builder.addString( key.edgeType );
- builder.addLong( key.shardId );
- }
-
-
- @Override
- public RowKey fromComposite( final CompositeParser composite ) {
-
- final Id id = ID_SER.fromComposite( composite );
- final String edgeType = composite.readString();
- final long shard = composite.readLong();
-
-
- return new RowKey( id, edgeType, shard );
- }
-}