You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/24 23:46:03 UTC
[3/3] usergrid git commit: Updated tests and finished basic check +
allocate algorithm
Updated tests and finished basic check + allocate algorithm
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/9f3bf2b3
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/9f3bf2b3
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/9f3bf2b3
Branch: refs/heads/USERGRID-909
Commit: 9f3bf2b3a77777f67abb540b636b614fed851b94
Parents: 42a4eee
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Aug 24 15:44:33 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Aug 24 15:44:33 2015 -0600
----------------------------------------------------------------------
.../core/astyanax/CassandraConfig.java | 1 +
.../usergrid/persistence/graph/GraphFig.java | 24 +
.../persistence/graph/guice/GraphModule.java | 6 +-
.../impl/shard/EdgeShardSerialization.java | 16 +-
.../impl/shard/NodeShardAllocation.java | 6 -
.../impl/shard/ShardConsistency.java | 46 +
.../impl/shard/ShardEntryGroup.java | 57 +-
.../shard/ShardedEdgeSerializationImpl.java | 1008 ++++++++++++++++++
.../shard/impl/EdgeShardSerializationImpl.java | 71 +-
.../shard/impl/NodeShardAllocationImpl.java | 109 +-
.../impl/shard/impl/ShardConsistencyImpl.java | 58 +
.../impl/ShardedEdgeSerializationImpl.java | 1006 -----------------
.../graph/GraphManagerShardConsistencyIT.java | 16 +-
.../impl/shard/EdgeShardSerializationTest.java | 14 +-
.../impl/shard/NodeShardAllocationTest.java | 142 +--
.../impl/shard/NodeShardCacheTest.java | 14 +-
.../impl/shard/ShardEntryGroupTest.java | 62 +-
.../impl/shard/ShardGroupCompactionTest.java | 2 +-
.../shard/count/NodeShardApproximationTest.java | 37 +-
.../shard/impl/ShardEntryGroupIteratorTest.java | 8 +-
.../SearchRequestBuilderStrategyV2.java | 2 +-
.../index/impl/EsEntityIndexImpl.java | 2 +-
22 files changed, 1361 insertions(+), 1346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
index 817aee2..bbefe0f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/CassandraConfig.java
@@ -55,4 +55,5 @@ public interface CassandraConfig {
public int[] getShardSettings();
+
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 38506f3..d0df2eb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -54,6 +54,12 @@ public interface GraphFig extends GuicyFig {
String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+ String SHARD_WRITE_CONSISTENCY = "usergrid.graph.shard.write.consistency";
+
+ String SHARD_READ_CONSISTENCY = "usergrid.graph.shard.read.consistency";
+
+ String SHARD_AUDIT_CONSISTENCY = "usergrid.graph.shard.audit.consistency";
+
String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
@@ -103,5 +109,23 @@ public interface GraphFig extends GuicyFig {
@Default( "1000" )
@Key( COUNTER_WRITE_FLUSH_QUEUE_SIZE )
int getCounterFlushQueueSize();
+
+ @Default( "CL_EACH_QUORUM" )
+ @Key( SHARD_WRITE_CONSISTENCY )
+ String getShardWriteConsistency();
+
+ /**
+ * Get the consistency level for doing reads
+ */
+ @Default( "CL_LOCAL_QUORUM" )
+ @Key( SHARD_READ_CONSISTENCY )
+ String getShardReadConsistency();
+
+ /**
+ * Get the consistency level for performing a shard audit
+ */
+ @Default( "CL_EACH_QUORUM" )
+ @Key( SHARD_AUDIT_CONSISTENCY )
+ String getShardAuditConsistency();
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 4b628d1..1cca5b2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -61,6 +61,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
@@ -69,8 +70,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.Node
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardConsistencyImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerializationImpl;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -114,6 +116,8 @@ public abstract class GraphModule extends AbstractModule {
bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
+ bind( ShardConsistency.class).to( ShardConsistencyImpl.class);
+
/**
* Bind our strategies based on their internal annotations.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
index 93fd685..d8c561f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerialization.java
@@ -43,13 +43,25 @@ public interface EdgeShardSerialization extends Migration{
public MutationBatch writeShardMeta( ApplicationScope scope, Shard shard, DirectedEdgeMeta directedEdgeMeta );
/**
- * Get an iterator of all meta data and types. Returns a range from High to low
+ * Get an iterator of all meta data and types. Returns a range from High to low. Only reads the local region
* @param scope The organization scope
* @param start The shard time to start seeking from. Values <= this value will be returned.
* @param directedEdgeMeta The edge meta data to use
* @return
*/
- public Iterator<Shard> getShardMetaData( ApplicationScope scope, Optional<Shard> start, DirectedEdgeMeta directedEdgeMeta);
+ public Iterator<Shard> getShardMetaDataLocal( ApplicationScope scope, Optional<Shard> start,
+ DirectedEdgeMeta directedEdgeMeta );
+
+
+ /**
+ * Get an iterator of all meta data and types. Returns a range from High to low. Reads quorum of all regions
+ * @param scope The organization scope
+ * @param start The shard time to start seeking from. Values <= this value will be returned.
+ * @param directedEdgeMeta The edge meta data to use
+ * @return
+ */
+ Iterator<Shard> getShardMetaDataAudit( ApplicationScope scope, Optional<Shard> start,
+ DirectedEdgeMeta directedEdgeMeta );
/**
* Remove the shard from the edge meta data from the types.
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index cadd0db..5039b35 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -57,11 +57,5 @@ public interface NodeShardAllocation {
boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
final DirectedEdgeMeta directedEdgeMeta );
- /**
- * Get the minimum time that a created shard should be considered "new", and be used for both new writes and reads
- * @return
- */
- long getMinTime();
-
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
new file mode 100644
index 0000000..5c52af6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardConsistency.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import com.netflix.astyanax.model.ConsistencyLevel;
+
+
+/**
+ * Interface for shard consistency levels
+ */
+public interface ShardConsistency {
+
+ /**
+ * Get the consistency level for writing new shards
+ * @return
+ */
+ ConsistencyLevel getShardWriteConsistency();
+
+ /**
+ * Get the consistency level for doing reads
+ * @return
+ */
+ ConsistencyLevel getShardReadConsistency();
+
+ /**
+ * Get the consistency level for performing a shard audit
+ * @return
+ */
+ ConsistencyLevel getShardAuditConsistency();
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 90f703d..f1bc42f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,8 +54,7 @@ public class ShardEntryGroup {
/**
* The max delta we accept in milliseconds for create time to be considered a member of this group
*/
- public ShardEntryGroup( ) {
-
+ public ShardEntryGroup() {
this.shards = new ArrayList<>();
this.maxCreatedTime = 0;
}
@@ -95,8 +93,6 @@ public class ShardEntryGroup {
}
-
-
return false;
}
@@ -129,6 +125,22 @@ public class ShardEntryGroup {
/**
+ * Get the max shard based on time indexes
+ *
+ * @return
+ */
+ public Shard getMaxShard() {
+ final int size = shards.size();
+
+ if ( size < 1 ) {
+ return null;
+ }
+
+ return shards.get( size - 1 );
+ }
+
+
+ /**
* Get the entries that we should read from.
*/
public Collection<Shard> getReadShards() {
@@ -138,20 +150,17 @@ public class ShardEntryGroup {
final Shard compactionTarget = getCompactionTarget();
-
- if(compactionTarget != null){
+ if ( compactionTarget != null ) {
LOG.debug( "Returning shards {} and {} as read shards", compactionTarget, staticShard );
return Arrays.asList( compactionTarget, staticShard );
}
LOG.debug( "Returning shards {} read shard", staticShard );
- return Collections.singleton( staticShard );
+ return Collections.singleton( staticShard );
}
-
-
/**
* Get the entries, with the max shard time being first. We write to all shards until they're migrated
*/
@@ -165,19 +174,17 @@ public class ShardEntryGroup {
final Shard compactionTarget = getCompactionTarget();
- LOG.debug( "Returning shard {} as write shard", compactionTarget);
-
- return Collections.singleton( compactionTarget );
+ LOG.debug( "Returning shard {} as write shard", compactionTarget );
+ return Collections.singleton( compactionTarget );
}
final Shard staticShard = getRootShard();
- LOG.debug( "Returning shard {} as write shard", staticShard);
+ LOG.debug( "Returning shard {} as write shard", staticShard );
return Collections.singleton( staticShard );
-
}
@@ -191,22 +198,22 @@ public class ShardEntryGroup {
/**
* Get the root shard that was created in this group
- * @return
*/
- private Shard getRootShard(){
- if(rootShard != null){
+ private Shard getRootShard() {
+ if ( rootShard != null ) {
return rootShard;
}
- final Shard rootCandidate = shards.get( shards.size() -1 );
+ final Shard rootCandidate = shards.get( shards.size() - 1 );
- if(rootCandidate.isCompacted()){
+ if ( rootCandidate.isCompacted() ) {
rootShard = rootCandidate;
}
return rootShard;
}
+
/**
* Get the shard all compactions should write to. Null indicates we cannot find a shard that could be used as a
* compaction target. Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
@@ -297,16 +304,16 @@ public class ShardEntryGroup {
return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
- .getShardIndex() );
+ .getShardIndex() );
}
@Override
public String toString() {
return "ShardEntryGroup{" +
- "shards=" + shards +
- ", maxCreatedTime=" + maxCreatedTime +
- ", compactionTarget=" + compactionTarget +
- '}';
+ "shards=" + shards +
+ ", maxCreatedTime=" + maxCreatedTime +
+ ", compactionTarget=" + compactionTarget +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
new file mode 100644
index 0000000..3329ff1
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerializationImpl.java
@@ -0,0 +1,1008 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.SearchByIdType;
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeSearcher;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.DescendingTimestampComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators.OrderedComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+ .SourceDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators
+ .TargetDirectedEdgeDescendingComparator;
+import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Function;
+import com.google.common.base.Optional;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.Serializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * TODO: Refactor this to use shards only, no shard groups, just collections of shards. The parent caller can aggregate
+ * the results of multiple groups together, this has an impedance mismatch in the API layer.
+ */
+@Singleton
+public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
+
+ protected final Keyspace keyspace;
+ protected final CassandraConfig cassandraConfig;
+ protected final GraphFig graphFig;
+ protected final EdgeShardStrategy writeEdgeShardStrategy;
+ protected final TimeService timeService;
+
+
+ /** Creates the serializer; every collaborator is required and a null one is rejected with a NullPointerException. */
+ @Inject
+ public ShardedEdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+ final GraphFig graphFig, final EdgeShardStrategy writeEdgeShardStrategy,
+ final TimeService timeService ) {
+
+ // Guava's checkNotNull takes the reference first and the error message second
+ checkNotNull( keyspace, "keyspace required" );
+ checkNotNull( cassandraConfig, "cassandraConfig required" );
+ checkNotNull( graphFig, "graphFig required" );
+ checkNotNull( writeEdgeShardStrategy, "writeEdgeShardStrategy required" );
+ checkNotNull( timeService, "timeService required" );
+
+ this.keyspace = keyspace;
+ this.cassandraConfig = cassandraConfig;
+ this.graphFig = graphFig;
+ this.writeEdgeShardStrategy = writeEdgeShardStrategy;
+ this.timeService = timeService;
+ }
+
+
+ @Override
+ public MutationBatch writeEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+ return new SourceWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch writeEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch writeEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta targetEdgeMeta, final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ return new TargetWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
+
+
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch writeEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+ .putColumn( edge, isDeleted );
+
+
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch writeEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+
+ return new EdgeVersions( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
+ final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+ final boolean isDeleted ) {
+ batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+ .putColumn( column, isDeleted );
+
+
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+ return new SourceWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+
+ batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+ .deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+ return new TargetWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+
+ batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+ .deleteColumn( edge );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ @Override
+ public MutationBatch deleteEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+
+ return new EdgeVersions( columnFamilies, markedEdge ) {
+
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily,
+ final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+ final boolean isDeleted ) {
+ batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
+ .deleteColumn( column );
+ writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
+
+
+ /**
+ * Iterate the stored versions of a single edge, identified by the search's source node, type and target
+ * node, across the supplied shards. Each column in the edge versions column family is a Long timestamp.
+ */
+ @Override
+ public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final SearchByEdge search, final Collection<Shard> shards ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdge( search );
+
+ final Id targetId = search.targetNode();
+ final Id sourceId = search.sourceNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
+
+ final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> versionsCf =
+ columnFamilies.getGraphEdgeVersions();
+ final Serializer<Long> versionSerializer = versionsCf.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> versionComparator =
+ new OrderedComparator<>( DescendingTimestampComparator.INSTANCE, search.getOrder() );
+
+ final EdgeSearcher<EdgeRowKey, Long, MarkedEdge> versionSearcher =
+ new EdgeSearcher<EdgeRowKey, Long, MarkedEdge>( scope, shards, search.getOrder(), versionComparator,
+ maxTimestamp, search.last().transform( TRANSFORM ) ) {
+
+ @Override
+ protected Serializer<Long> getSerializer() {
+ return versionSerializer;
+ }
+
+
+ @Override
+ public void buildRange( final RangeBuilder builder ) {
+
+ //no resume point was supplied, so start seeking at the max timestamp of the search
+ if ( !last.isPresent() ) {
+ builder.setStart( maxTimestamp );
+ return;
+ }
+
+ //a resume point exists, let the parent build the range from it
+ super.buildRange( builder );
+ }
+
+
+ @Override
+ protected EdgeRowKey generateRowKey( long shard ) {
+ return new EdgeRowKey( sourceId, type, targetId, shard );
+ }
+
+
+ @Override
+ protected Long createColumn( final MarkedEdge last ) {
+ return last.getTimestamp();
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final Long column, final boolean marked ) {
+ return new SimpleMarkedEdge( sourceId, type, targetId, column.longValue(), marked );
+ }
+ };
+
+ return new ShardsColumnIterator<>( versionSearcher, versionsCf, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ /**
+ * Iterate the edges of the given type that leave the search node, reading the source node column family
+ * across the supplied shards. Each column is a DirectedEdge holding the target node id and the timestamp.
+ */
+ @Override
+ public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final SearchByEdgeType search,
+ final Collection<Shard> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( search );
+
+ final Id sourceId = search.getNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
+
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> sourceCf =
+ columnFamilies.getSourceNodeCfName();
+ final Serializer<DirectedEdge> edgeSerializer = sourceCf.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> edgeComparator =
+ new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder() );
+
+ final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> edgeSearcher =
+ new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), edgeComparator,
+ maxTimestamp, search.last().transform( TRANSFORM ) ) {
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return edgeSerializer;
+ }
+
+
+ @Override
+ protected RowKey generateRowKey( long shard ) {
+ return new RowKey( sourceId, type, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
+ //columns in this family are keyed by the target node
+ return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+ }
+ };
+
+ return new ShardsColumnIterator<>( edgeSearcher, sourceCf, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ /**
+ * Iterate the edges of the given type that leave the search node and point at nodes of the search's id
+ * type, reading the source node / target type column family across the supplied shards. Each column is a
+ * DirectedEdge holding the target node id and the timestamp.
+ */
+ @Override
+ public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope,
+ final SearchByIdType search,
+ final Collection<Shard> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ //NOTE(review): a SearchByIdType is validated with the edge type validation only; confirm that is sufficient
+ GraphValidation.validateSearchByEdgeType( search );
+
+ //the search node is the source of every edge returned (it is used as the source in createEdge below)
+ final Id sourceId = search.getNode();
+ final String type = search.getType();
+ final String targetType = search.getIdType();
+ final long maxTimestamp = search.getMaxTimestamp();
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
+ columnFamilies.getSourceNodeTargetTypeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> comparator =
+ new OrderedComparator<>( TargetDirectedEdgeDescendingComparator.INSTANCE, search.getOrder() );
+
+
+ final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKeyType generateRowKey( long shard ) {
+ return new RowKeyType( sourceId, type, targetType, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
+ //columns in this family are keyed by the target node
+ return new DirectedEdge( last.getTargetNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
+ }
+ };
+
+ //use the diamond like the sibling searches instead of a raw ShardsColumnIterator
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ /**
+ * Iterate the edges of the given type that point at the search node, reading the target node column
+ * family across the supplied shards. Each column is a DirectedEdge holding the source node id and the
+ * timestamp.
+ */
+ @Override
+ public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final SearchByEdgeType search, final Collection<Shard> shards ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateSearchByEdgeType( search );
+
+ final Id targetId = search.getNode();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
+
+ final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> targetCf =
+ columnFamilies.getTargetNodeCfName();
+ final Serializer<DirectedEdge> edgeSerializer = targetCf.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> edgeComparator =
+ new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder() );
+
+ final EdgeSearcher<RowKey, DirectedEdge, MarkedEdge> edgeSearcher =
+ new EdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), edgeComparator,
+ maxTimestamp, search.last().transform( TRANSFORM ) ) {
+
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return edgeSerializer;
+ }
+
+
+ @Override
+ protected RowKey generateRowKey( long shard ) {
+ return new RowKey( targetId, type, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
+ //columns in this family are keyed by the source node
+ return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+ }
+ };
+
+ return new ShardsColumnIterator<>( edgeSearcher, targetCf, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+ /**
+ * Iterate the edges of the given type that point at the search node and originate from nodes of the
+ * search's id type, reading the target node / source type column family across the supplied shards.
+ * Each column is a DirectedEdge holding the source node id and the timestamp.
+ */
+ @Override
+ public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope,
+ final SearchByIdType search,
+ final Collection<Shard> shards ) {
+
+ ValidationUtils.validateApplicationScope( scope );
+ //NOTE(review): a SearchByIdType is validated with the edge type validation only; confirm that is sufficient
+ GraphValidation.validateSearchByEdgeType( search );
+
+ final Id targetId = search.getNode();
+ final String sourceType = search.getIdType();
+ final String type = search.getType();
+ final long maxTimestamp = search.getMaxTimestamp();
+ final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily =
+ columnFamilies.getTargetNodeSourceTypeCfName();
+ final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
+
+ final OrderedComparator<MarkedEdge> comparator =
+ new OrderedComparator<>( SourceDirectedEdgeDescendingComparator.INSTANCE, search.getOrder() );
+
+
+ final EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
+ new EdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, shards, search.getOrder(), comparator, maxTimestamp,
+ search.last().transform( TRANSFORM ) ) {
+ @Override
+ protected Serializer<DirectedEdge> getSerializer() {
+ return serializer;
+ }
+
+
+ @Override
+ protected RowKeyType generateRowKey( final long shard ) {
+ return new RowKeyType( targetId, type, sourceType, shard );
+ }
+
+
+ @Override
+ protected DirectedEdge createColumn( final MarkedEdge last ) {
+ //columns in this family are keyed by the source node (see createEdge below, where edge.id is
+ //the source), so the resume column must be built from the last edge's source, not its target
+ return new DirectedEdge( last.getSourceNode(), last.getTimestamp() );
+ }
+
+
+ @Override
+ protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
+ return new SimpleMarkedEdge( edge.id, type, targetId, edge.timestamp, marked );
+ }
+ };
+
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ graphFig.getScanPageSize() );
+ }
+
+
+
+
+
+ /**
+ * Template for put and delete operations that share the same row setup. Subclasses supply the column
+ * family, the row key for a shard, the column value and the deleted flag; writeEdge applies the actual
+ * mutation for one shard.
+ *
+ * @param <R> The row key type
+ * @param <C> The column type
+ */
+ private abstract class RowOp<R, C> {
+
+ /**
+ * The column family the operation writes to
+ */
+ protected abstract MultiTennantColumnFamily<ScopedRowKey<R>, C> getColumnFamily();
+
+ /**
+ * The row key to use for the given shard
+ */
+ public abstract R getRowKey( final Shard shard );
+
+ /**
+ * The column value to write
+ */
+ protected abstract C getDirectedEdge();
+
+ /**
+ * Whether the edge is flagged as deleted
+ */
+ protected abstract boolean isDeleted();
+
+
+ /**
+ * Apply the mutation for a single shard to the batch
+ */
+ abstract void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ScopedRowKey<R>, C> columnFamily,
+ final ApplicationScope scope, final R rowKey, final C column, final Shard shard,
+ final boolean isDeleted );
+
+
+ /**
+ * Create a mutation batch with the write consistency level and the timestamp of opTimestamp, then
+ * invoke writeEdge once per supplied shard.
+ */
+ public MutationBatch createBatch( final ApplicationScope scope, final Collection<Shard> shards,
+ final UUID opTimestamp ) {
+
+ final MutationBatch mutation = keyspace.prepareMutationBatch()
+ .withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( opTimestamp.timestamp() );
+
+ //resolve the per-operation values once, they are the same for every shard
+ final C value = getDirectedEdge();
+ final MultiTennantColumnFamily<ScopedRowKey<R>, C> family = getColumnFamily();
+ final boolean deleted = isDeleted();
+
+ for ( final Shard shard : shards ) {
+ writeEdge( mutation, family, scope, getRowKey( shard ), value, shard, deleted );
+ }
+
+ return mutation;
+ }
+ }
+
+
+ /**
+ * Row operation against the source node column family (source->target). The row key is built from the
+ * source node, the edge type and the shard index; the column holds the target node and the timestamp.
+ */
+ private abstract class SourceWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
+ private final Id sourceNodeId;
+ private final String type;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Capture the column family and the values of the marked edge needed to build row keys and the column
+ */
+ private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
+ this.sourceNodeId = markedEdge.getSourceNode();
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+ this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
+ return this.columnFamily;
+ }
+
+
+ @Override
+ public RowKey getRowKey( final Shard shard ) {
+ final long shardIndex = shard.getShardIndex();
+ return new RowKey( sourceNodeId, type, shardIndex );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return this.directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return this.isDeleted;
+ }
+ }
+
+
+ /**
+ * Row operation against the source node / target type column family (source->target with target type).
+ * The row key is built from the source node, the edge type, the target id and the shard index; the column
+ * holds the target node and the timestamp.
+ */
+ private abstract class SourceTargetTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
+ private final Id sourceNodeId;
+ private final String type;
+ //assigned once in the constructor, so it is final like the other fields
+ private final Id targetId;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Capture the column family and the values of the marked edge needed to build row keys and the column
+ */
+ private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
+
+ this.sourceNodeId = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.targetId = markedEdge.getTargetNode();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKeyType getRowKey( final Shard shard ) {
+ return new RowKeyType( sourceNodeId, type, targetId, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Row operation against the target node column family (target <-- source). The row key is built from the
+ * target node, the edge type and the shard index; the column holds the source node and the timestamp.
+ */
+ private abstract class TargetWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> columnFamily;
+ private final Id targetNode;
+ private final String type;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Capture the column family and the values of the marked edge needed to build row keys and the column
+ */
+ private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
+ this.targetNode = markedEdge.getTargetNode();
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+ this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ScopedRowKey<RowKey>, DirectedEdge> getColumnFamily() {
+ return this.columnFamily;
+ }
+
+
+ @Override
+ public RowKey getRowKey( final Shard shard ) {
+ final long shardIndex = shard.getShardIndex();
+ return new RowKey( targetNode, type, shardIndex );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return this.directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return this.isDeleted;
+ }
+ }
+
+
+ /**
+ * Row operation against the target node / source type column family (target<--source with source type).
+ * The row key is built from the target node, the edge type, the source node and the shard index; the
+ * column holds the source node and the timestamp.
+ */
+ private abstract class TargetSourceTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> columnFamily;
+ private final Id targetNode;
+
+ private final Id sourceNode;
+
+ final String type;
+
+ final boolean isDeleted;
+ final DirectedEdge directedEdge;
+
+
+ /**
+ * Capture the column family and the values of the marked edge needed to build row keys and the column
+ */
+ private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ //this op is keyed by target node and source type, so it must use the target-side column family
+ //rather than the source node / target type one
+ this.columnFamily = edgeColumnFamilies.getTargetNodeSourceTypeCfName();
+
+ this.targetNode = markedEdge.getTargetNode();
+ this.sourceNode = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ScopedRowKey<RowKeyType>, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKeyType getRowKey( final Shard shard ) {
+ return new RowKeyType( targetNode, type, sourceNode, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Row operation against the edge versions column family. The row key is built from the source node, the
+ * edge type, the target node and the shard index; the column is the edge timestamp.
+ */
+ private abstract class EdgeVersions extends RowOp<EdgeRowKey, Long> {
+
+ private final MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> columnFamily;
+ private final Id targetNode;
+ private final Id sourceNode;
+
+ final String type;
+ final boolean isDeleted;
+ final Long edgeVersion;
+
+
+ /**
+ * Capture the column family and the values of the marked edge needed to build row keys and the column
+ */
+ private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
+ this.targetNode = markedEdge.getTargetNode();
+ this.sourceNode = markedEdge.getSourceNode();
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+ this.edgeVersion = markedEdge.getTimestamp();
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ScopedRowKey<EdgeRowKey>, Long> getColumnFamily() {
+ return this.columnFamily;
+ }
+
+
+ @Override
+ public EdgeRowKey getRowKey( final Shard shard ) {
+ final long shardIndex = shard.getShardIndex();
+ return new EdgeRowKey( sourceNode, type, targetNode, shardIndex );
+ }
+
+
+ /**
+ * The column value for this family is the edge version rather than a directed edge
+ */
+ @Override
+ protected Long getDirectedEdge() {
+ return this.edgeVersion;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return this.isDeleted;
+ }
+ }
+
+
+
+
+
+
+ /**
+ * Adapts an Edge to a MarkedEdge. A null input yields null, an input that already is a MarkedEdge is
+ * returned as is, and any other edge is copied into a SimpleMarkedEdge whose marked flag is false.
+ */
+ private static final Function<Edge, MarkedEdge> TRANSFORM = new Function<Edge, MarkedEdge>() {
+ @Nullable
+ @Override
+ public MarkedEdge apply( @Nullable final Edge input ) {
+
+ //instanceof is false for null, so a null input falls through to the explicit check below
+ if ( input instanceof MarkedEdge ) {
+ return ( MarkedEdge ) input;
+ }
+
+ if ( input == null ) {
+ return null;
+ }
+
+ final boolean marked = false;
+
+ return new SimpleMarkedEdge( input.getSourceNode(), input.getType(), input.getTargetNode(),
+ input.getTimestamp(), marked );
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index f107307..8ccf809 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -32,14 +32,15 @@ import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
@@ -50,6 +51,7 @@ import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ConsistencyLevel;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.util.RangeBuilder;
@@ -62,8 +64,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
* Edge shards
*/
private static final MultiTennantColumnFamily<ScopedRowKey<DirectedEdgeMeta>, Long> EDGE_SHARDS =
- new MultiTennantColumnFamily<>( "Edge_Shards",
- new ScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
+ new MultiTennantColumnFamily<>( "Edge_Shards",
+ new ScopedRowKeySerializer<>( EdgeShardRowKeySerializer.INSTANCE ), LongSerializer.get() );
private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
@@ -72,20 +74,22 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
protected final Keyspace keyspace;
protected final CassandraConfig cassandraConfig;
protected final GraphFig graphFig;
+ protected final ShardConsistency shardConsistency;
@Inject
public EdgeShardSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
- final GraphFig graphFig ) {
+ final GraphFig graphFig, final ShardConsistency shardConsistency ) {
this.keyspace = keyspace;
this.cassandraConfig = cassandraConfig;
this.graphFig = graphFig;
+ this.shardConsistency = shardConsistency;
}
@Override
- public MutationBatch writeShardMeta( final ApplicationScope scope,
- final Shard shard, final DirectedEdgeMeta metaData) {
+ public MutationBatch writeShardMeta( final ApplicationScope scope, final Shard shard,
+ final DirectedEdgeMeta metaData ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateDirectedEdgeMeta( metaData );
@@ -98,7 +102,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope.getApplication(), metaData );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch =
+ keyspace.prepareMutationBatch().withConsistencyLevel( shardConsistency.getShardWriteConsistency() );
batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
.putColumn( shard.getShardIndex(), shard.isCompacted() );
@@ -108,8 +113,31 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
- public Iterator<Shard> getShardMetaData( final ApplicationScope scope,
- final Optional<Shard> start, final DirectedEdgeMeta metaData ) {
+ public Iterator<Shard> getShardMetaDataLocal( final ApplicationScope scope, final Optional<Shard> start,
+ final DirectedEdgeMeta metaData ) {
+
+ return getShardMetaDataInternal( scope, start, metaData, shardConsistency.getShardReadConsistency() );
+ }
+
+
+ @Override
+ public Iterator<Shard> getShardMetaDataAudit( final ApplicationScope scope, final Optional<Shard> start,
+ final DirectedEdgeMeta directedEdgeMeta ) {
+ return getShardMetaDataInternal( scope, start, directedEdgeMeta, shardConsistency.getShardAuditConsistency() );
+ }
+
+
+ /**
+ * Get the shard meta data, allowing the caller to specify the consistency level
+ * @param scope
+ * @param start
+ * @param metaData
+ * @param consistencyLevel
+ * @return
+ */
+ private Iterator<Shard> getShardMetaDataInternal( final ApplicationScope scope, final Optional<Shard> start,
+ final DirectedEdgeMeta metaData,
+ final ConsistencyLevel consistencyLevel ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateDirectedEdgeMeta( metaData );
@@ -134,8 +162,8 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
final RowQuery<ScopedRowKey<DirectedEdgeMeta>, Long> query =
- keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( cassandraConfig.getReadCL() ).getKey( rowKey )
- .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+ keyspace.prepareQuery( EDGE_SHARDS ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+ .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
return new ColumnNameIterator<>( query, COLUMN_PARSER, false );
@@ -143,18 +171,17 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
- public MutationBatch removeShardMeta( final ApplicationScope scope,
- final Shard shard, final DirectedEdgeMeta metaData) {
-
+ public MutationBatch removeShardMeta( final ApplicationScope scope, final Shard shard,
+ final DirectedEdgeMeta metaData ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.valiateShard( shard );
GraphValidation.validateDirectedEdgeMeta( metaData );
-
final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope.getApplication(), metaData );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch =
+ keyspace.prepareMutationBatch().withConsistencyLevel( shardConsistency.getShardWriteConsistency( ));
batch.withRow( EDGE_SHARDS, rowKey ).deleteColumn( shard.getShardIndex() );
@@ -164,19 +191,13 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
@Override
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-
-
return Collections.singleton(
- new MultiTennantColumnFamilyDefinition( EDGE_SHARDS, BytesType.class.getSimpleName(),
- ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
- MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+ new MultiTennantColumnFamilyDefinition( EDGE_SHARDS, BytesType.class.getSimpleName(),
+ ColumnTypes.LONG_TYPE_REVERSED, BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
}
-
-
-
-
private static class ShardColumnParser implements ColumnParser<Long, Shard> {
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 80c6a5f..592d308 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,7 +32,6 @@ import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
@@ -46,6 +45,8 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -62,6 +63,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private static final Shard MIN_SHARD = new Shard( 0, 0, true );
+ private static final NoOpCompaction NO_OP_COMPACTION = new NoOpCompaction();
+
private final EdgeShardSerialization edgeShardSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
private final ShardedEdgeSerialization shardedEdgeSerialization;
@@ -103,7 +106,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
else {
- existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+ existingShards = edgeShardSerialization.getShardMetaDataLocal( scope, maxShardId, directedEdgeMeta );
}
if ( existingShards == null || !existingShards.hasNext() ) {
@@ -120,8 +123,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
- return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope,
- directedEdgeMeta );
+ return new ShardEntryGroupIterator( existingShards, shardGroupCompaction, scope, directedEdgeMeta );
}
@@ -135,6 +137,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
Preconditions.checkNotNull( shardEntryGroup, "shardEntryGroup cannot be null" );
+ //we have to read our state from cassandra first to ensure we have an up to date view from other regions
+
+
+
/**
* Nothing to do, it's been created very recently, we don't create a new one
@@ -155,11 +161,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final Shard shard = shardEntryGroup.getMinShard();
- if ( shard.getCreatedTime() >= getMinTime() ) {
- return false;
- }
-
-
/**
* Check out if we have a count for our shard allocation
*/
@@ -173,8 +174,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
return false;
}
- if(LOG.isDebugEnabled()){
- LOG.debug("Count of {} has exceeded shard config of {} will begin compacting", count, shardSize);
+ if ( LOG.isDebugEnabled() ) {
+ LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
}
/**
@@ -193,13 +194,13 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
final Iterator<MarkedEdge> edges = directedEdgeMeta
- .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
- SearchByEdgeType.Order.ASCENDING );
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+ SearchByEdgeType.Order.ASCENDING );
if ( !edges.hasNext() ) {
- LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
- + "but no max value could be found in that row", directedEdgeMeta );
+ LOG.warn( "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+ directedEdgeMeta );
return false;
}
@@ -214,12 +215,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
- for(long i = 1; edges.hasNext(); i++){
+ for ( long i = 1; edges.hasNext(); i++ ) {
//we hit a pivot shard, set it since it could be the last one we encounter
- if(i% shardSize == 0){
+ if ( i % shardSize == 0 ) {
marked = edges.next();
}
- else{
+ else {
edges.next();
}
}
@@ -228,8 +229,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
* Sanity check in case our counters become severely out of sync with our edge state in cassandra.
*/
- if(marked == null){
- LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+ if ( marked == null ) {
+ LOG.warn( "Incorrect shard count for shard group {}, ignoring", shardEntryGroup );
return false;
}
@@ -248,59 +249,61 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
throw new RuntimeException( "Unable to connect to casandra", e );
}
-
return true;
}
+ /**
+ * Return true if the node has been created within our timeout. If this is the case, we don't need to check
+ * cassandra, we know it won't exist
+ */
+ private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
- @Override
- public long getMinTime() {
- final long minimumAllowed = 2 * graphFig.getShardCacheTimeout();
+ //TODO: TN this is broken....
+ //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
+ final long timeNow = timeService.getCurrentTime();
- final long minDelta = graphFig.getShardMinDelta();
+ boolean isNew = true;
+ for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
- if ( minDelta < minimumAllowed ) {
- throw new GraphRuntimeException( String.format(
- "You must configure the property %s to be >= 2 x %s. Otherwise you risk losing data",
- GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
- }
+ //short circuit
+ if(!isNew || node.getId().getUuid().version() > 2){
+ return false;
+ }
- return timeService.getCurrentTime() - minDelta;
- }
+ final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+ //take our uuid time and add 10 seconds, if the uuid is within 10 seconds of system time, we can consider it "new"
+ final long newExpirationTimeout = uuidTime + 10000 ;
- /**
- * Return true if the node has been created within our timeout. If this is the case, we dont' need to check
- * cassandra, we know it won't exist
- */
- private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
+ //our expiration is after our current time, treat it as new
+ isNew = isNew && newExpirationTimeout > timeNow;
+ }
+ return isNew;
+ }
- //TODO: TN this is broken....
- //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
- final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+ private ShardEntryGroupIterator getCurrentStateIterator(final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+ final DirectedEdgeMeta directedEdgeMeta ){
- final long timeNow = timeService.getCurrentTime();
+ final Shard start = shardEntryGroup.getMaxShard();
- boolean isNew = true;
+ final Iterator<Shard> shards = this.edgeShardSerialization.getShardMetaDataAudit( scope, Optional.fromNullable( start ), directedEdgeMeta );
- for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+ return new ShardEntryGroupIterator( shards, NO_OP_COMPACTION, scope, directedEdgeMeta );
+ }
- //short circuit
- if(!isNew || node.getId().getUuid().version() > 2){
- return false;
- }
- final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+ private final static class NoOpCompaction implements ShardGroupCompaction{
- final long newExpirationTimeout = uuidTime + timeoutDelta;
+ @Override
+ public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+ final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
- //our expiration is after our current time, treat it as new
- isNew = isNew && newExpirationTimeout > timeNow;
+ //deliberately a no op
+ return Futures.immediateFuture( AuditResult.NOT_CHECKED );
}
-
- return isNew;
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/9f3bf2b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
new file mode 100644
index 0000000..c76fae6
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardConsistencyImpl.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardConsistency;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.model.ConsistencyLevel;
+
+
+/**
+ * Resolves the shard write, read and audit consistency levels from the names configured in {@link GraphFig}.
+ */
+@Singleton
+public class ShardConsistencyImpl implements ShardConsistency {
+
+    private final GraphFig graphFig;
+
+
+    @Inject
+    public ShardConsistencyImpl( final GraphFig graphFig ) {this.graphFig = graphFig;}
+
+    /** Returns the shard write consistency, looked up by the name configured in GraphFig. */
+    @Override
+    public ConsistencyLevel getShardWriteConsistency() {
+        return ConsistencyLevel.valueOf( graphFig.getShardWriteConsistency() );
+    }
+
+    /** Returns the shard read consistency instead of null. NOTE(review): assumes GraphFig#getShardReadConsistency exists - confirm. */
+    @Override
+    public ConsistencyLevel getShardReadConsistency() {
+        return ConsistencyLevel.valueOf( graphFig.getShardReadConsistency() );
+    }
+
+    /** Returns the shard audit consistency instead of null. NOTE(review): assumes GraphFig#getShardAuditConsistency exists - confirm. */
+    @Override
+    public ConsistencyLevel getShardAuditConsistency() {
+        return ConsistencyLevel.valueOf( graphFig.getShardAuditConsistency() );
+    }
+}