You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:14:59 UTC
[06/11] Finished refactor. Need to continue on shard allocation
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
deleted file mode 100644
index 12b1d7c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKey.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- * Used to store row keys by sourceId, targetId and edgeType
- */
-public class EdgeRowKey {
- public final Id nodeId;
- public final String[] edgeTypes;
-
-
- public EdgeRowKey( final Id nodeId, final String[] edgeTypes ) {
- this.nodeId = nodeId;
- this.edgeTypes = edgeTypes;
- }
-
-
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
index be97f95..90b264c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
@@ -21,8 +21,8 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
@@ -31,9 +31,8 @@ import com.netflix.astyanax.model.CompositeParser;
/**
* Class to perform serialization for row keys from edges
*/
-public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
- private static final EdgeRowKeySerializer INSTANCE = new EdgeRowKeySerializer();
+public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
@@ -42,13 +41,10 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
//add the row id to the composite
- ID_SER.toComposite( builder, key.nodeId );
-
- builder.addInteger( key.edgeTypes.length );
-
- for(String type: key.edgeTypes){
- builder.addString( type );
- }
+ ID_SER.toComposite( builder, key.sourceId );
+ builder.addString( key.edgeType );
+ ID_SER.toComposite( builder, key.targetId );
+ builder.addLong( key.shardId );
}
@@ -56,26 +52,12 @@ public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey
public EdgeRowKey fromComposite( final CompositeParser composite ) {
final Id sourceId = ID_SER.fromComposite( composite );
+ final String edgeType = composite.readString();
+ final Id targetId = ID_SER.fromComposite( composite );
+ final long shard = composite.readLong();
-
- final int length = composite.readInteger();
-
- String[] types = new String[length];
-
- for(int i = 0; i < length; i++){
- types[i] = composite.readString();
- }
-
- return new EdgeRowKey( sourceId, types );
-
+ return new EdgeRowKey( sourceId, edgeType, targetId, shard );
}
-
- /**
- * Get the singleton serializer
- */
- public static EdgeRowKeySerializer get() {
- return INSTANCE;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
new file mode 100644
index 0000000..3644210
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -0,0 +1,124 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
+
+import com.google.common.base.Optional;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Searcher to be used when performing the search. Performs I/O transformation as well as parsing for the iterator.
+ * While there are more shards (and therefore more row keys) available to seek, hasNext() will return true.
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Iterator<ScopedRowKey<ApplicationScope, R>> {
+
+ protected final Optional<Edge> last;
+ protected final long maxTimestamp;
+ protected final ApplicationScope scope;
+ protected final Iterator<ShardEntries> shards;
+
+
+ protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
+ final Iterator<ShardEntries> shards ) {
+ this.scope = scope;
+ this.maxTimestamp = maxTimestamp;
+ this.last = last;
+ this.shards = shards;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ return shards.hasNext();
+ }
+
+
+ @Override
+ public ScopedRowKey<ApplicationScope, R> next() {
+ /**
+ * TODO: multi scan. For now only the first entry of each ShardEntries is used to generate the row key.
+ */
+ return ScopedRowKey
+ .fromKey( scope, generateRowKey( shards.next().getEntries().iterator().next().getShardIndex() ) );
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Set the range on a search
+ */
+ public void setRange( final RangeBuilder builder ) {
+
+ //set our start range since it was supplied to us
+ if ( last.isPresent() ) {
+ C sourceEdge = getStartColumn( last.get() );
+
+
+ builder.setStart( sourceEdge, getSerializer() );
+ }
+ else {
+
+
+ }
+ }
+
+
+ public boolean hasPage() {
+ return last.isPresent();
+ }
+
+
+ @Override
+ public T parseColumn( final Column<C> column ) {
+ final C edge = column.getName();
+
+ return createEdge( edge, column.getBooleanValue() );
+ }
+
+
+ /**
+ * Get the column's serializer
+ */
+ protected abstract Serializer<C> getSerializer();
+
+
+ /**
+ * Create a row key for this search to use
+ *
+ * @param shard The shard to use in the row key
+ */
+ protected abstract R generateRowKey( final long shard );
+
+
+ /**
+ * Get the column to start searching from, built from the last edge that was provided to this searcher
+ */
+ protected abstract C getStartColumn( final Edge last );
+
+
+ /**
+ * Create an edge to return to the user based on the directed edge provided
+ *
+ * @param column The column name
+ * @param marked The marked flag in the column value
+ */
+ protected abstract T createEdge( final C column, final boolean marked );
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
new file mode 100644
index 0000000..d93f679
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.nio.ByteBuffer;
+
+import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+/**
+ * Serializes to a source->target edge. Note that we cannot set the edge type on de-serialization, only the target
+ * Id and timestamp.
+ */
+public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
+
+ private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+ private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+
+ @Override
+ public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
+
+ DynamicComposite composite = new DynamicComposite();
+
+ composite.addComponent( edge.timestamp, LONG_SERIALIZER );
+
+ ID_COL_SERIALIZER.toComposite( composite, edge.id );
+
+ return composite.serialize();
+ }
+
+
+ @Override
+ public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
+ DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+ Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
+
+
+ //read the timestamp
+ final long timestamp = composite.get( 0, LONG_SERIALIZER );
+
+
+ //parse our id
+ final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
+
+
+ return new DirectedEdge( id, timestamp );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 5b7c901..745e02a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -66,7 +66,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
*/
private static final MultiTennantColumnFamily<ApplicationScope, DirectedRowKey, Long> EDGE_SHARDS =
new MultiTennantColumnFamily<>( "Edge_Shards",
- new OrganizationScopedRowKeySerializer<>( new DirectedEdgeRowKeySerializer() ), LongSerializer.get() );
+ new OrganizationScopedRowKeySerializer<>( new DirectedEdgeRowKeySerializer() ),
+ LongSerializer.get() );
private static final byte HOLDER = 0x00;
@@ -239,8 +240,6 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
return NodeType.TARGET;
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index c0a44a9..20cb921 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
@@ -55,17 +56,21 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private static final MinShardTimeComparator MIN_SHARD_TIME_COMPARATOR = new MinShardTimeComparator();
private final EdgeShardSerialization edgeShardSerialization;
-// private final NodeShardCounterSerialization edgeShardCounterSerialization;
+ private final EdgeSerialization edgeSerialization;
+ // private final NodeShardCounterSerialization edgeShardCounterSerialization;
private final NodeShardApproximation nodeShardApproximation;
private final TimeService timeService;
private final GraphFig graphFig;
private final Keyspace keyspace;
+
@Inject
public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
+ final EdgeSerialization edgeSerialization,
final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
final GraphFig graphFig, final Keyspace keyspace ) {
this.edgeShardSerialization = edgeShardSerialization;
+ this.edgeSerialization = edgeSerialization;
this.nodeShardApproximation = nodeShardApproximation;
this.timeService = timeService;
this.graphFig = graphFig;
@@ -74,11 +79,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final Optional<Shard> maxShardId,
- final String... edgeTypes ) {
+ public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+ final Optional<Shard> maxShardId, final String... edgeTypes ) {
final Iterator<Shard> existingShards =
- edgeShardSerialization.getEdgeMetaData( scope, nodeId,nodeType, maxShardId, edgeTypes );
+ edgeShardSerialization.getEdgeMetaData( scope, nodeId, nodeType, maxShardId, edgeTypes );
final PushbackIterator<Shard> pushbackIterator = new PushbackIterator( existingShards );
@@ -95,7 +100,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final Shard shard = pushbackIterator.next();
//we're done, our current time uuid is greater than the value stored
- if ( shard.getCreatedTime() < minConflictTime ) {
+ if ( shard.getCreatedTime() < minConflictTime ) {
//push it back into the iterator
pushbackIterator.pushback( shard );
break;
@@ -106,7 +111,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
//clean up our future
- Collections.sort(futures, MIN_SHARD_TIME_COMPARATOR);
+ Collections.sort( futures, MIN_SHARD_TIME_COMPARATOR );
//we have more than 1 future value, we need to remove it
@@ -114,10 +119,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
MutationBatch cleanup = keyspace.prepareMutationBatch();
//remove all futures except the first one (index 0 after sorting), it is the only value we shouldn't lazy remove
- for ( int i = 1; i < futures.size() ; i++ ) {
+ for ( int i = 1; i < futures.size(); i++ ) {
final Shard toRemove = futures.get( i );
- final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId, nodeType, toRemove.getShardIndex(), edgeTypes );
+ final MutationBatch batch = edgeShardSerialization
+ .removeEdgeMeta( scope, nodeId, nodeType, toRemove.getShardIndex(), edgeTypes );
cleanup.mergeShallow( batch );
}
@@ -131,7 +137,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
- final int futuresSize = futures.size();
+ final int futuresSize = futures.size();
if ( futuresSize > 0 ) {
pushbackIterator.pushback( futures.get( 0 ) );
@@ -141,8 +147,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
* Nothing to iterate, return an iterator with 0.
*/
- if(!pushbackIterator.hasNext()){
- pushbackIterator.pushback( new Shard(0l, 0l) );
+ if ( !pushbackIterator.hasNext() ) {
+ pushbackIterator.pushback( new Shard( 0l, 0l ) );
}
return pushbackIterator;
@@ -150,7 +156,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId,final NodeType nodeType, final String... edgeType ) {
+ public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+ final String... edgeType ) {
final Iterator<Shard> maxShards = getShards( scope, nodeId, nodeType, Optional.<Shard>absent(), edgeType );
@@ -169,23 +176,32 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
- final long count = nodeShardApproximation.getCount( scope, nodeId, nodeType, maxShard.getShardIndex(), edgeType );
+ final long count =
+ nodeShardApproximation.getCount( scope, nodeId, nodeType, maxShard.getShardIndex(), edgeType );
if ( count < graphFig.getShardSize() ) {
return false;
}
+
+ /**
+ * TODO, use the EdgeShardStrategy and ShardEdgeSerialization to audit this shard
+ */
+
+ //get the max edge, in this shard, and write it.
+
+
//try to get a lock here, and fail if one isn't present
-// final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
-//
-//
-// try {
-// this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
-// }
-// catch ( ConnectionException e ) {
-// throw new GraphRuntimeException( "Unable to write the new edge metadata" );
-// }
+ // final long newShardTime = timeService.getCurrentTime() + graphFig.getShardCacheTimeout() * 2;
+ //
+ //
+ // try {
+ // this.edgeShardSerialization.writeEdgeMeta( scope, nodeId, newShardTime, edgeType ).execute();
+ // }
+ // catch ( ConnectionException e ) {
+ // throw new GraphRuntimeException( "Unable to write the new edge metadata" );
+ // }
return true;
@@ -194,7 +210,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
public long getMinTime() {
- return timeService.getCurrentTime() - (2 * graphFig.getShardCacheTimeout());
+ return timeService.getCurrentTime() - ( 2 * graphFig.getShardCacheTimeout() );
}
@@ -203,16 +219,15 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
private static final class MinShardTimeComparator implements Comparator<Shard> {
- @Override
- public int compare( final Shard s1, final Shard s2 ) {
- int result = Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
-
- if(result == 0){
- result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
- }
+ @Override
+ public int compare( final Shard s1, final Shard s2 ) {
+ int result = Long.compare( s1.getCreatedTime(), s2.getCreatedTime() );
- return result;
+ if ( result == 0 ) {
+ result = Long.compare( s1.getShardIndex(), s2.getShardIndex() );
}
- }
+ return result;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index a0bb0ef..73e8d4f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -47,7 +47,6 @@ import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
-
/**
* Simple implementation of the shard cache. Uses a local Guava cache with a timeout. If a value is not present in the
* cache, it will need to be searched via cassandra.
@@ -67,8 +66,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
*/
@Inject
public NodeShardCacheImpl( final NodeShardAllocation nodeShardAllocation, final GraphFig graphFig ) {
- Preconditions.checkNotNull(nodeShardAllocation, "nodeShardAllocation is required");
- Preconditions.checkNotNull(graphFig, "consistencyFig is required");
+ Preconditions.checkNotNull( nodeShardAllocation, "nodeShardAllocation is required" );
+ Preconditions.checkNotNull( graphFig, "consistencyFig is required" );
this.nodeShardAllocation = nodeShardAllocation;
this.graphFig = graphFig;
@@ -81,7 +80,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
public void propertyChange( final PropertyChangeEvent evt ) {
final String propertyName = evt.getPropertyName();
- if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName.equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+ if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
+ .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
updateCache();
}
}
@@ -95,8 +95,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long timestamp,
- final String... edgeType ) {
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+ final long timestamp, final String... edgeType ) {
final CacheKey key = new CacheKey( scope, nodeId, nodeType, edgeType );
@@ -121,27 +121,28 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType, final long maxTimestamp,
- final String... edgeType ) {
+ public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final NodeType nodeType,
+ final long maxTimestamp, final String... edgeType ) {
final CacheKey key = new CacheKey( scope, nodeId, nodeType, edgeType );
- CacheEntry entry;
+ CacheEntry entry;
- try {
- entry = this.graphs.get( key );
- }
- catch ( ExecutionException e ) {
- throw new GraphRuntimeException( "Unable to load shard key for graph", e );
- }
+ try {
+ entry = this.graphs.get( key );
+ }
+ catch ( ExecutionException e ) {
+ throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+ }
Iterator<ShardEntries> iterator = entry.getShards( maxTimestamp );
- if(iterator == null){
+ if ( iterator == null ) {
return Collections.<ShardEntries>emptyList().iterator();
}
return iterator;
}
+
/**
* This is a race condition. We could re-init the cache while another thread is reading it. This is fine, the read
* doesn't have to be precise. The algorithm accounts for stale data.
@@ -149,27 +150,31 @@ public class NodeShardCacheImpl implements NodeShardCache {
private void updateCache() {
this.graphs = CacheBuilder.newBuilder().maximumSize( graphFig.getShardCacheSize() )
- .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
- .build( new CacheLoader<CacheKey, CacheEntry>() {
+ .expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
+ .build( new CacheLoader<CacheKey, CacheEntry>() {
- @Override
- public CacheEntry load( final CacheKey key ) throws Exception {
+ @Override
+ public CacheEntry load( final CacheKey key ) throws Exception {
-//
-// /**
-// * Perform an audit in case we need to allocate a new shard
-// */
-// nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types );
-// //TODO, we need to put some sort of upper bounds on this, it could possibly get too large
+ //
+ // /**
+ // * Perform an audit in case we need to allocate
+ // a new shard
+ // */
+ // nodeShardAllocation.auditMaxShard( key.scope,
+ // key.id, key.types );
+ // //TODO, we need to put some sort of upper
+ // bounds on this, it could possibly get too large
- final Iterator<Shard> edges = nodeShardAllocation
- .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(), key.types );
+ final Iterator<Shard> edges = nodeShardAllocation
+ .getShards( key.scope, key.id, key.nodeType, Optional.<Shard>absent(),
+ key.types );
- return new CacheEntry( edges );
- }
- } );
+ return new CacheEntry( edges );
+ }
+ } );
}
@@ -241,7 +246,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
private CacheEntry( final Iterator<Shard> shards ) {
- this.shards = new TreeSet<>( );
+ this.shards = new TreeSet<>();
for ( Shard shard : IterableUtil.wrap( shards ) ) {
this.shards.add( shard );
@@ -254,23 +259,21 @@ public class NodeShardCacheImpl implements NodeShardCache {
*/
public ShardEntries getShardId( final Long seek ) {
return bootstrapEntry();
-// return this.shards.floor( seek );
+ // return this.shards.floor( seek );
}
/**
* Get all shards <= this one in descending order
- * @return
*/
- public Iterator<ShardEntries> getShards( final Long maxShard ){
- return Collections.singleton(bootstrapEntry() ).iterator();
-// return this.shards.headSet(maxShard, true ).descendingIterator();
+ public Iterator<ShardEntries> getShards( final Long maxShard ) {
+ return Collections.singleton( bootstrapEntry() ).iterator();
+ // return this.shards.headSet(maxShard, true ).descendingIterator();
}
- private ShardEntries bootstrapEntry(){
- return new ShardEntries( Collections.singleton( new Shard(0l, 0l) ) );
+
+ private ShardEntries bootstrapEntry() {
+ return new ShardEntries( Collections.singleton( new Shard( 0l, 0l ) ) );
}
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
new file mode 100644
index 0000000..f901699
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
@@ -0,0 +1,64 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/**
+ * Class to perform serialization for row keys from edges
+ */
+public class RowSerializer implements CompositeFieldSerializer<RowKey> {
+
+ private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+ @Override
+ public void toComposite( final CompositeBuilder builder, final RowKey key ) {
+
+ //add the row id to the composite
+ ID_SER.toComposite( builder, key.nodeId );
+
+ builder.addLong( key.hash[0] );
+ builder.addLong( key.hash[1] );
+ builder.addLong( key.shardId );
+ }
+
+
+ @Override
+ public RowKey fromComposite( final CompositeParser composite ) {
+
+ final Id id = ID_SER.fromComposite( composite );
+ final long[] hash = new long[] { composite.readLong(), composite.readLong() };
+ final long shard = composite.readLong();
+
+
+ return new RowKey( id, hash, shard );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
new file mode 100644
index 0000000..6591d72
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowTypeSerializer.java
@@ -0,0 +1,60 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
+import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.model.CompositeBuilder;
+import com.netflix.astyanax.model.CompositeParser;
+
+
+/** Composite serializer for {@link RowKeyType}: node id first, then both hash longs, then the shard id */
+public class RowTypeSerializer implements CompositeFieldSerializer<RowKeyType> {
+
+    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
+
+
+    @Override
+    public void toComposite( final CompositeBuilder builder, final RowKeyType keyType ) {
+        // the node id always leads the composite
+        ID_SER.toComposite( builder, keyType.nodeId );
+
+        final long[] hashParts = keyType.hash;
+        builder.addLong( hashParts[0] );
+        builder.addLong( hashParts[1] );
+        builder.addLong( keyType.shardId );
+    }
+
+
+    @Override
+    public RowKeyType fromComposite( final CompositeParser composite ) {
+        final Id nodeId = ID_SER.fromComposite( composite );
+        final long firstHash = composite.readLong();
+        final long secondHash = composite.readLong();
+        final long shardId = composite.readLong();
+        return new RowKeyType( nodeId, new long[] { firstHash, secondHash }, shardId );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
new file mode 100644
index 0000000..1da85e1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
@@ -0,0 +1,112 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Iterator that walks the columns of every row key produced by an {@link EdgeSearcher}, one row at a time.
+ * A new row query is only prepared once the columns of the current row are exhausted.
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardRowIterator<R, C, T> implements Iterator<T> {
+
+    private final EdgeSearcher<R, C, T> searcher;
+
+    private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+    /** Columns of the row currently being read; null until the first row has been queried */
+    private Iterator<T> currentColumnIterator;
+
+    private final Keyspace keyspace;
+
+    private final int pageSize;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    /**
+     * @param searcher Supplies the row keys and the column range; also handed to the column iterator
+     * @param cf The column family to query
+     * @param keyspace The keyspace used to prepare each row query
+     * @param consistencyLevel The consistency level applied to every row query
+     * @param pageSize The column limit set on the range of each row query
+     */
+    public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
+                             final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+                             final ConsistencyLevel consistencyLevel, final int pageSize ) {
+        this.searcher = searcher;
+        this.cf = cf;
+        this.keyspace = keyspace;
+        this.pageSize = pageSize;
+        this.consistencyLevel = consistencyLevel;
+    }
+
+
+    @Override
+    public boolean hasNext() {
+        //loop rather than recurse: a long run of rows without columns must not deepen the call stack
+        while ( currentColumnIterator == null || !currentColumnIterator.hasNext() ) {
+            //we have no more columns, and no more row keys, we're done
+            if ( !searcher.hasNext() ) {
+                return false;
+            }
+
+            //we have another row key, advance to it and re-check
+            advanceRow();
+        }
+
+        //we have more columns to return
+        return true;
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+        }
+
+        return currentColumnIterator.next();
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+
+    /**
+     * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
+     */
+    private void advanceRow() {
+        //the searcher positions the range on this builder; here we only cap it at the page size
+        final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+        //set the range into the search
+        searcher.setRange( rangeBuilder );
+
+        final ScopedRowKey<ApplicationScope, R> rowKey = searcher.next();
+
+        final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+                        .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+        currentColumnIterator = new ColumnNameIterator<C, T>( query, searcher, searcher.hasPage() );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..a0d1e6e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,568 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+ protected final Keyspace keyspace;
+ protected final CassandraConfig cassandraConfig;
+ protected final GraphFig graphFig;
+ protected final EdgeShardStrategy writeEdgeShardStrategy;
+
+
+    /** All collaborators are required; checkNotNull takes the reference first and the message second */
+    @Inject
+    public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                         final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy ) {
+
+        checkNotNull( keyspace, "keyspace required" );
+        checkNotNull( cassandraConfig, "cassandraConfig required" );
+        checkNotNull( graphFig, "graphFig required" );
+        checkNotNull( writeEdgeShardStrategy, "writeEdgeShardStrategy required" );
+
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+        this.graphFig = graphFig;
+        this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+    }
+
+
+ @Override
+ public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ EdgeUtils.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( timestamp.timestamp() );
+
+ final boolean isDeleted = markedEdge.isDeleted();
+
+
+ doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+ @Override
+ public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final RowKey rowKey, final DirectedEdge edge ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
+ }
+
+
+ @Override
+ public void countEdge( final Id rowId, final NodeType nodeType, final long shardId,
+ final String... types ) {
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, rowId, nodeType, shardId, 1l, types );
+ }
+ }
+
+
+ @Override
+ public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final EdgeRowKey rowKey, final long timestamp ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+ }
+ } );
+
+
+ return batch;
+ }
+
+
+ @Override
+ public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ EdgeUtils.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( timestamp.timestamp() );
+
+
+ doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
+ @Override
+ public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final RowKey rowKey, final DirectedEdge edge ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ }
+
+
+ @Override
+ public void countEdge( final Id rowId, final NodeType nodeType, final long shardId,
+ final String... types ) {
+ writeEdgeShardStrategy.increment( scope, rowId, nodeType, shardId, -1, types );
+ }
+
+
+ @Override
+ public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final EdgeRowKey rowKey, final long timestamp ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+ }
+ } );
+
+
+ return batch;
+ }
+
+
+    /**
+     * Write the edge internally: for each of the four edge column families, invokes the row operation once per
+     * write shard (edge write followed by count), then records the edge's timestamp in the versions column family.
+     * NOTE(review): op is a raw RowOp and is invoked with both RowKey and RowKeyType rows
+     *
+     * @param columnFamilies The column families to write to
+     * @param scope The scope to encapsulate
+     * @param edge The edge to write
+     * @param op The row operation to invoke
+     */
+    private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
+                          final RowOp op ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateEdge( edge );
+
+        final Id sourceNodeId = edge.getSourceNode();
+        final String sourceNodeType = sourceNodeId.getType();
+        final Id targetNodeId = edge.getTargetNode();
+        final String targetNodeType = targetNodeId.getType();
+        final long timestamp = edge.getTimestamp();
+        final String type = edge.getType();
+
+
+        /**
+         * write edges from source->target. Rows are keyed by the source node, columns hold the target node
+         */
+
+
+        final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+
+        final ShardEntries sourceRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
+                columnFamilies.getSourceNodeCfName();
+
+
+        for ( Shard shard : sourceRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+            op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type );
+        }
+
+
+        //same source->target edge again, for the cf whose shards are looked up by edge type and target node type
+        final ShardEntries sourceWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, sourceNodeId, NodeType.SOURCE, timestamp, type, targetNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+
+        for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+            final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+
+            op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
+            op.countEdge( sourceNodeId, NodeType.SOURCE, shardId, type, targetNodeType );
+        }
+
+
+        /**
+         * write edges from target<-source. Rows are keyed by the target node, columns hold the source node
+         */
+
+        final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+
+        final ShardEntries targetRowKeyShard =
+                writeEdgeShardStrategy.getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
+                columnFamilies.getTargetNodeCfName();
+
+        for ( Shard shard : targetRowKeyShard.getEntries() ) {
+            final long shardId = shard.getShardIndex();
+            final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+            op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type );
+        }
+
+
+        //same target<-source edge again, for the cf whose shards are looked up by edge type and source node type
+        final ShardEntries targetWithTypeRowKeyShard = writeEdgeShardStrategy
+                .getWriteShards( scope, targetNodeId, NodeType.TARGET, timestamp, type, sourceNodeType );
+
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+
+
+        for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
+
+            final long shardId = shard.getShardIndex();
+
+            final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+
+
+            op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
+            op.countEdge( targetNodeId, NodeType.TARGET, shardId, type, sourceNodeType );
+        }
+
+        /**
+         * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
+         */
+        final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l );
+
+
+        /**
+         * Write this in the timestamp log for this edge of source->target
+         */
+        op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+    }
+
+
+    /**
+     * Iterates the stored timestamps (versions) of the single source/type/target edge described by the search.
+     * Without a "last" edge to resume from, the column scan starts at the search's max timestamp; otherwise the
+     * base searcher's range handling is used.
+     */
+    @Override
+    public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                 final SearchByEdge search, final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdge( search );
+
+        final Id targetId = search.targetNode();
+        final Id sourceId = search.sourceNode();
+        final String type = search.getType();
+        final long maxTimestamp = search.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> versionsCf =
+                columnFamilies.getGraphEdgeVersions();
+        final Serializer<Long> serializer = versionsCf.getColumnSerializer();
+
+        final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> searcher =
+                new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, maxTimestamp, search.last(), shards ) {
+
+                    @Override
+                    protected Serializer<Long> getSerializer() {
+                        return serializer;
+                    }
+
+                    @Override
+                    public void setRange( final RangeBuilder builder ) {
+                        if ( !last.isPresent() ) {
+                            //start seeking at a value < our max version
+                            builder.setStart( maxTimestamp );
+                        }
+                        else {
+                            //resuming from a previous edge: defer to the base searcher
+                            super.setRange( builder );
+                        }
+                    }
+
+                    @Override
+                    protected EdgeRowKey generateRowKey( long shard ) {
+                        return new EdgeRowKey( sourceId, type, targetId, shard );
+                    }
+
+                    @Override
+                    protected Long getStartColumn( final Edge last ) {
+                        return last.getTimestamp();
+                    }
+
+                    @Override
+                    protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, versionsCf, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Iterates the edges of one type that leave the searched node. Rows are keyed by that source node; each column
+     * supplies the target node and timestamp of one edge.
+     */
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+                                                    final ApplicationScope scope, final SearchByEdgeType edgeType,
+                                                    final Iterator<ShardEntries> shards ) {
+
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id sourceId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
+                columnFamilies.getSourceNodeCfName();
+        final Serializer<DirectedEdge> columnSerializer = sourceCf.getColumnSerializer();
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return columnSerializer;
+                    }
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( sourceId, type, shard );
+                    }
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        //the start column is built from the last edge's target node and timestamp
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge column, final boolean marked ) {
+                        //the searched node is the source; the column names the target
+                        return new SimpleMarkedEdge( sourceId, type, column.id, column.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, sourceCf, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Iterates the edges of one type that leave the searched node towards targets of the given id type. The node
+     * supplied by the search is the source of every edge produced.
+     */
+    @Override
+    public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+                                                                final ApplicationScope scope,
+                                                                final SearchByIdType edgeType,
+                                                                final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id sourceId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final String targetType = edgeType.getIdType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getSourceNodeTargetTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+                    @Override
+                    protected RowKeyType generateRowKey( long shard ) {
+                        return new RowKeyType( sourceId, type, targetType, shard );
+                    }
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+                    }
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+                    }
+                };
+
+        //diamond instead of the raw type, so the searcher and column family type arguments are checked
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Iterates the edges of one type that arrive at the searched node. Rows are keyed by that target node; each
+     * column supplies the source node and timestamp of one edge.
+     */
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+                                                  final SearchByEdgeType edgeType,
+                                                  final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> targetCf =
+                columnFamilies.getTargetNodeCfName();
+        final Serializer<DirectedEdge> columnSerializer = targetCf.getColumnSerializer();
+
+        final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return columnSerializer;
+                    }
+
+                    @Override
+                    protected RowKey generateRowKey( long shard ) {
+                        return new RowKey( targetId, type, shard );
+                    }
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge column, final boolean marked ) {
+                        return new SimpleMarkedEdge( column.id, type, targetId, column.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, targetCf, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Iterates the edges of one type that arrive at the searched node from sources of the given id type. Columns
+     * of this column family are written with the source node, so paging resumes from the last edge's source node.
+     */
+    @Override
+    public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+                                                              final ApplicationScope scope,
+                                                              final SearchByIdType edgeType,
+                                                              final Iterator<ShardEntries> shards ) {
+        ValidationUtils.validateApplicationScope( scope );
+        EdgeUtils.validateSearchByEdgeType( edgeType );
+
+        final Id targetId = edgeType.getNode();
+        final String sourceType = edgeType.getIdType();
+        final String type = edgeType.getType();
+        final long maxTimestamp = edgeType.getMaxTimestamp();
+        final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily =
+                columnFamilies.getTargetNodeSourceTypeCfName();
+        final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+        final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+                new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+                    @Override
+                    protected Serializer<DirectedEdge> getSerializer() {
+                        return serializer;
+                    }
+
+                    @Override
+                    protected RowKeyType generateRowKey( final long shard ) {
+                        return new RowKeyType( targetId, type, sourceType, shard );
+                    }
+
+                    @Override
+                    protected DirectedEdge getStartColumn( final Edge last ) {
+                        //must be the source node: the target node is the row key here, not part of the column
+                        return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+                    }
+
+                    @Override
+                    protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+                        return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+                    }
+                };
+
+        return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+                graphFig.getScanPageSize() );
+    }
+
+
+    /**
+     * Callback that lets puts and deletes share the same row setup code
+     *
+     * @param <R> The row key type
+     */
+    private interface RowOp<R> {
+
+        /**
+         * Apply the operation to the given edge column of the given row
+         */
+        void writeEdge( MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, R rowKey,
+                        DirectedEdge edge );
+
+        /**
+         * Adjust the count kept for the shard of the row that was just written
+         */
+        void countEdge( Id rowId, NodeType type, long shardId, String... types );
+
+        /**
+         * Apply the operation to the edge's timestamp column in the version cf
+         */
+        void writeVersion( MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+                           EdgeRowKey rowKey, long timestamp );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
new file mode 100644
index 0000000..9050b0a
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeColumnFamilies.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.DynamicCompositeType;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+
+import com.netflix.astyanax.serializers.LongSerializer;
+
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.LONG_TYPE_REVERSED;
+import static org.apache.usergrid.persistence.core.astyanax.ColumnTypes.UUID_TYPE_REVERSED;
+
+
+/**
+ * Implementation of size based column family: declares the five graph column families and their serializers
+ */
+public class SizebasedEdgeColumnFamilies implements EdgeColumnFamilies {
+
+    //Row key with no type
+    private static final RowSerializer ROW_SERIALIZER = new RowSerializer();
+
+    //row key for the two "by id type" column families (target type / source type)
+    private static final RowTypeSerializer ROW_TYPE_SERIALIZER = new RowTypeSerializer();
+
+    //row key used by the edge versions column family
+    private static final EdgeRowKeySerializer EDGE_ROW_KEY_SERIALIZER = new EdgeRowKeySerializer();
+
+    //Edge serializers
+    private static final EdgeSerializer EDGE_SERIALIZER = new EdgeSerializer();
+
+    //column serializer used by the edge versions column family
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+
+    private static final String EDGE_DYNAMIC_COMPOSITE_TYPE =
+            //we purposefully associate lower case "l" and "u" with reversed types. This way we can use
+            //the default serialization in Astyanax, but get reverse order in cassandra
+            DynamicCompositeType.class.getSimpleName() + "(s=>UTF8Type,l=>" + LONG_TYPE_REVERSED + ",u=>"
+                    + UUID_TYPE_REVERSED + ")";
+
+    //initialize the CF's from our implementation
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> SOURCE_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> TARGET_NODE_EDGES =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Edges",
+                    new OrganizationScopedRowKeySerializer<>( ROW_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> SOURCE_NODE_TARGET_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Source_Node_Target_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+
+    /**
+     * The edges that are to the target node with the source type. The target node is the row key
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> TARGET_NODE_SOURCE_TYPE =
+            new MultiTennantColumnFamily<>( "Graph_Target_Node_Source_Type",
+                    new OrganizationScopedRowKeySerializer<>( ROW_TYPE_SERIALIZER ), EDGE_SERIALIZER );
+
+    /** The versions recorded per edge; unlike the other column families its columns are plain longs */
+    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_VERSIONS =
+            new MultiTennantColumnFamily<>( "Graph_Edge_Versions",
+                    new OrganizationScopedRowKeySerializer<>( EDGE_ROW_KEY_SERIALIZER ), LONG_SERIALIZER );
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getSourceNodeCfName() {
+        return SOURCE_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getTargetNodeCfName() {
+        return TARGET_NODE_EDGES;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getSourceNodeTargetTypeCfName() {
+        return SOURCE_NODE_TARGET_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getTargetNodeSourceTypeCfName() {
+        return TARGET_NODE_SOURCE_TYPE;
+    }
+
+
+    @Override
+    public MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getGraphEdgeVersions() {
+        return EDGE_VERSIONS;
+    }
+
+    /** All five definitions; the versions cf is declared with a reversed long type instead of the edge composite */
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Arrays
+                .asList( graphCf( SOURCE_NODE_EDGES ), graphCf( TARGET_NODE_EDGES ), graphCf( SOURCE_NODE_TARGET_TYPE ),
+                        graphCf( TARGET_NODE_SOURCE_TYPE ),
+                        new MultiTennantColumnFamilyDefinition( EDGE_VERSIONS, BytesType.class.getSimpleName(),
+                                ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+                                MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+    }
+
+
+    /**
+     * Helper to generate an edge column family definition that uses the edge dynamic composite type
+     */
+    private MultiTennantColumnFamilyDefinition graphCf( MultiTennantColumnFamily cf ) {
+        return new MultiTennantColumnFamilyDefinition( cf, BytesType.class.getSimpleName(), EDGE_DYNAMIC_COMPOSITE_TYPE,
+                BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.KEYS );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index 7a55c53..a71960b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -54,52 +54,23 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
@Override
- public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType, final long timestamp,
- final String... types ) {
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType,
+ final long timestamp, final String... types ) {
return shardCache.getWriteShards( scope, rowKeyId, nodeType, timestamp, types );
}
@Override
- public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType,final long maxTimestamp,
- final String... types ) {
+ public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id rowKeyId,
+ final NodeType nodeType, final long maxTimestamp,
+ final String... types ) {
return shardCache.getReadShards( scope, rowKeyId, nodeType, maxTimestamp, types );
}
@Override
- public void increment( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType, final long shardId, final long count,
- final String... types ) {
+ public void increment( final ApplicationScope scope, final Id rowKeyId, final NodeType nodeType, final long shardId,
+ final long count, final String... types ) {
shardApproximation.increment( scope, rowKeyId, nodeType, shardId, count, types );
}
-
-
- @Override
- public String getSourceNodeCfName() {
- return "Graph_Source_Node_Edges";
- }
-
-
- @Override
- public String getTargetNodeCfName() {
- return "Graph_Target_Node_Edges";
- }
-
-
- @Override
- public String getSourceNodeTargetTypeCfName() {
- return "Graph_Source_Node_Target_Type";
- }
-
-
- @Override
- public String getTargetNodeSourceTypeCfName() {
- return "Graph_Target_Node_Source_Type";
- }
-
-
- @Override
- public String getGraphEdgeVersions() {
- return "Graph_Edge_Versions";
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
new file mode 100644
index 0000000..b33fcaf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SourceEdgeSearcher.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+public class SourceEdgeSearcher {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/69a3faa2/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
index adcb42e..501cb83 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
@@ -116,6 +116,8 @@ public class GraphManagerShardingIT {
}
+
+
long shardCount = nodeShardApproximation.getCount( scope, sourceId, NodeType.SOURCE, 0l, edgeType );
assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);