You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/05 00:14:55 UTC
[02/11] git commit: WIP overwrite
WIP overwrite
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b1434ddf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b1434ddf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b1434ddf
Branch: refs/heads/USERGRID-188
Commit: b1434ddfff6bbb9b7e65a007071b258c4d7234bb
Parents: 4cfc849
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Jul 2 09:35:36 2014 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 8 14:42:39 2014 -0600
----------------------------------------------------------------------
.../impl/EdgeSerializationImpl.java | 142 ++---
.../impl/shard/EdgeShardSerialization.java | 2 +-
.../impl/shard/EdgeShardStrategy.java | 5 +-
.../impl/shard/NodeShardAllocation.java | 2 +-
.../impl/shard/NodeShardCache.java | 10 +-
.../graph/serialization/impl/shard/Shard.java | 103 ++++
.../serialization/impl/shard/ShardEntries.java | 45 ++
.../shard/impl/EdgeShardSerializationImpl.java | 13 +-
.../shard/impl/NodeShardAllocationImpl.java | 15 +-
.../impl/shard/impl/NodeShardCacheImpl.java | 39 +-
.../shard/impl/SizebasedEdgeShardStrategy.java | 11 +-
.../impl/shard/EdgeShardSerializationTest.java | 30 +-
.../impl/shard/NodeShardAllocationTest.java | 35 +-
.../impl/shard/NodeShardCacheTest.java | 516 +++++++++----------
14 files changed, 573 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 10d1048..7c630c1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -55,6 +55,8 @@ import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
import org.apache.usergrid.persistence.graph.serialization.util.EdgeHasher;
import org.apache.usergrid.persistence.graph.serialization.util.EdgeUtils;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -65,7 +67,6 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.Serializer;
-import com.netflix.astyanax.model.AbstractComposite;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.CompositeBuilder;
import com.netflix.astyanax.model.CompositeParser;
@@ -73,8 +74,6 @@ import com.netflix.astyanax.model.DynamicComposite;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.AbstractSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
-import com.netflix.astyanax.serializers.StringSerializer;
-import com.netflix.astyanax.serializers.UUIDSerializer;
import com.netflix.astyanax.util.RangeBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -213,7 +212,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
public void countEdge( final Id rowId, final long shardId, final String... types ) {
- if(!isDeleted){
+ if ( !isDeleted ) {
edgeShardStrategy.increment( scope, rowId, shardId, 1l, types );
}
}
@@ -291,62 +290,82 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
* Key in the serializers based on the edge
*/
- final long sourceRowKeyShard = edgeShardStrategy.getWriteShard( scope, sourceNodeId, timestamp, type );
- final RowKey sourceRowKey = new RowKey( sourceNodeId, type, sourceRowKeyShard);
+ /**
+ * write edges from source->target
+ */
- final long sourceWithTypeRowKeyShard = edgeShardStrategy.getWriteShard( scope, sourceNodeId, timestamp, type, targetNodeType );
+ final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
- final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, sourceWithTypeRowKeyShard );
+ final ShardEntries sourceRowKeyShard = edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type );
- final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+ for ( Shard shard : sourceRowKeyShard.getEntries() ) {
+ final long shardId = shard.getShardIndex();
+ final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
+ op.writeEdge( sourceNodeEdgesCf, sourceRowKey, sourceEdge );
+ op.countEdge( sourceNodeId, shardId, type );
+ }
- final long targetRowKeyShard = edgeShardStrategy.getWriteShard( scope, targetNodeId, timestamp, type );
- final RowKey targetRowKey = new RowKey( targetNodeId, type, targetRowKeyShard);
- final long targetWithTypeRowKeyShard = edgeShardStrategy.getWriteShard( scope, targetNodeId, timestamp, type, souceNodeType );
- final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, targetWithTypeRowKeyShard );
+ final ShardEntries sourceWithTypeRowKeyShard =
+ edgeShardStrategy.getWriteShards( scope, sourceNodeId, timestamp, type, targetNodeType );
- final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+ for ( Shard shard : sourceWithTypeRowKeyShard.getEntries() ) {
+ final long shardId = shard.getShardIndex();
+ final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
- final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, edgeShardStrategy
- .getWriteShard( scope, sourceNodeId, timestamp, type, targetNodeId.getUuid().toString(),
- targetNodeId.getType() ) );
+ op.writeEdge( sourceNodeTargetTypeCf, sourceRowKeyType, sourceEdge );
+ op.countEdge( sourceNodeId, shardId, type, targetNodeType );
+ }
/**
- * write edges from source->target
+ * write edges from target<-source
*/
- op.writeEdge( sourceNodeEdgesCf, sourceRowKey, sourceEdge );
- op.countEdge( sourceNodeId, sourceRowKeyShard, type );
+ final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+
+
+ final ShardEntries targetRowKeyShard = edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type );
+
+ for ( Shard shard : targetRowKeyShard.getEntries() ) {
+ final long shardId = shard.getShardIndex();
+ final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+
+ op.writeEdge( targetNodeEdgesCf, targetRowKey, targetEdge );
+ op.countEdge( targetNodeId, shardId, type );
+ }
+
+
+ final ShardEntries targetWithTypeRowKeyShard =
+ edgeShardStrategy.getWriteShards( scope, targetNodeId, timestamp, type, souceNodeType );
+
- op.writeEdge( sourceNodeTargetTypeCf, sourceRowKeyType, sourceEdge );
- op.countEdge( sourceNodeId, sourceWithTypeRowKeyShard, type, targetNodeType );
+ for ( Shard shard : targetWithTypeRowKeyShard.getEntries() ) {
+ final long shardId = shard.getShardIndex();
+ final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+ op.writeEdge( targetNodeSourceTypeCf, targetRowKeyType, targetEdge );
+ op.countEdge( targetNodeId, shardId, type, souceNodeType );
+ }
+
/**
- * write edges from target<-source
+ * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
*/
- op.writeEdge( targetNodeEdgesCf, targetRowKey, targetEdge );
- op.countEdge( targetNodeId, targetRowKeyShard, type );
-
- op.writeEdge( targetNodeSourceTypeCf, targetRowKeyType, targetEdge );
- op.countEdge( targetNodeId, targetWithTypeRowKeyShard, type, souceNodeType );
+ final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l);
/**
* Write this in the timestamp log for this edge of source->target
*/
op.writeVersion( graphEdgeVersionsCf, edgeRowKey, timestamp );
-
-
}
@@ -622,10 +641,10 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
*/
private static class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
- private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
-// private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
-// private static final StringSerializer STRING_SERIALIZER = StringSerializer.get().getString(;
-// )
+ private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
+ // private static final UUIDSerializer UUID_SERIALIZER = UUIDSerializer.get();
+ // private static final StringSerializer STRING_SERIALIZER = StringSerializer.get().getString(;
+ // )
@Override
@@ -633,20 +652,20 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
DynamicComposite composite = new DynamicComposite();
-// //add our edge
-// composite.addComponent( edge.timestamp, LONG_SERIALIZER, LONG_TYPE_REVERSED,
-// AbstractComposite.ComponentEquality.EQUAL );
+ // //add our edge
+ // composite.addComponent( edge.timestamp, LONG_SERIALIZER, LONG_TYPE_REVERSED,
+ // AbstractComposite.ComponentEquality.EQUAL );
-// //we do this explicity instead of re-using the id composite serializer b/c we want high order
-// //time uuids first, not second. In this column family, there is no sort
-// composite.addComponent( edge.id.getUuid(), UUID_SERIALIZER, UUID_TYPE_REVERSED,
-// AbstractComposite.ComponentEquality.EQUAL );
-//
-// composite.addComponent( edge.id.getType(), STRING_SERIALIZER );
+ // //we do this explicity instead of re-using the id composite serializer b/c we want high order
+ // //time uuids first, not second. In this column family, there is no sort
+ // composite.addComponent( edge.id.getUuid(), UUID_SERIALIZER, UUID_TYPE_REVERSED,
+ // AbstractComposite.ComponentEquality.EQUAL );
+ //
+ // composite.addComponent( edge.id.getType(), STRING_SERIALIZER );
composite.addComponent( edge.timestamp, LONG_SERIALIZER );
- ID_COL_SERIALIZER.toComposite( composite, edge.id);
+ ID_COL_SERIALIZER.toComposite( composite, edge.id );
return composite.serialize();
}
@@ -678,24 +697,24 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
private static class RowKey {
public final Id nodeId;
public final long[] hash;
- public final long shard;
+ public final long shardId;
/**
* Create a row key with the node and the edgeType
*/
- public RowKey( Id nodeId, String edgeType, final long shard ) {
- this( nodeId, EdgeHasher.createEdgeHash( edgeType ), shard );
+ public RowKey( Id nodeId, String edgeType, final long shardId ) {
+ this( nodeId, EdgeHasher.createEdgeHash( edgeType ), shardId );
}
/**
* Create a new row key with the hash, should only be used in deserialization or internal callers.
*/
- protected RowKey( Id nodeId, long[] hash, final long shard ) {
+ protected RowKey( Id nodeId, long[] hash, final long shardId ) {
this.nodeId = nodeId;
this.hash = hash;
- this.shard = shard;
+ this.shardId = shardId;
}
}
@@ -712,24 +731,24 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
* @param edgeType The type of the edge
* @param typeId The type of hte id
*/
- public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shard ) {
- this( nodeId, edgeType, typeId.getType(), shard );
+ public RowKeyType( final Id nodeId, final String edgeType, final Id typeId, final long shardId ) {
+ this( nodeId, edgeType, typeId.getType(), shardId );
}
/**
* Create a row key with the node id in the row key, the edge type, adn the target type from the id
*/
- public RowKeyType( final Id nodeId, final String edgeType, final String targetType, final long shard ) {
- super( nodeId, EdgeHasher.createEdgeHash( edgeType, targetType ), shard );
+ public RowKeyType( final Id nodeId, final String edgeType, final String targetType, final long shardId ) {
+ super( nodeId, EdgeHasher.createEdgeHash( edgeType, targetType ), shardId );
}
/**
* Internal use in de-serializing. Should only be used in this case or by internal callers
*/
- private RowKeyType( final Id nodeId, final long[] hash, final long shard ) {
- super( nodeId, hash, shard );
+ private RowKeyType( final Id nodeId, final long[] hash, final long shardId ) {
+ super( nodeId, hash, shardId );
}
}
@@ -767,11 +786,11 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
protected final Optional<Edge> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
- protected final Iterator<Long> shards;
+ protected final Iterator<ShardEntries> shards;
protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Iterator<Long> shards ) {
+ final Iterator<ShardEntries> shards ) {
this.scope = scope;
this.maxTimestamp = maxTimestamp;
this.last = last;
@@ -787,7 +806,10 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
@Override
public ScopedRowKey<ApplicationScope, R> next() {
- return ScopedRowKey.fromKey( scope, generateRowKey( shards.next() ) );
+ /**
+ * Todo, multi scan
+ */
+ return ScopedRowKey.fromKey( scope, generateRowKey( shards.next().getEntries().iterator().next().getShardIndex() ) );
}
@@ -875,7 +897,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
builder.addLong( key.hash[0] );
builder.addLong( key.hash[1] );
- builder.addLong( key.shard );
+ builder.addLong( key.shardId );
}
@@ -908,7 +930,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
builder.addLong( keyType.hash[0] );
builder.addLong( keyType.hash[1] );
- builder.addLong( keyType.shard );
+ builder.addLong( keyType.shardId );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index d49cfdf..480b71d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -52,7 +52,7 @@ public interface EdgeShardSerialization extends Migration{
* @param types The types to use
* @return
*/
- public Iterator<Long> getEdgeMetaData(ApplicationScope scope, Id nodeId, Optional<Long> start, String... types);
+ public Iterator<Shard> getEdgeMetaData(ApplicationScope scope, Id nodeId, Optional<Shard> start, String... types);
/**
* Remove the shard from the edge meta data from the types.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 09436ac..22c9470 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -37,7 +37,8 @@ public interface EdgeShardStrategy {
* @param timestamp The timestamp on the edge
* @param types The types in the edge
*/
- public long getWriteShard(final ApplicationScope scope, final Id rowKeyId, final long timestamp, final String... types );
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId, final long timestamp,
+ final String... types );
/**
@@ -48,7 +49,7 @@ public interface EdgeShardStrategy {
* @param maxTimestamp The max timestamp to use
* @param types the types in the edge
*/
- public Iterator<Long> getReadShards(final ApplicationScope scope,final Id rowKeyId, final long maxTimestamp,final String... types );
+ public Iterator<ShardEntries> getReadShards(final ApplicationScope scope,final Id rowKeyId, final long maxTimestamp,final String... types );
/**
* Increment our count meta data by the passed value. Can be a positive or a negative number.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 1097ced..6ecadbb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -44,7 +44,7 @@ public interface NodeShardAllocation {
* @param edgeTypes
* @return A list of all shards <= the current shard. This will always return 0l if no shards are allocated
*/
- public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId,
+ public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, Optional<Shard> maxShardId,
final String... edgeTypes );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 667fdbf..3e1675a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -35,21 +35,23 @@ public interface NodeShardCache {
/**
- * Get the time meta data for the given node
+ * Get the shard for the given timestamp
* @param nodeId
* @param timestamp The time to select the slice for.
* @param edgeType
*/
- public long getSlice(final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType);
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final long timestamp,
+ final String... edgeType );
/**
- * Get an iterator of all versions <= the version
+ * Get an iterator of all versions <= the version for iterating shard entry sets
* @param scope
* @param nodeId
* @param maxTimestamp The highest timestamp
* @param edgeType
* @return
*/
- public Iterator<Long> getVersions(final ApplicationScope scope, final Id nodeId, final long maxTimestamp, final String... edgeType);
+ public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final long maxTimestamp,
+ final String... edgeType );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
new file mode 100644
index 0000000..d0c3e01
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+public class Shard implements Comparable<Shard> {
+
+ private final long shardIndex;
+ private final long createdTime;
+
+
+ public Shard( final long shardIndex, final long createdTime ) {
+ this.shardIndex = shardIndex;
+ this.createdTime = createdTime;
+ }
+
+
+ /**
+ * Get the long shard index
+ */
+ public long getShardIndex() {
+ return shardIndex;
+ }
+
+
+ /**
+ * Get the timestamp in epoch millis this shard was created
+ */
+ public long getCreatedTime() {
+ return createdTime;
+ }
+
+
+ @Override
+ public int compareTo( final Shard o ) {
+ if ( o == null ) {
+ return 1;
+ }
+
+ if ( shardIndex > o.shardIndex ) {
+ return 1;
+ }
+
+ else if ( shardIndex == o.shardIndex ) {
+ if ( createdTime > o.createdTime ) {
+ return 1;
+ }
+ else if ( createdTime < o.createdTime ) {
+ return -1;
+ }
+
+ return 0;
+ }
+
+ return -1;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( !( o instanceof Shard ) ) {
+ return false;
+ }
+
+ final Shard shard = ( Shard ) o;
+
+ if ( createdTime != shard.createdTime ) {
+ return false;
+ }
+ if ( shardIndex != shard.shardIndex ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = ( int ) ( shardIndex ^ ( shardIndex >>> 32 ) );
+ result = 31 * result + ( int ) ( createdTime ^ ( createdTime >>> 32 ) );
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntries.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntries.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntries.java
new file mode 100644
index 0000000..6ec5e20
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntries.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Set;
+
+
+/**
+ * There are cases where we need to read or write to more than 1 shard.
+ */
+public class ShardEntries {
+
+
+ private Set<Shard> shards;
+
+
+ public ShardEntries( Set<Shard> shards ) {
+ this.shards = shards;
+ }
+
+
+ public Set<Shard> getEntries() {
+ return shards;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index a08ec3b..63075b3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySer
import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Optional;
@@ -65,7 +66,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
private static final byte HOLDER = 0x00;
- private static final LongColumnParser COLUMN_PARSER = new LongColumnParser();
+ private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
protected final Keyspace keyspace;
@@ -105,7 +106,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
- public Iterator<Long> getEdgeMetaData( final ApplicationScope scope, final Id nodeId, final Optional<Long> start,
+ public Iterator<Shard> getEdgeMetaData( final ApplicationScope scope, final Id nodeId, final Optional<Shard> start,
final String... types ) {
/**
* If the edge is present, we need to being seeking from this
@@ -114,7 +115,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( graphFig.getScanPageSize() );
if ( start.isPresent() ) {
- rangeBuilder.setStart( start.get() );
+ rangeBuilder.setStart( start.get().getShardIndex() );
}
final EdgeRowKey key = new EdgeRowKey( nodeId, types );
@@ -163,11 +164,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
}
- private static class LongColumnParser implements ColumnParser<Long, Long> {
+ private static class ShardColumnParser implements ColumnParser<Long,Shard> {
@Override
- public Long parseColumn( final Column<Long> column ) {
- return column.getName();
+ public Shard parseColumn( final Column<Long> column ) {
+ return new Shard(column.getName(), column.getTimestamp());
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index cf70669..5cece93 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -69,12 +70,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<Long> getShards( final ApplicationScope scope, final Id nodeId, Optional<Long> maxShardId, final String... edgeTypes ) {
+ public Iterator<Shard> getShards( final ApplicationScope scope, final Id nodeId, Optional<Shard> maxShardId, final String... edgeTypes ) {
- final Iterator<Long> existingShards =
+ final Iterator<Shard> existingShards =
edgeShardSerialization.getEdgeMetaData( scope, nodeId, maxShardId, edgeTypes );
- final PushbackIterator<Long> pushbackIterator = new PushbackIterator( existingShards );
+ final PushbackIterator<Shard> pushbackIterator = new PushbackIterator( existingShards );
//
//
// final long now = timeService.getCurrentTime();
@@ -132,7 +133,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* Nothing to iterate, return an iterator with 0.
*/
if(!pushbackIterator.hasNext()){
- pushbackIterator.pushback( 0l );
+ pushbackIterator.pushback( new Shard(0l, 0l) );
}
return pushbackIterator;
@@ -142,7 +143,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
public boolean auditMaxShard( final ApplicationScope scope, final Id nodeId, final String... edgeType ) {
- final Iterator<Long> maxShards = getShards( scope, nodeId, Optional.<Long>absent(), edgeType );
+ final Iterator<Shard> maxShards = getShards( scope, nodeId, Optional.<Shard>absent(), edgeType );
//if the first shard has already been allocated, do nothing.
@@ -152,14 +153,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return false;
}
- final long maxShard = maxShards.next();
+ final Shard maxShard = maxShards.next();
/**
* Check out if we have a count for our shard allocation
*/
- final long count = nodeShardApproximation.getCount( scope, nodeId, maxShard, edgeType );
+ final long count = nodeShardApproximation.getCount( scope, nodeId, maxShard.getShardIndex(), edgeType );
if ( count < graphFig.getShardSize() ) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 3b78898..eeefb3a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -34,6 +34,8 @@ import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
import org.apache.usergrid.persistence.graph.serialization.util.IterableUtil;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -93,7 +95,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public long getSlice( final ApplicationScope scope, final Id nodeId, final long timestamp, final String... edgeType ) {
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id nodeId, final long timestamp,
+ final String... edgeType ) {
final CacheKey key = new CacheKey( scope, nodeId, edgeType );
@@ -106,7 +109,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
throw new GraphRuntimeException( "Unable to load shard key for graph", e );
}
- final Long shardId = entry.getShardId( timestamp );
+ final ShardEntries shardId = entry.getShardId( timestamp );
if ( shardId != null ) {
return shardId;
@@ -118,8 +121,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public Iterator<Long> getVersions( final ApplicationScope scope, final Id nodeId, final long maxTimestamp,
- final String... edgeType ) {
+ public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id nodeId, final long maxTimestamp,
+ final String... edgeType ) {
final CacheKey key = new CacheKey( scope, nodeId, edgeType );
CacheEntry entry;
@@ -130,10 +133,10 @@ public class NodeShardCacheImpl implements NodeShardCache {
throw new GraphRuntimeException( "Unable to load shard key for graph", e );
}
- Iterator<Long> iterator = entry.getShards( maxTimestamp );
+ Iterator<ShardEntries> iterator = entry.getShards( maxTimestamp );
if(iterator == null){
- return Collections.<Long>emptyList().iterator();
+ return Collections.<ShardEntries>emptyList().iterator();
}
return iterator;
@@ -161,8 +164,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
// //TODO, we need to put some sort of upper bounds on this, it could possibly get too large
- final Iterator<Long> edges = nodeShardAllocation
- .getShards( key.scope, key.id, Optional.<Long>absent(), key.types );
+ final Iterator<Shard> edges = nodeShardAllocation
+ .getShards( key.scope, key.id, Optional.<Shard>absent(), key.types );
return new CacheEntry( edges );
}
@@ -224,13 +227,13 @@ public class NodeShardCacheImpl implements NodeShardCache {
/**
* Get the list of all segments
*/
- private TreeSet<Long> shards;
+ private TreeSet<Shard> shards;
- private CacheEntry( final Iterator<Long> shards ) {
+ private CacheEntry( final Iterator<Shard> shards ) {
this.shards = new TreeSet<>( );
- for ( Long shard : IterableUtil.wrap( shards ) ) {
+ for ( Shard shard : IterableUtil.wrap( shards ) ) {
this.shards.add( shard );
}
}
@@ -239,8 +242,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
/**
* Get the shard's UUID for the uuid we're attempting to seek from
*/
- public Long getShardId( final Long seek ) {
- return this.shards.floor( seek );
+ public ShardEntries getShardId( final Long seek ) {
+ return bootstrapEntry();
+// return this.shards.floor( seek );
}
@@ -248,8 +252,13 @@ public class NodeShardCacheImpl implements NodeShardCache {
* Get all shards <= this one in decending order
* @return
*/
- public Iterator<Long> getShards( final Long maxShard ){
- return this.shards.headSet(maxShard, true ).descendingIterator();
+ public Iterator<ShardEntries> getShards( final Long maxShard ){
+ return Collections.singleton(bootstrapEntry() ).iterator();
+// return this.shards.headSet(maxShard, true ).descendingIterator();
+ }
+
+ private ShardEntries bootstrapEntry(){
+ return new ShardEntries( Collections.singleton( new Shard(0l, 0l) ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index f246f23..17791cb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -26,6 +26,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntries;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
@@ -52,16 +53,16 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
@Override
- public long getWriteShard( final ApplicationScope scope, final Id rowKeyId, final long timestamp,
- final String... types ) {
- return shardCache.getSlice( scope, rowKeyId, timestamp, types );
+ public ShardEntries getWriteShards( final ApplicationScope scope, final Id rowKeyId, final long timestamp,
+ final String... types ) {
+ return shardCache.getWriteShards( scope, rowKeyId, timestamp, types );
}
@Override
- public Iterator<Long> getReadShards( final ApplicationScope scope, final Id rowKeyId, final long maxTimestamp,
+ public Iterator<ShardEntries> getReadShards( final ApplicationScope scope, final Id rowKeyId, final long maxTimestamp,
final String... types ) {
- return shardCache.getVersions( scope, rowKeyId, maxTimestamp, types );
+ return shardCache.getReadShards( scope, rowKeyId, maxTimestamp, types );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 479e1bf..89d89b8 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -103,22 +103,22 @@ public class EdgeShardSerializationTest {
batch.execute();
- Iterator<Long> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Long>absent(), types );
+ Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
- assertEquals( slice3, results.next().longValue() );
+ assertEquals( slice3, results.next().getShardIndex() );
- assertEquals( slice2, results.next().longValue() );
+ assertEquals( slice2, results.next().getShardIndex() );
- assertEquals( slice1, results.next().longValue() );
+ assertEquals( slice1, results.next().getShardIndex() );
assertFalse( results.hasNext() );
//test paging and size
- results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.of( slice2 ), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.of( new Shard( slice2, 0l ) ), types );
- assertEquals( slice2, results.next().longValue() );
+ assertEquals( slice2, results.next().getShardIndex() );
- assertEquals( slice1, results.next().longValue() );
+ assertEquals( slice1, results.next().getShardIndex() );
assertFalse( results.hasNext() );
@@ -147,24 +147,24 @@ public class EdgeShardSerializationTest {
batch.execute();
- Iterator<Long> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Long>absent(), types );
+ Iterator<Shard> results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
- assertEquals( slice3, results.next().longValue() );
+ assertEquals( slice3, results.next().getShardIndex() );
- assertEquals( slice2, results.next().longValue() );
+ assertEquals( slice2, results.next().getShardIndex() );
- assertEquals( slice1, results.next().longValue() );
+ assertEquals( slice1, results.next().getShardIndex() );
assertFalse( results.hasNext() );
//test paging and size
edgeShardSerialization.removeEdgeMeta( scope, now, slice1, types ).execute();
- results = edgeShardSerialization.getEdgeMetaData( scope, now,Optional.<Long>absent(), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now,Optional.<Shard>absent(), types );
- assertEquals( slice3, results.next().longValue() );
+ assertEquals( slice3, results.next().getShardIndex() );
- assertEquals( slice2, results.next().longValue() );
+ assertEquals( slice2, results.next().getShardIndex() );
assertFalse( results.hasNext() );
@@ -173,7 +173,7 @@ public class EdgeShardSerializationTest {
edgeShardSerialization.removeEdgeMeta( scope, now, slice3, types ).execute();
- results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Long>absent(), types );
+ results = edgeShardSerialization.getEdgeMetaData( scope, now, Optional.<Shard>absent(), types );
assertFalse( results.hasNext() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 5c846f1..6824e74 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -31,8 +31,6 @@ import org.mockito.ArgumentCaptor;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -46,7 +44,6 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -109,7 +106,7 @@ public class NodeShardAllocationTest {
*/
when( edgeShardSerialization
.getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+ same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
@@ -148,7 +145,7 @@ public class NodeShardAllocationTest {
when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
- final long futureShard = timeservicetime + graphFig.getShardCacheTimeout() * 2 ;
+ final Shard futureShard = new Shard(timeservicetime + graphFig.getShardCacheTimeout() * 2, timeservicetime) ;
/**
* Mock up returning a min shard, and a future shard
@@ -199,7 +196,7 @@ public class NodeShardAllocationTest {
*/
when( edgeShardSerialization
.getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+ same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
//return a shard size < our max by 1
@@ -251,7 +248,7 @@ public class NodeShardAllocationTest {
*/
when( edgeShardSerialization
.getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList( 0l ).iterator() );
+ same( subType ) ) ).thenReturn( Arrays.asList( new Shard(0l, 0l) ).iterator() );
final long shardCount = graphFig.getShardSize();
@@ -331,14 +328,16 @@ public class NodeShardAllocationTest {
final long futureTime = timeService.getCurrentTime() + 2 * graphFig.getShardCacheTimeout();
+ final Shard minShard = new Shard(0l, 0l);
+
/**
* Simulate slow node
*/
- final long futureShard1 = futureTime - 1;
+ final Shard futureShard1 = new Shard(futureTime - 1, timeservicetime);
- final long futureShard2 = futureTime + 10000;
+ final Shard futureShard2 = new Shard(futureTime + 10000, timeservicetime);
- final long futureShard3 = futureShard2 + 10000;
+ final Shard futureShard3 = new Shard(futureShard2.getShardIndex() + 10000, timeservicetime);
final int pageSize = 100;
@@ -348,7 +347,7 @@ public class NodeShardAllocationTest {
*/
when( edgeShardSerialization
.getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, 0l).iterator() );
+ same( subType ) ) ).thenReturn( Arrays.asList(futureShard3, futureShard2, futureShard1, minShard).iterator() );
@@ -363,17 +362,17 @@ public class NodeShardAllocationTest {
.thenReturn( mock( MutationBatch.class ) );
- final Iterator<Long>
- result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
+ final Iterator<Shard>
+ result = approximation.getShards( scope, nodeId, Optional.<Shard>absent(), type, subType );
assertTrue( "Shards present", result.hasNext() );
- assertEquals("Only single next shard returned", futureShard1, result.next().longValue());
+ assertEquals("Only single next shard returned", futureShard1, result.next());
assertTrue("Shards present", result.hasNext());
- assertEquals("Previous shard present", 0l, result.next().longValue());
+ assertEquals("Previous shard present", 0l, result.next().getShardIndex());
assertFalse("No shards left", result.hasNext());
@@ -422,11 +421,11 @@ public class NodeShardAllocationTest {
*/
when( edgeShardSerialization
.getEdgeMetaData( same( scope ), same( nodeId ), any( Optional.class ), same( type ),
- same( subType ) ) ).thenReturn( Collections.<Long>emptyList().iterator() );
+ same( subType ) ) ).thenReturn( Collections.<Shard>emptyList().iterator() );
- final Iterator<Long> result = approximation.getShards( scope, nodeId, Optional.<Long>absent(), type, subType );
+ final Iterator<Shard> result = approximation.getShards( scope, nodeId, Optional.<Shard>absent(), type, subType );
- assertEquals("0 shard allocated", 0l, result.next().longValue());
+ assertEquals("0 shard allocated", 0l, result.next().getShardIndex());
assertFalse( "No shard allocated", result.hasNext() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b1434ddf/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
index 6c46c32..a8fdbc4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCacheTest.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
-import java.util.UUID;
import org.junit.Before;
import org.junit.Test;
@@ -30,7 +29,6 @@ import org.junit.Test;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
-import org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -67,263 +65,259 @@ public class NodeShardCacheTest {
when( scope.getApplication() ).thenReturn( orgId );
}
-
-
- @Test
- public void testNoShards() throws ConnectionException {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- final long newTime = 10000l;
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- final Optional max = Optional.absent();
- /**
- * Simulate returning no shards at all.
- */
- when( allocation
- .getShards( same( scope ), same( id ), same( max), same( edgeType ),
- same( otherIdType ) ) )
- .thenReturn( Collections.singletonList( 0l ).iterator() );
-
-
- long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals(0l, slice );
-
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testSingleExistingShard() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- final long newTime = 10000l;
-
- final long min = 0;
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- final Optional max = Optional.absent();
-
- /**
- * Simulate returning single shard
- */
- when( allocation.getShards( same( scope ), same( id ), same(max),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Collections.singletonList( min ).iterator() );
-
-
- long slice = cache.getSlice( scope, id, newTime, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testRangeShard() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- /**
- * Set our min mid and max
- */
- final long min = 0;
-
-
- final long mid = 10000;
-
-
- final long max = 20000;
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- /**
- * Simulate returning all shards
- */
- when( allocation.getShards( same( scope ), same( id ), any( Optional.class ),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
-
-
- //check getting equal to our min, mid and max
-
- long slice = cache.getSlice( scope, id, min, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- slice = cache.getSlice( scope, id, mid,
- edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
- slice = cache.getSlice( scope, id, max ,
- edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
-
- //now test in between
- slice = cache.getSlice( scope, id, min+1, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
- slice = cache.getSlice( scope, id, mid-1, edgeType, otherIdType );
-
-
- //we return the min UUID possible, all edges should start by writing to this edge
- assertEquals( min, slice );
-
-
- slice = cache.getSlice( scope, id, mid+1, edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
- slice = cache.getSlice( scope, id, max-1, edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( mid, slice );
-
-
- slice = cache.getSlice( scope, id, max, edgeType, otherIdType );
-
-
- //we return the mid UUID possible, all edges should start by writing to this edge
- assertEquals( max, slice );
-
- /**
- * Verify that we fired the audit
- */
- verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
- }
-
-
- @Test
- public void testRangeShardIterator() {
-
- final GraphFig graphFig = getFigMock();
-
- final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
-
- final Id id = createId( "test" );
-
- final String edgeType = "edge";
-
- final String otherIdType = "type";
-
-
- /**
- * Set our min mid and max
- */
- final long min = 1;
-
-
- final long mid = 100;
-
-
- final long max = 200;
-
-
- NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
-
-
- /**
- * Simulate returning all shards
- */
- when( allocation.getShards( same( scope ), same( id ), any(Optional.class),
- same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
-
-
- //check getting equal to our min, mid and max
-
- Iterator<Long> slice =
- cache.getVersions( scope, id, max, edgeType, otherIdType );
-
-
- assertEquals( max, slice.next().longValue() );
- assertEquals( mid, slice.next().longValue() );
- assertEquals( min, slice.next().longValue() );
-
-
- slice = cache.getVersions( scope, id, mid,
- edgeType, otherIdType );
-
- assertEquals( mid, slice.next().longValue() );
- assertEquals( min, slice.next().longValue() );
-
-
- slice = cache.getVersions( scope, id, min,
- edgeType, otherIdType );
-
- assertEquals( min, slice.next().longValue() );
-
-
- }
-
-
- private GraphFig getFigMock() {
- final GraphFig graphFig = mock( GraphFig.class );
- when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
- when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
-
- return graphFig;
- }
+//
+//
+// @Test
+// public void testNoShards() throws ConnectionException {
+//
+// final GraphFig graphFig = getFigMock();
+//
+// final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+//
+// final Id id = createId( "test" );
+//
+// final String edgeType = "edge";
+//
+// final String otherIdType = "type";
+//
+//
+// final long newTime = 10000l;
+//
+//
+// NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+//
+//
+// final Optional max = Optional.absent();
+// /**
+// * Simulate returning no shards at all.
+// */
+// when( allocation
+// .getShards( same( scope ), same( id ), same( max), same( edgeType ),
+// same( otherIdType ) ) )
+// .thenReturn( Collections.singletonList( 0l ).iterator() );
+//
+//
+// long slice = cache.getSliceShard( scope, id, newTime, edgeType, otherIdType );
+//
+//
+// //we return the min UUID possible, all edges should start by writing to this edge
+// assertEquals(0l, slice );
+//
+//
+// /**
+// * Verify that we fired the audit
+// */
+// verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+// }
+//
+//
+// @Test
+// public void testSingleExistingShard() {
+//
+// final GraphFig graphFig = getFigMock();
+//
+// final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+//
+//
+// final Id id = createId( "test" );
+//
+// final String edgeType = "edge";
+//
+// final String otherIdType = "type";
+//
+//
+// final long newTime = 10000l;
+//
+// final long min = 0;
+//
+//
+// NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+//
+//
+// final Optional max = Optional.absent();
+//
+// /**
+// * Simulate returning single shard
+// */
+// when( allocation.getShards( same( scope ), same( id ), same(max),
+// same( edgeType ), same( otherIdType ) ) ).thenReturn( Collections.singletonList( min ).iterator() );
+//
+//
+// long slice = cache.getSliceShard( scope, id, newTime, edgeType, otherIdType );
+//
+//
+// //we return the min UUID possible, all edges should start by writing to this edge
+// assertEquals( min, slice );
+//
+// /**
+// * Verify that we fired the audit
+// */
+// verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+// }
+//
+//
+// @Test
+// public void testRangeShard() {
+//
+// final GraphFig graphFig = getFigMock();
+//
+// final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+//
+// final Id id = createId( "test" );
+//
+// final String edgeType = "edge";
+//
+// final String otherIdType = "type";
+//
+//
+// /**
+// * Set our min mid and max
+// */
+// final long min = 0;
+//
+//
+// final long mid = 10000;
+//
+//
+// final long max = 20000;
+//
+//
+// NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+//
+//
+// /**
+// * Simulate returning all shards
+// */
+// when( allocation.getShards( same( scope ), same( id ), any( Optional.class ),
+// same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+//
+//
+// //check getting equal to our min, mid and max
+//
+// long slice = cache.getSliceShard( scope, id, min, edgeType, otherIdType );
+//
+//
+// //we return the min UUID possible, all edges should start by writing to this edge
+// assertEquals( min, slice );
+//
+// slice = cache.getSliceShard( scope, id, mid, edgeType, otherIdType );
+//
+//
+// //we return the mid UUID possible, all edges should start by writing to this edge
+// assertEquals( mid, slice );
+//
+// slice = cache.getSliceShard( scope, id, max, edgeType, otherIdType );
+//
+//
+// //we return the mid UUID possible, all edges should start by writing to this edge
+// assertEquals( max, slice );
+//
+// //now test in between
+// slice = cache.getSliceShard( scope, id, min + 1, edgeType, otherIdType );
+//
+//
+// //we return the min UUID possible, all edges should start by writing to this edge
+// assertEquals( min, slice );
+//
+// slice = cache.getSliceShard( scope, id, mid - 1, edgeType, otherIdType );
+//
+//
+// //we return the min UUID possible, all edges should start by writing to this edge
+// assertEquals( min, slice );
+//
+//
+// slice = cache.getSliceShard( scope, id, mid + 1, edgeType, otherIdType );
+//
+//
+// //we return the mid UUID possible, all edges should start by writing to this edge
+// assertEquals( mid, slice );
+//
+// slice = cache.getSliceShard( scope, id, max - 1, edgeType, otherIdType );
+//
+//
+// //we return the mid UUID possible, all edges should start by writing to this edge
+// assertEquals( mid, slice );
+//
+//
+// slice = cache.getSliceShard( scope, id, max, edgeType, otherIdType );
+//
+//
+// //we return the mid UUID possible, all edges should start by writing to this edge
+// assertEquals( max, slice );
+//
+// /**
+// * Verify that we fired the audit
+// */
+// verify( allocation ).auditMaxShard( scope, id, edgeType, otherIdType );
+// }
+//
+//
+// @Test
+// public void testRangeShardIterator() {
+//
+// final GraphFig graphFig = getFigMock();
+//
+// final NodeShardAllocation allocation = mock( NodeShardAllocation.class );
+//
+// final Id id = createId( "test" );
+//
+// final String edgeType = "edge";
+//
+// final String otherIdType = "type";
+//
+//
+// /**
+// * Set our min mid and max
+// */
+// final long min = 1;
+//
+//
+// final long mid = 100;
+//
+//
+// final long max = 200;
+//
+//
+// NodeShardCache cache = new NodeShardCacheImpl( allocation, graphFig );
+//
+//
+// /**
+// * Simulate returning all shards
+// */
+// when( allocation.getShards( same( scope ), same( id ), any(Optional.class),
+// same( edgeType ), same( otherIdType ) ) ).thenReturn( Arrays.asList( min, mid, max ).iterator() );
+//
+//
+// //check getting equal to our min, mid and max
+//
+// Iterator<Long> slice =
+// cache.getShards( scope, id, max, edgeType, otherIdType );
+//
+//
+// assertEquals( max, slice.next().longValue() );
+// assertEquals( mid, slice.next().longValue() );
+// assertEquals( min, slice.next().longValue() );
+//
+//
+// slice = cache.getShards( scope, id, mid, edgeType, otherIdType );
+//
+// assertEquals( mid, slice.next().longValue() );
+// assertEquals( min, slice.next().longValue() );
+//
+//
+// slice = cache.getShards( scope, id, min, edgeType, otherIdType );
+//
+// assertEquals( min, slice.next().longValue() );
+//
+//
+// }
+//
+//
+// private GraphFig getFigMock() {
+// final GraphFig graphFig = mock( GraphFig.class );
+// when( graphFig.getShardCacheSize() ).thenReturn( 1000l );
+// when( graphFig.getShardCacheTimeout() ).thenReturn( 30000l );
+//
+// return graphFig;
+// }
}