You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/14 17:20:24 UTC
[5/6] Implemented New rolling shard algorithm. This should allow
eventual shard consistency between all clients without the need for an
external locking allocation system
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
new file mode 100644
index 0000000..89e46d9
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -0,0 +1,405 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * A bean to define directed edge meta data. This is used to encapsulate the meta data around a source or target node,
+ * and the types used for grouping them.
+ */
+public abstract class DirectedEdgeMeta {
+
+
+ protected final NodeMeta[] nodes;
+ protected final String[] types;
+
+
+ private DirectedEdgeMeta( NodeMeta[] nodes, String[] types ) {
+ this.nodes = nodes;
+ this.types = types;
+ }
+
+
+ public NodeMeta[] getNodes() {
+ return nodes;
+ }
+
+
+ public String[] getTypes() {
+ return types;
+ }
+
+
+ /**
+ * Inner class to represent node meta dat
+ */
+ public static class NodeMeta {
+ private final Id id;
+ private final NodeType nodeType;
+
+
+ public NodeMeta( final Id id, final NodeType nodeType ) {
+ this.id = id;
+ this.nodeType = nodeType;
+ }
+
+
+ public Id getId() {
+ return id;
+ }
+
+
+ public NodeType getNodeType() {
+ return nodeType;
+ }
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final DirectedEdgeMeta that = ( DirectedEdgeMeta ) o;
+
+ if ( !Arrays.equals( nodes, that.nodes ) ) {
+ return false;
+ }
+ if ( !Arrays.equals( types, that.types ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode( nodes );
+ result = 31 * result + Arrays.hashCode( types );
+ return result;
+ }
+
+
+ /**
+ * Given the edge serialization, load all shard in the shard group
+ */
+ public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue );
+
+ /**
+ * Get the type of this directed edge
+ */
+ public abstract MetaType getType();
+
+
+ public enum MetaType {
+ SOURCE( 0 ),
+ SOURCETARGET( 1 ),
+ TARGET( 2 ),
+ TARGETSOURCE( 3 ),
+ VERSIONS( 4 );
+
+ private final int storageValue;
+
+
+ MetaType( final int storageValue ) {this.storageValue = storageValue;}
+
+
+ public int getStorageValue() {
+ return storageValue;
+ }
+
+
+ /**
+ * Get value from storageValue
+ */
+ public static MetaType fromStorage( final int ordinal ) {
+ return mappings.get( ordinal );
+ }
+
+
+ private static Map<Integer, MetaType> mappings = new HashMap<Integer, MetaType>();
+
+
+ static {
+
+ for ( MetaType meta : MetaType.values() ) {
+ mappings.put( meta.storageValue, meta );
+ }
+ }
+ }
+
+
+ /**
+ * Created directed edge meta data from source node
+ */
+ public static DirectedEdgeMeta fromSourceNode( final Id sourceId, final String edgeType ) {
+ return fromSourceNode(
+ new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ) },
+ new String[] { edgeType } );
+ }
+
+
+ /**
+ * Return meta data from the source node by edge type
+ */
+ private static DirectedEdgeMeta fromSourceNode( final NodeMeta[] nodes, final String[] types ) {
+
+ return new DirectedEdgeMeta( nodes, types ) {
+
+ @Override
+ public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue ) {
+
+ final Id sourceId = nodes[0].id;
+ final String edgeType = types[0];
+
+ final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
+
+ return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search,
+ Collections.singleton( group ).iterator() );
+ }
+
+
+ @Override
+ public MetaType getType() {
+ return MetaType.SOURCE;
+ }
+ };
+ }
+
+
+ /**
+ * Return meta data that represents a source node with edge type and target type
+ */
+ public static DirectedEdgeMeta fromSourceNodeTargetType( final Id sourceId, final String edgeType,
+ final String targetType ) {
+ return fromSourceNodeTargetType(
+ new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ) },
+ new String[] { edgeType, targetType } );
+ }
+
+
+ /**
+ * Return meta data that represents a source node with edge type and target type
+ */
+ private static DirectedEdgeMeta fromSourceNodeTargetType( NodeMeta[] nodes, String[] types ) {
+ return new DirectedEdgeMeta( nodes, types ) {
+
+ @Override
+ public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue ) {
+
+ final Id sourceId = nodes[0].id;
+ final String edgeType = types[0];
+ final String targetType = types[1];
+
+ final SearchByIdType search =
+ new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
+
+ return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search,
+ Collections.singleton( group ).iterator() );
+ }
+
+
+ @Override
+ public MetaType getType() {
+ return MetaType.SOURCETARGET;
+ }
+ };
+ }
+
+
+ public static DirectedEdgeMeta fromTargetNode( final Id targetId, final String edgeType ) {
+ return fromTargetNode(
+ new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET ) },
+ new String[] { edgeType } );
+ }
+
+
+ /**
+ * Return meta data that represents from a target node by edge type
+ */
+ private static DirectedEdgeMeta fromTargetNode( final NodeMeta[] nodes, final String[] types ) {
+ return new DirectedEdgeMeta( nodes, types ) {
+
+ @Override
+ public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue ) {
+
+
+ final Id targetId = nodes[0].id;
+ final String edgeType = types[0];
+
+ final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
+
+ return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search,
+ Collections.singleton( group ).iterator() );
+ }
+
+
+ @Override
+ public MetaType getType() {
+ return MetaType.TARGET;
+ }
+ };
+ }
+
+
+ public static DirectedEdgeMeta fromTargetNodeSourceType( final Id targetId, final String edgeType,
+ final String sourceType ) {
+ return fromTargetNodeSourceType(
+ new DirectedEdgeMeta.NodeMeta[] { new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET ) },
+ new String[] { edgeType, sourceType } );
+ }
+
+
+ /**
+ * Return meta data that represents a target node and a source node type
+ */
+ private static DirectedEdgeMeta fromTargetNodeSourceType( final NodeMeta[] nodes, final String[] types ) {
+ return new DirectedEdgeMeta( nodes, types ) {
+
+ @Override
+ public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue ) {
+
+ final Id targetId = nodes[0].id;
+ final String edgeType = types[0];
+ final String sourceType = types[1];
+
+
+ final SearchByIdType search =
+ new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
+
+ return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search,
+ Collections.singleton( group ).iterator() );
+ }
+
+
+ @Override
+ public MetaType getType() {
+ return MetaType.TARGETSOURCE;
+ }
+ };
+ }
+
+
+ /**
+ * Return meta data that represents an entire edge
+ */
+ public static DirectedEdgeMeta fromEdge( final Id sourceId, final Id targetId, final String edgeType ) {
+ return fromEdge( new DirectedEdgeMeta.NodeMeta[] {
+ new DirectedEdgeMeta.NodeMeta( sourceId, NodeType.SOURCE ),
+ new DirectedEdgeMeta.NodeMeta( targetId, NodeType.TARGET )
+ }, new String[] { edgeType } );
+ }
+
+
+ /**
+ * Return meta data that represents an entire edge
+ */
+ private static DirectedEdgeMeta fromEdge( final NodeMeta[] nodes, final String[] types ) {
+ return new DirectedEdgeMeta( nodes, types ) {
+
+ @Override
+ public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final ApplicationScope scope, final ShardEntryGroup group,
+ final long maxValue ) {
+
+ final Id sourceId = nodes[0].id;
+ final Id targetId = nodes[1].id;
+ final String edgeType = types[0];
+
+ final SimpleSearchByEdge search =
+ new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
+
+ return serialization.getEdgeVersions( edgeColumnFamilies, scope, search,
+ Collections.singleton( group ).iterator() );
+ }
+
+
+ @Override
+ public MetaType getType() {
+ return MetaType.VERSIONS;
+ }
+ };
+ }
+
+
+ /**
+ * Create a directed edge from the stored meta data
+ * @param metaType The meta type stored
+ * @param nodes The metadata of the nodes
+ * @param types The types in the meta data
+ *
+ *
+ */
+ public static DirectedEdgeMeta fromStorage( final MetaType metaType, final NodeMeta[] nodes, final String[] types ) {
+ switch ( metaType ) {
+ case SOURCE:
+ return fromSourceNode( nodes, types );
+ case SOURCETARGET:
+ return fromSourceNodeTargetType( nodes, types );
+ case TARGET:
+ return fromTargetNode( nodes, types );
+ case TARGETSOURCE:
+ return fromTargetNodeSourceType( nodes, types );
+ case VERSIONS:
+ return fromEdge(nodes, types);
+ default:
+ throw new UnsupportedOperationException( "No supported meta type found" );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
new file mode 100644
index 0000000..6f6c72d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeColumnFamilies.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+
+/**
+ * Implementation for using multiple column families
+ */
+public interface EdgeColumnFamilies extends Migration{
+
+ /**
+ * Get the name of the column family for getting source nodes
+ */
+ public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName();
+
+ /**
+ * Get the name of the column family for getting target nodes
+ */
+ public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName();
+
+
+ /**
+ * Get the name of the column family for getting source nodes with a target type
+ */
+ public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName();
+
+ /**
+ * Get the name of the column family for getting target nodes with a source type
+ */
+ public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName();
+
+ /**
+ * Get the Graph edge versions cf
+ * @return
+ */
+ public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
new file mode 100644
index 0000000..d7982d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeRowKey.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Used to store row keys by sourceId, targetId and edgeType
+ */
+public class EdgeRowKey {
+ public final Id sourceId;
+ public final Id targetId;
+ public final String edgeType;
+ public final long shardId;
+
+
+ public EdgeRowKey( final Id sourceId, final String edgeType, final Id targetId, final long shardId ) {
+ this.sourceId = sourceId;
+ this.targetId = targetId;
+ this.edgeType = edgeType;
+ this.shardId = shardId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index d49cfdf..81dcb39 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -38,31 +38,28 @@ public interface EdgeShardSerialization extends Migration{
/**
* Write a new time shard for the meta data
* @param scope The scope to write
- * @param nodeId The id in the edge
- * @param shard The next time to write
- * @param types The types to write to. Can be edge type, or edgeType+id type
+ * @param shard The shard to write
+ * @param directedEdgeMeta The edge meta data to use
*/
- public MutationBatch writeEdgeMeta(ApplicationScope scope, Id nodeId, long shard, String... types);
+ public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
/**
* Get an iterator of all meta data and types. Returns a range from High to low
* @param scope The organization scope
- * @param nodeId The id of the node
* @param start The shard time to start seeking from. Values <= this value will be returned.
- * @param types The types to use
+ * @param directedEdgeMeta The edge meta data to use
* @return
*/
- public Iterator<Long> getEdgeMetaData(ApplicationScope scope, Id nodeId, Optional<Long> start, String... types);
+ public Iterator<Shard> getShardMetaData( ApplicationScope scope, Optional<Shard> start, DirectedEdgeMeta directedEdgeMeta);
/**
* Remove the shard from the edge meta data from the types.
- * @param scope
- * @param nodeId
- * @param shard
- * @param types
+ * @param scope The scope of the application
+ * @param shard The shard to remove
+ * @param directedEdgeMeta The edge meta data to use
* @return
*/
- public MutationBatch removeEdgeMeta(ApplicationScope scope, Id nodeId, long shard, String... types);
+ public MutationBatch removeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 09436ac..10f3794 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Iterator;
-import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -32,62 +31,31 @@ public interface EdgeShardStrategy {
/**
* Get the shard key used for writing this shard. CUD operations should use this
*
- * @param scope The application's scope
- * @param rowKeyId The id being used in the row key
+ * @param scope The application's scope]
* @param timestamp The timestamp on the edge
- * @param types The types in the edge
*/
- public long getWriteShard(final ApplicationScope scope, final Id rowKeyId, final long timestamp, final String... types );
+ public ShardEntryGroup getWriteShards( final ApplicationScope scope, final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
/**
* Get the iterator of all shards for this entity
*
* @param scope The application scope
- * @param rowKeyId The id used in the row key
* @param maxTimestamp The max timestamp to use
- * @param types the types in the edge
*/
- public Iterator<Long> getReadShards(final ApplicationScope scope,final Id rowKeyId, final long maxTimestamp,final String... types );
+ public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
/**
* Increment our count meta data by the passed value. Can be a positive or a negative number.
* @param scope The scope in the application
- * @param rowKeyId The row key id
- * @param shardId The shard id to use
+ * @param shard The shard to use
* @param count The amount to increment or decrement
- * @param types The types
+ * @param directedEdgeMeta The edge meta data to use
* @return
*/
- public void increment(final ApplicationScope scope,final Id rowKeyId, long shardId, long count ,final String... types );
+ public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
- /**
- * Get the name of the column family for getting source nodes
- */
- public String getSourceNodeCfName();
-
- /**
- * Get the name of the column family for getting target nodes
- */
- public String getTargetNodeCfName();
-
-
- /**
- * Get the name of the column family for getting source nodes with a target type
- */
- public String getSourceNodeTargetTypeCfName();
-
- /**
- * Get the name of the column family for getting target nodes with a source type
- */
- public String getTargetNodeSourceTypeCfName();
-
- /**
- * Get the Graph edge versions cf
- * @return
- */
- public String getGraphEdgeVersions();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 1097ced..75bdc74 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -36,27 +36,31 @@ public interface NodeShardAllocation {
/**
- * Get all shards for the given info. If none exist, a default shard should be allocated
+ * Get all shards for the given info. If none exist, a default shard should be allocated. The nodeId is the source node
*
* @param scope The application scope
- * @param nodeId
* @param maxShardId The max value to start seeking from. Values <= this will be returned if specified
- * @param edgeTypes
+ * @param directedEdgeMeta The directed edge metadata to use
* @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated
*/
- public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId,
- final String... edgeTypes );
+ public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta );
/**
* Audit our highest shard for it's maximum capacity. If it has reached the max capacity <=, it will allocate a new shard
*
* @param scope The app scope
- * @param nodeId The node id
- * @param edgeType The edge types
+ * @param shardEntryGroup The shard operator to use
+ * @param directedEdgeMeta The directed edge metadata to use
* @return True if a new shard was allocated
*/
- public boolean auditMaxShard(final ApplicationScope scope, final Id nodeId, final String... edgeType);
+ public boolean auditShard(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta);
+
+ /**
+ * Get the minimum time that a created shard should be considered "new", and be used for both new writes and reads
+ * @return
+ */
+ public long getMinTime();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
index 90503d4..dbe645f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
@@ -34,24 +34,34 @@ public interface NodeShardApproximation {
* Increment the shard Id the specified amount
*
* @param scope The scope
- * @param nodeId The node id
- * @param shardId The shard id
- * @param count
- * @param edgeType The edge type
+ * @param shard The shard to use
+ * @param count The count to increment
+ * @param directedEdgeMeta The directed edge meta data to use
*/
- public void increment( final ApplicationScope scope, final Id nodeId, final long shardId, final long count,
- final String... edgeType );
+ public void increment( final ApplicationScope scope, final Shard shard,
+ final long count, final DirectedEdgeMeta directedEdgeMeta );
/**
* Get the approximation of the number of unique items
+ *
+ * @param scope The scope
+ * @param directedEdgeMeta The directed edge meta data to use
*/
- public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
- final String... edgeType );
+ public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta );
/**
- * Flush the current counters in the Approximation
+ * Flush the current counters in the Approximation. Will return immediately after the flush. You can then use flushPending
+ * to check the state.
*/
- public void flush();
+ public void beginFlush();
+
+ /**
+ * Return true if there is data to be flushed
+ * @return
+ */
+ public boolean flushPending();
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 667fdbf..58924af 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -20,7 +20,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Iterator;
-import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -35,21 +34,22 @@ public interface NodeShardCache {
/**
- * Get the time meta data for the given node
- * @param nodeId
+ * Get the shard for the given timestamp
+ * @param scope The scope for the application
* @param timestamp The time to select the slice for.
- * @param edgeType
+ * @param directedEdgeMeta The directed edge meta data
*/
- public long getSlice(final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType);
+ public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope,
+ final long timestamp, final DirectedEdgeMeta directedEdgeMeta );
/**
- * Get an iterator of all versions <= the version
- * @param scope
- * @param nodeId
+ * Get an iterator of all versions <= the version for iterating shard entry sets. The iterator of groups will be ordered
+ * highest to lowest. I.E range scanning from Long.MAX_VALUE to 0
+ * @param scope The scope for the application
* @param maxTimestamp The highest timestamp
- * @param edgeType
+ * @param directedEdgeMeta The directed edge meta data
* @return
*/
- public Iterator<Long> getVersions(final ApplicationScope scope, final Id nodeId, final long maxTimestamp, final String... edgeType);
+ public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
new file mode 100644
index 0000000..62c2f11
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeType.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.HashMap;
+
+
+/**
+ * The node type of the source or target
+ */
+public enum NodeType {
+ SOURCE( 0 ),
+ TARGET( 1 );
+
+ private final int ordinal;
+
+
+ private NodeType( final int ordinal ) {this.ordinal = ordinal;}
+
+
+ public int getStorageValue() {
+ return ordinal;
+ }
+
+
+ /**
+ * Get the type from the storageValue value
+ * @param storageValue
+ * @return
+ */
+ public static NodeType get(final int storageValue){
+ return types.get( storageValue );
+ }
+
+
+ /**
+ * Internal map and initialization for fast access
+ */
+ private static final HashMap<Integer, NodeType> types;
+
+
+ static{
+ types = new HashMap<>();
+
+ for(NodeType type: NodeType.values()){
+ types.put( type.getStorageValue(), type );
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
new file mode 100644
index 0000000..9895978
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Class that represents an edge row key
+ */
+public class RowKey {
+ public final Id nodeId;
+ public final long[] hash;
+ public final long shardId;
+
+
+ /**
+ * Create a row key with the node and the edgeType
+ */
+ public RowKey( Id nodeId, String edgeType, final long shardId ) {
+ this( nodeId, EdgeHasher.createEdgeHash( edgeType ), shardId );
+ }
+
+
+ /**
+ * Create a new row key with the hash, should only be used in deserialization or internal callers.
+ */
+ public RowKey( Id nodeId, long[] hash, final long shardId ) {
+ this.nodeId = nodeId;
+ this.hash = hash;
+ this.shardId = shardId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
new file mode 100644
index 0000000..5705eb3
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The row key with the additional type
+ */
+public class RowKeyType extends RowKey {
+
+ /**
+ * Create a row key with the node id in the row key, the edge type, and the type from the typeid
+ *
+ * @param nodeId The node id in the row key
+ * @param edgeType The type of the edge
+ * @param typeId The type of hte id
+ */
+ public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shardId ) {
+ this( nodeId, edgeType, typeId.getType(), shardId );
+ }
+
+
+ /**
+ * Create a row key with the node id in the row key, the edge type, adn the target type from the id
+ */
+ public RowKeyType( final Id nodeId, final String edgeType, final String targetType, final long shardId ) {
+ super( nodeId, EdgeHasher.createEdgeHash( edgeType, targetType ), shardId );
+ }
+
+
+ /**
+ * Internal use in de-serializing. Should only be used in this case or by internal callers
+ */
+ public RowKeyType( final Id nodeId, final long[] hash, final long shardId ) {
+ super( nodeId, hash, shardId );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
new file mode 100644
index 0000000..38fe51c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+public class Shard implements Comparable<Shard> {
+
+ private final long shardIndex;
+ private final long createdTime;
+ private final boolean compacted;
+
+
+ public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
+ this.shardIndex = shardIndex;
+ this.createdTime = createdTime;
+ this.compacted = compacted;
+ }
+
+
+ /**
+ * Get the long shard index
+ */
+ public long getShardIndex() {
+ return shardIndex;
+ }
+
+
+ /**
+ * Get the timestamp in epoch millis this shard was created
+ */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+
+ /**
+ * Return true if this shard has been compacted
+ */
+ public boolean isCompacted() {
+ return compacted;
+ }
+
+
+ /**
+ * Compare the shards based on the timestamp first, then the created time second
+ */
+ @Override
+ public int compareTo( final Shard o ) {
+ if ( o == null ) {
+ return 1;
+ }
+
+ if ( shardIndex > o.shardIndex ) {
+ return 1;
+ }
+
+ else if ( shardIndex == o.shardIndex ) {
+ if ( createdTime > o.createdTime ) {
+ return 1;
+ }
+ else if ( createdTime < o.createdTime ) {
+ return -1;
+ }
+
+ else {
+
+ //kind of arbitrary compacted takes precedence
+ if ( compacted && !o.compacted ) {
+ return 1;
+ }
+
+ else if ( !compacted && o.compacted ){
+ return -1;
+ }
+
+
+ }
+ return 0;
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final Shard shard = ( Shard ) o;
+
+ if ( compacted != shard.compacted ) {
+ return false;
+ }
+ if ( createdTime != shard.createdTime ) {
+ return false;
+ }
+ if ( shardIndex != shard.shardIndex ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
+ result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+ result = 31 * result + ( compacted ? 1 : 0 );
+ return result;
+ }
+
+
+ @Override
+ public String toString() {
+ return "Shard{" +
+ "shardIndex=" + shardIndex +
+ ", createdTime=" + createdTime +
+ "} ";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
new file mode 100644
index 0000000..6e82cbc
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.TreeSet;
+
+import com.google.common.base.Preconditions;
+
+
+/**
+ * There are cases where we need to read or write to more than 1 shard. This object encapsulates a set of shards that
+ * should be written to and read from. All reads should combine the data sets from all shards in the group, and writes
+ * should be written to each shard. Once the shard can safely be compacted a background process should be triggered to
+ * remove additional shards and make seeks faster. This multiread/write should only occur during the time period of the
+ * delta (in milliseconds), after which the next read will asynchronously compact the shards into a single shard.
+ */
+public class ShardEntryGroup {
+
+
+ private List<Shard> shards;
+
+ private final long delta;
+
+ private long maxCreatedTime;
+
+ private Shard compactionTarget;
+
+
+ /**
+ * The max delta we accept in milliseconds for create time to be considered a member of this group
+ */
+ public ShardEntryGroup( final long delta ) {
+ Preconditions.checkArgument( delta > 0, "delta must be greater than 0" );
+ this.delta = delta;
+ this.shards = new ArrayList<>();
+ this.maxCreatedTime = 0;
+ }
+
+
+ /**
+ * Only add a shard if it is within the rules require to meet a group. The rules are outlined below.
+ *
+ * Case 1) First shard in the group, always added
+ *
+ * Case 2) Shard is unmerged, it should be included with it's peers since other nodes may not have it yet
+ *
+ * Case 3) The list contains only non compacted shards, and this is last and and merged. It is considered a lower
+ * bound
+ */
+ public boolean addShard( final Shard shard ) {
+
+ Preconditions.checkNotNull( "shard cannot be null", shard );
+
+ final int size = shards.size();
+
+ if ( size == 0 ) {
+ addShardInternal( shard );
+ return true;
+ }
+
+ final Shard minShard = shards.get( size - 1 );
+
+ Preconditions.checkArgument( minShard.compareTo( shard ) > 0, "shard must be less than the current max" );
+
+ //shard is not compacted, or it's predecessor isn't, we should include it in this group
+ if ( !minShard.isCompacted() ) {
+ addShardInternal( shard );
+ return true;
+ }
+
+
+ return false;
+ }
+
+
+ /**
+ * Add the shard and set the min created time
+ */
+ private void addShardInternal( final Shard shard ) {
+ shards.add( shard );
+
+ maxCreatedTime = Math.max( maxCreatedTime, shard.getCreatedTime() );
+
+ //we're changing our structure, unset the compaction target
+ compactionTarget = null;
+ }
+
+
+ /**
+ * Return the minum shard based on time indexes
+ */
+ public Shard getMinShard() {
+ final int size = shards.size();
+
+ if ( size < 1 ) {
+ return null;
+ }
+
+ return shards.get( size - 1 );
+ }
+
+
+ /**
+ * Get the entries that we should read from.
+ */
+ public Collection<Shard> getReadShards() {
+ return shards;
+ }
+
+
+ /**
+ * Get the entries, with the max shard time being first. We write to all shards until they're migrated
+ */
+ public Collection<Shard> getWriteShards( long currentTime ) {
+
+ /**
+ * The shards in this set can be combined, we should only write to the compaction target to avoid
+ * adding data to other shards
+ */
+ if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
+ return Collections.singleton( getCompactionTarget() );
+ }
+
+
+ return shards;
+ }
+
+
+ /**
+ * Return true if we have a pending compaction
+ */
+ public boolean isCompactionPending() {
+ return !isTooSmallToCompact();
+ }
+
+
+ /**
+ * Get the shard all compactions should write to. Null indicates we cannot find a shard that could be used as a
+ * compaction target. Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
+ * first to ensure all criteria are met before initiating compaction
+ */
+ public Shard getCompactionTarget() {
+
+ if ( compactionTarget != null ) {
+ return compactionTarget;
+ }
+
+
+ //we have < 2 shards, we can't compact
+ if ( isTooSmallToCompact() ) {
+ return null;
+ }
+
+
+ final int lastIndex = shards.size() - 1;
+
+ final Shard last = shards.get( lastIndex );
+
+ //Our oldest isn't compacted. As a result we have no "bookend" to delimit this entry group. Therefore we
+ // can't compact
+ if ( !last.isCompacted() ) {
+ return null;
+ }
+
+ //Start seeking from the end of our group. The first shard we encounter that is not compacted is our
+ // compaction target
+ //NOTE: This does not mean we can compact, rather it's just an indication that we have a target set.
+ for ( int i = lastIndex - 1; i > -1; i-- ) {
+ final Shard compactionCandidate = shards.get( i );
+
+
+ if ( !compactionCandidate.isCompacted() ) {
+ compactionTarget = compactionCandidate;
+ break;
+ }
+ }
+
+ return compactionTarget;
+ }
+
+
+ /**
+ * Return the number of entries in this shard group
+ */
+ public int entrySize() {
+ return shards.size();
+ }
+
+
+ /**
+ * Return true if there are not enough elements in this entry group to consider compaction
+ */
+ private boolean isTooSmallToCompact() {
+ return shards.size() < 2;
+ }
+
+
+ /**
+ * Returns true if the newest created shard is path the currentTime - delta
+ *
+ * @param currentTime The current system time in milliseconds
+ *
+ * @return True if these shards can safely be combined into a single shard, false otherwise
+ */
+ public boolean shouldCompact( final long currentTime ) {
+
+ /**
+ * We don't have enough shards to compact, ignore
+ */
+ return getCompactionTarget() != null
+
+
+ /**
+ * If something was created within the delta time frame, not everyone may have seen it due to
+ * cache refresh, we can't compact yet.
+ */
+
+ && currentTime - delta > maxCreatedTime;
+ }
+
+
+ /**
+ * Return true if this shard can be deleted AFTER all of the data in it has been moved
+ */
+ public boolean canBeDeleted( final Shard shard ) {
+ //if we're a neighbor shard (n-1) or the target compaction shard, we can't be deleted
+ //we purposefully use shard index comparison over .equals here, since 2 shards might have the same index with
+ // different timestamps
+ // (unlikely but could happen)
+
+ final Shard compactionTarget = getCompactionTarget();
+
+
+ return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+ .getShardIndex() );
+ }
+
+
+ /**
+ * Helper method to create a shard entry group with a single shard
+ */
+ public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
+ ShardEntryGroup group = new ShardEntryGroup( delta );
+ group.addShard( shard );
+
+ return group;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
new file mode 100644
index 0000000..13c5596
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import rx.Observable;
+
+
+/**
+ * Defines tasks for running compaction
+ *
+ *
+ */
+public interface ShardGroupCompaction {
+
+
+ /**
+ * Execute the compaction task. Will return the number of edges that have
+ * @param group The shard entry group to compact
+ * @return The number of edges that are now compacted into the target shard
+ */
+ public Observable<Integer> compact(ShardEntryGroup group);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
new file mode 100644
index 0000000..895c6d6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardOperator.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+
+
+/**
+ *
+ * Strategy for performing shard operations based on their shard types
+ *
+ */
+public interface ShardOperator {
+
+ /**
+ * Get the edges for this operator. Search
+ * @return
+ */
+ public Iterator<MarkedEdge> getEdges(final ApplicationScope scope, final Shard shard, final long maxValue);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
new file mode 100644
index 0000000..d1ad18d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+
+import com.netflix.astyanax.MutationBatch;
+
+
+/**
+ * Performs serialization on the shards
+ */
+public interface ShardedEdgeSerialization {
+
+ /**
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp to use
+ */
+ MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ UUID timestamp );
+
+ /**
+ * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp of the uuid
+ */
+ MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ UUID timestamp );
+
+
+ /**
+ * Search for all versions of this edge < the search version. Returns all versions
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdge search, Iterator<ShardEntryGroup> shards );
+
+ /**
+ * Get an iterator of all edges by edge type originating from source node
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+ /**
+ * Get an iterator of all edges by edge type originating from source node. Also filters by target node id type
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByIdType search, Iterator<ShardEntryGroup> shards );
+
+ /**
+ * Get an iterator of all edges by edge type pointing to the target node. Returns all versions
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+
+
+ /**
+ * Get an iterator of all edges by edge type pointing to the target node. Also uses the source id type to limit the
+ * results
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The application scope
+ * @param search The search criteria
+ * @param shards The shards to iterate when searching
+ */
+ Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ SearchByIdType search, Iterator<ShardEntryGroup> shards );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 4318200..f5666a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -24,6 +24,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.base.Preconditions;
+
/**
* This class is synchronized for addition. It is meant to be used across multiple threads
@@ -35,7 +37,7 @@ public class Counter {
private final AtomicLong invokeCounter;
/**
- * Pointer to our "current" counter map. We flush this when time expires or we hit our count
+ * Pointer to our "current" counter map. We beginFlush this when time expires or we hit our count
*/
private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
@@ -94,6 +96,10 @@ public class Counter {
* @param other
*/
public void merge(final Counter other){
+
+ Preconditions.checkNotNull(other, "other cannot be null");
+ Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
+
for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
add(entry.getKey(), entry.getValue().get());
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 7dc763f..b4d88d5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -20,15 +20,23 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.model.entity.Id;
import com.netflix.astyanax.MutationBatch;
@@ -40,10 +48,12 @@ import rx.schedulers.Schedulers;
/**
* Implementation for doing edge approximation based on counters. Uses a guava loading cache to load values from
- * cassandra, and flush them on cache eviction.
+ * cassandra, and beginFlush them on cache eviction.
*/
public class NodeShardApproximationImpl implements NodeShardApproximation {
+ private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
+
/**
* Read write locks to ensure we atomically swap correctly
*/
@@ -63,7 +73,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* The counter that is currently in process of flushing to Cassandra. Can be null
*/
- private volatile Counter flushPending;
+ private final BlockingQueue<Counter> flushQueue;
+
+ private final FlushWorker worker;
/**
@@ -77,15 +89,22 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
this.nodeShardCounterSerialization = nodeShardCounterSerialization;
this.timeService = timeService;
this.currentCounter = new Counter();
+ this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
+
+ this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
+
+ Schedulers.newThread().createWorker().schedule( worker );
+
}
@Override
- public void increment( final ApplicationScope scope, final Id nodeId, final long shardId, final long count,
- final String... edgeType ) {
+ public void increment(
+ final ApplicationScope scope, final Shard shard,
+ final long count, final DirectedEdgeMeta directedEdgeMeta ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
readLock.lock();
@@ -102,10 +121,9 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId,
- final String... edgeType ) {
+ public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
- final ShardKey key = new ShardKey( scope, nodeId, shardId, edgeType );
+ final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
readLock.lock();
@@ -115,9 +133,6 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
try {
count = currentCounter.get( key );
- if ( flushPending != null ) {
- count += flushPending.get( key );
- }
}
finally {
readLock.unlock();
@@ -130,78 +145,121 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
@Override
- public void flush() {
+ public void beginFlush() {
writeLockLock.lock();
try {
- flushPending = currentCounter;
- currentCounter = new Counter();
+
+ final boolean queued = flushQueue.offer( currentCounter );
+
+ /**
+ * We were able to q the beginFlush, swap it
+ */
+ if ( queued ) {
+ currentCounter = new Counter();
+ }
}
finally {
writeLockLock.unlock();
}
+ }
- //copy to the batch outside of the command for performance
- final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
+ @Override
+ public boolean flushPending() {
+ return flushQueue.size() > 0 || worker.isFlushing();
+ }
- /**
- * Execute the command in hystrix to avoid slamming cassandra
- */
- new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
- @Override
- protected Void run() throws Exception {
- /**
- * Execute the batch asynchronously
- */
- batch.execute();
+ /**
+ * Check if we need to beginFlush. If we do, perform the beginFlush
+ */
+ private void checkFlush() {
- return null;
- }
+ //there's no beginFlush pending and we're past the timeout or count
+ if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
+ || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
+ beginFlush();
+ }
+ }
- @Override
- protected Object getFallback() {
- //we've failed to mutate. Merge this count back into the current one
- currentCounter.merge( flushPending );
+ /**
+ * Worker that will take from the queue
+ */
+ private static class FlushWorker implements Action0 {
- return null;
- }
- }.execute();
+ private final BlockingQueue<Counter> counterQueue;
+ private final NodeShardCounterSerialization nodeShardCounterSerialization;
- writeLockLock.lock();
+ private volatile Counter rollUp;
- try {
- flushPending = null;
- }
- finally {
- writeLockLock.unlock();
+
+ private FlushWorker( final BlockingQueue<Counter> counterQueue,
+ final NodeShardCounterSerialization nodeShardCounterSerialization ) {
+ this.counterQueue = counterQueue;
+ this.nodeShardCounterSerialization = nodeShardCounterSerialization;
}
- }
- /**
- * Check if we need to flush. If we do, perform the flush
- */
- private void checkFlush() {
+ @Override
+ public void call() {
- //there's no flush pending and we're past the timeout or count
- if ( flushPending == null && (
- currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
- || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) ) {
+ while ( true ) {
+ /**
+ * Block taking the first element. Once we take this, batch drain and roll up the rest
+ */
+
+ try {
+ rollUp = null;
+ rollUp = counterQueue.take();
+ }
+ catch ( InterruptedException e ) {
+ LOG.error( "Unable to read from counter queue", e );
+ throw new RuntimeException( "Unable to read from counter queue", e );
- /**
- * Fire the flush action asynchronously
- */
- Schedulers.immediate().createWorker().schedule( new Action0() {
- @Override
- public void call() {
- flush();
}
- } );
+
+
+
+
+ //copy to the batch outside of the command for performance
+ final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
+
+ /**
+ * Execute the command in hystrix to avoid slamming cassandra
+ */
+ new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+
+ @Override
+ protected Void run() throws Exception {
+ batch.execute();
+
+ return null;
+ }
+
+
+ @Override
+ protected Object getFallback() {
+ //we've failed to mutate. Merge this count back into the current one
+ counterQueue.offer( rollUp );
+
+ return null;
+ }
+ }.execute();
+ }
+
+ }
+
+
+ /**
+ * Return true if we're in the process of flushing
+ * @return
+ */
+ public boolean isFlushing(){
+ return rollUp != null;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
index 41eb525..4b05401 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
@@ -27,7 +27,7 @@ import com.netflix.astyanax.MutationBatch;
/**
* Serialization for flushing and reading counters
*/
-public interface NodeShardCounterSerialization extends Migration {
+public interface NodeShardCounterSerialization extends Migration {
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e9d652dd/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index da318bf..6b99e93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -29,15 +29,20 @@ import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKey;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -48,20 +53,23 @@ import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.serializers.LongSerializer;
-
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+import com.netflix.astyanax.serializers.BooleanSerializer;
@Singleton
public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
+ private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
+
/**
* Edge shards
*/
- private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
+ private static final MultiTennantColumnFamily<ApplicationScope, ShardKey, Boolean> EDGE_SHARD_COUNTS =
new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
- new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
+ new OrganizationScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
protected final Keyspace keyspace;
@@ -92,12 +100,11 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
final ShardKey key = entry.getKey();
final long value = entry.getValue().get();
- final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
- batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( key.getShardId(), value );
+ batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
}
@@ -108,14 +115,12 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
@Override
public long getCount( final ShardKey key ) {
- final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
-
- final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+ final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope, key );
try {
- OperationResult<Column<Long>> column =
- keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( key.getShardId() ).execute();
+ OperationResult<Column<Boolean>> column =
+ keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
return column.getResult().getLongValue();
}
@@ -133,7 +138,50 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
return Collections.singleton(
new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
- ColumnTypes.LONG_TYPE_REVERSED, CounterColumnType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+ ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
}
+
+
+
+ private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
+
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+ private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
+
+ ID_SER.toComposite( builder, key.scope.getApplication() );
+
+ EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
+
+ builder.addLong( key.shard.getShardIndex() );
+
+ builder.addLong( key.shard.getCreatedTime() );
+ }
+
+
+ @Override
+ public ShardKey fromComposite( final CompositeParser composite ) {
+
+ final Id applicationId = ID_SER.fromComposite( composite );
+
+ final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
+
+ final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
+
+ final long shardIndex = composite.readLong();
+
+ final long shardCreatedTime = composite.readLong();
+
+ return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta );
+ }
+
+
+ }
+
}