You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/08/18 21:42:12 UTC
[1/3] git commit: Upgraded All external calls to use Hystrix.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-188 72684dfca -> c69c1974a
Upgraded All external calls to use Hystrix.
Added shard compaction tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cef3de96
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cef3de96
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cef3de96
Branch: refs/heads/USERGRID-188
Commit: cef3de96c0a38b22420935c941bba422a32310b6
Parents: 72684df
Author: Todd Nine <to...@apache.org>
Authored: Thu Aug 14 17:28:40 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Aug 14 17:28:40 2014 -0600
----------------------------------------------------------------------
.../collection/guice/TestModule.java | 2 +
.../core/hystrix/HystrixCassandra.java | 63 +---
.../core/cassandra/CassandraRule.java | 15 +-
.../persistence/graph/guice/GraphModule.java | 52 +--
.../graph/guice/StorageEdgeSerialization.java | 34 --
.../graph/impl/GraphManagerImpl.java | 48 +--
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 11 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 56 ++-
.../impl/stage/NodeDeleteListenerImpl.java | 19 +-
.../impl/NodeSerializationImpl.java | 70 ++--
.../NodeShardCounterSerializationImpl.java | 17 +-
.../shard/impl/NodeShardAllocationImpl.java | 71 ++--
.../graph/GraphManagerShardConsistencyIT.java | 337 +++++++++++++++++++
.../graph/StorageGraphManagerIT.java | 2 +-
.../graph/impl/EdgeDeleteListenerTest.java | 4 +-
.../graph/impl/NodeDeleteListenerTest.java | 2 -
.../graph/impl/stage/EdgeDeleteRepairTest.java | 2 -
.../graph/impl/stage/EdgeMetaRepairTest.java | 2 -
.../EdgeSerializationChopTest.java | 2 -
.../PermanentSerializationTest.java | 2 -
.../src/test/resources/usergrid-UNIT.properties | 1 +
stack/corepersistence/pom.xml | 2 +-
22 files changed, 493 insertions(+), 321 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
index a049b4d..44a8fe3 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/guice/TestModule.java
@@ -37,7 +37,9 @@ public abstract class TestModule extends AbstractModule {
try {
//load up the properties
+ ConfigurationManager.getDeploymentContext().setDeploymentEnvironment( "UNIT" );
ConfigurationManager.loadCascadedPropertiesFromResources( "usergrid" );
+
}
catch ( IOException e ) {
throw new RuntimeException( "Cannot do much without properly loading our configuration.", e );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
index d503a6c..7758651 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.core.hystrix;
+import com.netflix.astyanax.Execution;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -27,6 +28,7 @@ import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
/**
@@ -36,6 +38,7 @@ public class HystrixCassandra {
+
/**
* Command group used for realtime user commands
*/
@@ -48,17 +51,17 @@ public class HystrixCassandra {
/**
- * Execute an user mutation
+ * Execute a user operation
*/
- public static OperationResult<Void> userMutation( final MutationBatch batch ) {
+ public static <R> OperationResult<R> user( final Execution<R> execution) {
- return new HystrixCommand<OperationResult<Void>>( USER_GROUP ) {
+ return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
@Override
- protected OperationResult<Void> run() {
+ protected OperationResult<R> run() {
try {
- return batch.execute();
+ return execution.execute();
}
catch ( ConnectionException e ) {
throw new RuntimeException( e );
@@ -69,17 +72,17 @@ public class HystrixCassandra {
/**
- * Execute an user mutation
+ * Execute an async operation
*/
- public static <K, C> OperationResult<ColumnList<C>> userQuery( final RowQuery<K, C> query ) {
+ public static <R> OperationResult<R> async( final Execution<R> execution) {
- return new HystrixCommand<OperationResult<ColumnList<C>>>( USER_GROUP ) {
+ return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
@Override
- protected OperationResult<ColumnList<C>> run() {
+ protected OperationResult<R> run() {
try {
- return query.execute();
+ return execution.execute();
}
catch ( ConnectionException e ) {
throw new RuntimeException( e );
@@ -89,44 +92,4 @@ public class HystrixCassandra {
}
- /**
- * Execute an asynchronous mutation
- */
- public static OperationResult<Void> asyncMutation( final MutationBatch batch ) {
-
-
- return new HystrixCommand<OperationResult<Void>>( ASYNC_GROUP ) {
-
- @Override
- protected OperationResult<Void> run() {
- try {
- return batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
-
-
- /**
- * Execute an asynchronous query
- */
- public static <K, C> OperationResult<ColumnList<C>> asyncQuery( final RowQuery<K, C> query ) {
-
-
- return new HystrixCommand<OperationResult<ColumnList<C>>>( ASYNC_GROUP ) {
-
- @Override
- protected OperationResult<ColumnList<C>> run() {
- try {
- return query.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( e );
- }
- }
- }.execute();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
index caf89c5..ee89e0f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
@@ -5,6 +5,8 @@ import com.google.common.io.Files;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.netflix.astyanax.test.EmbeddedCassandra;
+import com.netflix.config.ConfigurationManager;
+
import java.io.File;
import java.io.IOException;
import org.apache.cassandra.io.util.FileUtils;
@@ -32,9 +34,19 @@ public class CassandraRule extends EnvironResource {
private final CassandraFig cassandraFig;
+ static{
+ ConfigurationManager.getDeploymentContext().setDeploymentEnvironment( "UNIT" );
+ try {
+ ConfigurationManager.loadCascadedPropertiesFromResources( "usergrid" );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException(e);
+ }
+ }
public CassandraRule() {
super( Env.UNIT );
+
Injector injector = Guice.createInjector( new GuicyFigModule( CassandraFig.class ) );
cassandraFig = injector.getInstance( CassandraFig.class );
}
@@ -43,7 +55,8 @@ public class CassandraRule extends EnvironResource {
protected void before() throws Throwable {
if ( !cassandraFig.isEmbedded()) {
- LOG.info("Using external Cassandra");
+ LOG.info("Using external Cassandra");
+ return;
}
if ( started ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index ca7c270..64f0fbb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.graph.guice;
import org.safehaus.guicyfig.GuicyFigModule;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
import org.apache.usergrid.persistence.core.migration.Migration;
@@ -61,13 +60,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
import com.google.inject.Key;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
-import com.netflix.astyanax.Keyspace;
public class GraphModule extends AbstractModule {
@@ -116,7 +111,13 @@ public class GraphModule extends AbstractModule {
bind( NodeDeleteListener.class ).to( NodeDeleteListenerImpl.class );
bind( EdgeDeleteListener.class ).to( EdgeDeleteListenerImpl.class );
+ bind( EdgeSerialization.class ).to( EdgeSerializationImpl.class );
+ bind( EdgeShardStrategy.class ).to( SizebasedEdgeShardStrategy.class );
+
+ bind( ShardedEdgeSerialization.class ).to( ShardedEdgeSerializationImpl.class );
+
+ bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
/**
@@ -134,50 +135,11 @@ public class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( Key.get( EdgeMetadataSerialization.class ) );
//bind each singleton to the multi set. Otherwise we won't migrate properly
- migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class, StorageEdgeSerialization.class ) );
+ migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
}
-
-
- /**
- * Our permanent serialization strategy
- */
- @Provides
- @Singleton
- @Inject
- @StorageEdgeSerialization
- public EdgeSerialization storageSerialization( final NodeShardCache cache, final Keyspace keyspace,
- final CassandraConfig cassandraConfig, final GraphFig graphFig,
- final NodeShardApproximation shardApproximation,
- final TimeService timeService,
- @StorageEdgeSerialization
- final EdgeColumnFamilies edgeColumnFamilies ) {
-
- final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache, shardApproximation );
-
-
- final ShardedEdgeSerialization serialization = new ShardedEdgeSerializationImpl(keyspace, cassandraConfig, graphFig, sizeBasedStrategy,
- timeService );
-
-
- final EdgeSerializationImpl edgeSerialization =
- new EdgeSerializationImpl( keyspace, cassandraConfig, graphFig, sizeBasedStrategy, edgeColumnFamilies,
- serialization );
-
-
- return edgeSerialization;
- }
-
-
- @Provides
- @Singleton
- @Inject
- @StorageEdgeSerialization
- public EdgeColumnFamilies storageSerializationColumnFamilies() {
- return new SizebasedEdgeColumnFamilies();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
deleted file mode 100644
index 267be03..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/StorageEdgeSerialization.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
-@BindingAnnotation
-public @interface StorageEdgeSerialization {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 473f49e..ae38372 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -40,7 +41,6 @@ import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchByIdType;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.SearchIdType;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -82,7 +82,6 @@ public class GraphManagerImpl implements GraphManager {
private final EdgeDeleteListener edgeDeleteListener;
private final NodeDeleteListener nodeDeleteListener;
- private Observer<Integer> edgeWriteSubcriber;
private Observer<Integer> edgeDeleteSubcriber;
private Observer<Integer> nodeDelete;
@@ -92,7 +91,7 @@ public class GraphManagerImpl implements GraphManager {
@Inject
public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
- @StorageEdgeSerialization final EdgeSerialization storageEdgeSerialization,
+ final EdgeSerialization storageEdgeSerialization,
final NodeSerialization nodeSerialization, final GraphFig graphFig,
@Assisted final ApplicationScope scope, final EdgeDeleteListener edgeDeleteListener,
final NodeDeleteListener nodeDeleteListener ) {
@@ -113,7 +112,6 @@ public class GraphManagerImpl implements GraphManager {
this.edgeDeleteListener = edgeDeleteListener;
this.nodeDeleteListener = nodeDeleteListener;
- this.edgeWriteSubcriber = MetricSubscriber.INSTANCE;
this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
this.nodeDelete = MetricSubscriber.INSTANCE;
}
@@ -138,14 +136,7 @@ public class GraphManagerImpl implements GraphManager {
mutation.mergeShallow( edgeMutation );
-
- try {
- LOG.debug( "Writing edge {} to metadata and commit log", edge );
- mutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
+ HystrixCassandra.user( mutation );
return edge;
}
@@ -170,13 +161,8 @@ public class GraphManagerImpl implements GraphManager {
final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
- try {
- LOG.debug( "Marking edge {} as deleted to commit log", edge );
- edgeMutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
+ LOG.debug( "Marking edge {} as deleted to commit log", edge );
+ HystrixCassandra.user( edgeMutation );
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -205,13 +191,9 @@ public class GraphManagerImpl implements GraphManager {
final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
- try {
- LOG.debug( "Marking node {} as deleted to node mark", node );
- nodeMutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to connect to cassandra", e );
- }
+ LOG.debug( "Marking node {} as deleted to node mark", node );
+ HystrixCassandra.user( nodeMutation );
+
//HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
// Schedulers.io() ).subscribe( nodeDelete );
@@ -349,7 +331,7 @@ public class GraphManagerImpl implements GraphManager {
* used in conjunction with the max version filter to filter any edges that should not be returned
*
* @return An observable that emits only edges that can be consumed. There could be multiple versions of the
- * same edge so those need de-duped.
+ * same edge so those need de-duped.
*/
@Override
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
@@ -409,18 +391,6 @@ public class GraphManagerImpl implements GraphManager {
}
}
- /**
- * Used for testing and callback hooks. TODO: Refactor
- */
-
- /**
- * Set the subcription for the edge write
- */
- public void setEdgeWriteSubcriber( final Observer<Integer> edgeWriteSubcriber ) {
- Preconditions.checkNotNull( edgeWriteSubcriber, "Subscriber cannot be null" );
- this.edgeWriteSubcriber = edgeWriteSubcriber;
- }
-
/**
* Set the subscription for the edge delete
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 6eb7a85..5be3541 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,13 +26,13 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -61,7 +61,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
@Inject
- public EdgeDeleteRepairImpl( @StorageEdgeSerialization final EdgeSerialization storageSerialization,
+ public EdgeDeleteRepairImpl( final EdgeSerialization storageSerialization,
final GraphFig graphFig, final Keyspace keyspace ) {
Preconditions.checkNotNull( "storageSerialization is required", storageSerialization );
@@ -95,12 +95,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
//remove from storage
- try {
- storageSerialization.deleteEdge( scope, edge, timestamp ).execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to remove edge from storage", e );
- }
+ HystrixCassandra.async(storageSerialization.deleteEdge( scope, edge, timestamp ));
}
}
} );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index a0769f5..055b867 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -27,13 +27,13 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -72,8 +72,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Inject
public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization, final Keyspace keyspace,
- final GraphFig graphFig,
- @StorageEdgeSerialization final EdgeSerialization storageEdgeSerialization ) {
+ final GraphFig graphFig, final EdgeSerialization storageEdgeSerialization ) {
Preconditions.checkNotNull( "edgeMetadataSerialization is required", edgeMetadataSerialization );
@@ -115,9 +114,9 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
Preconditions.checkNotNull( serialization, "serialization is required" );
- Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, maxTimestamp )
- .buffer( graphFig.getRepairConcurrentSize() )
- //buffer them into concurrent groups based on the concurrent repair size
+ Observable<Integer> deleteCounts = serialization.loadEdgeSubTypes( scope, node, edgeType, maxTimestamp ).buffer(
+ graphFig.getRepairConcurrentSize() )
+ //buffer them into concurrent groups based on the concurrent repair size
.flatMap( new Func1<List<String>, Observable<Integer>>() {
@Override
@@ -154,15 +153,15 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
**/
if ( count != 0 ) {
LOG.debug( "Found edge with nodeId {}, type {}, "
- + "and subtype {}. Not removing subtype. ", node,
- edgeType, subType );
+ + "and subtype {}. Not removing subtype. ",
+ node, edgeType, subType );
return;
}
LOG.debug( "No edges with nodeId {}, type {}, "
- + "and subtype {}. Removing subtype.", node, edgeType,
- subType );
+ + "and subtype {}. Removing subtype.", node,
+ edgeType, subType );
batch.mergeShallow( serialization
.removeEdgeSubType( scope, node, edgeType, subType,
maxTimestamp ) );
@@ -179,22 +178,22 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
*/
return MathObservable.sumInteger( Observable.merge( checks ) )
.doOnNext( new Action1<Integer>() {
- @Override
- public void call( final Integer count ) {
+ @Override
+ public void call( final Integer count ) {
- LOG.debug( "Executing batch for subtype deletion with type {}. "
- + "Mutation has {} rows to mutate ", edgeType,
- batch.getRowCount() );
+ LOG.debug(
+ "Executing batch for subtype deletion with " +
+ "type {}. "
+ + "Mutation has {} rows to mutate ",
+ edgeType, batch.getRowCount() );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to execute mutation", e );
- }
- }
- } );
+ HystrixCassandra.async( batch );
+ }
+ }
+
+
+ );
}
} )
//if we get no edges, emit a 0 so the caller knows we can delete the type
@@ -217,15 +216,10 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
return;
}
- try {
- LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType,
- maxTimestamp );
- serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to execute mutation" );
- }
+ LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}. Deleting type.", edgeType,
+ maxTimestamp );
+ HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ) );
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index d7cf742..962da21 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -28,13 +28,13 @@ import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -77,7 +77,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
public NodeDeleteListenerImpl( final NodeSerialization nodeSerialization,
final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeMetaRepair edgeMetaRepair, final GraphFig graphFig,
- @StorageEdgeSerialization final EdgeSerialization storageSerialization,
+ final EdgeSerialization storageSerialization,
final Keyspace keyspace ) {
@@ -129,13 +129,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
.doOnCompleted( new Action0() {
@Override
public void call() {
- try {
- nodeSerialization.delete( scope, node, maxVersion.get() ).execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to delete marked graph node " + node,
- e );
- }
+ HystrixCassandra.async(nodeSerialization.delete( scope, node, maxVersion.get() ));
}
} );
}
@@ -216,12 +210,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
}
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to delete edges", e );
- }
+ HystrixCassandra.async( batch );
//now delete meta data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 788fab4..b14fad6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.migration.Migration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -81,18 +82,17 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
/**
- * Columns are always a byte, and the entire value is contained within a row key. This is intentional
- * This allows us to make heavy use of Cassandra's bloom filters, as well as key caches.
- * Since most nodes will only exist for a short amount of time in this CF, we'll most likely have them in the key
- * cache, and we'll also bounce from the BloomFilter on read. This means our performance will be no worse
- * than checking a distributed cache in RAM for the existence of a marked node.
+ * Columns are always a byte, and the entire value is contained within a row key. This is intentional. This allows
+ * us to make heavy use of Cassandra's bloom filters, as well as key caches. Since most nodes will only exist for a
+ * short amount of time in this CF, we'll most likely have them in the key cache, and we'll also bounce from the
+ * BloomFilter on read. This means our performance will be no worse than checking a distributed cache in RAM for
+ * the existence of a marked node.
*/
private static final MultiTennantColumnFamily<ApplicationScope, Id, Boolean> GRAPH_DELETE =
new MultiTennantColumnFamily<ApplicationScope, Id, Boolean>( "Graph_Marked_Nodes",
new OrganizationScopedRowKeySerializer<Id>( ROW_SERIALIZER ), BOOLEAN_SERIALIZER );
-
protected final Keyspace keyspace;
protected final CassandraConfig fig;
@@ -108,7 +108,8 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
return Collections.singleton(
new MultiTennantColumnFamilyDefinition( GRAPH_DELETE, BytesType.class.getSimpleName(),
- BooleanType.class.getSimpleName(), BytesType.class.getSimpleName(), MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
+ BooleanType.class.getSimpleName(), BytesType.class.getSimpleName(),
+ MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
}
@@ -120,7 +121,7 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( fig.getWriteCL() );
- batch.withRow( GRAPH_DELETE, ScopedRowKey.fromKey( scope, node ) ).setTimestamp(timestamp )
+ batch.withRow( GRAPH_DELETE, ScopedRowKey.fromKey( scope, node ) ).setTimestamp( timestamp )
.putColumn( COLUMN_NAME, timestamp );
return batch;
@@ -147,23 +148,26 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
ValidationUtils.validateApplicationScope( scope );
ValidationUtils.verifyIdentity( node );
- ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query = keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel(
- fig.getReadCL() );
+ ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query =
+ keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
try {
- Column<Boolean> result =
- query.getKey( ScopedRowKey.fromKey( scope, node ) ).getColumn( COLUMN_NAME ).execute().getResult();
+ Column<Boolean> result = HystrixCassandra
+ .user( query.getKey( ScopedRowKey.fromKey( scope, node ) ).getColumn( COLUMN_NAME ) )
+ .getResult();
return Optional.of( result.getLongValue() );
}
- catch ( NotFoundException e ) {
- //swallow, there's just no column
- return Optional.absent();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to connect to casandra", e );
+ catch (RuntimeException re ) {
+ if(re.getCause().getCause() instanceof NotFoundException) {
+ //swallow, there's just no column
+ return Optional.absent();
+ }
+
+ throw re;
}
+
}
@@ -173,36 +177,34 @@ public class NodeSerializationImpl implements NodeSerialization, Migration {
Preconditions.checkNotNull( edges, "edges cannot be null" );
- final ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query = keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
+ final ColumnFamilyQuery<ScopedRowKey<ApplicationScope, Id>, Boolean> query =
+ keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL() );
- final List<ScopedRowKey<ApplicationScope, Id>> keys = new ArrayList<ScopedRowKey<ApplicationScope, Id>>(edges.size());
+ final List<ScopedRowKey<ApplicationScope, Id>> keys =
+ new ArrayList<ScopedRowKey<ApplicationScope, Id>>( edges.size() );
//worst case all are marked
- final Map<Id, Long> versions = new HashMap<>(edges.size());
+ final Map<Id, Long> versions = new HashMap<>( edges.size() );
- for(final Edge edge: edges){
+ for ( final Edge edge : edges ) {
keys.add( ScopedRowKey.fromKey( scope, edge.getSourceNode() ) );
keys.add( ScopedRowKey.fromKey( scope, edge.getTargetNode() ) );
}
- try {
- final Rows<ScopedRowKey<ApplicationScope, Id>, Boolean>
- results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ).execute().getResult();
-
- for(Row<ScopedRowKey<ApplicationScope, Id>, Boolean> row: results){
- Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
- if(column != null){
- versions.put( row.getKey().getKey(), column.getLongValue() );
- }
+ final Rows<ScopedRowKey<ApplicationScope, Id>, Boolean> results = HystrixCassandra
+ .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList( COLUMN_NAME ) ) )
+ .getResult();
+ for ( Row<ScopedRowKey<ApplicationScope, Id>, Boolean> row : results ) {
+ Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME );
+ if ( column != null ) {
+ versions.put( row.getKey().getKey(), column.getLongValue() );
}
}
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to execute multiget for all max versions", e );
- }
+
return versions;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 6b99e93..d3d92ea 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -119,18 +120,20 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
try {
- OperationResult<Column<Boolean>> column =
- keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
+ OperationResult<Column<Boolean>> column = HystrixCassandra.user(
+ keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ) );
return column.getResult().getLongValue();
}
//column not found, return 0
- catch ( NotFoundException nfe ) {
- return 0;
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "An error occurred connecting to cassandra", e );
+ catch ( RuntimeException re ) {
+ if(re.getCause() instanceof NotFoundException) {
+ return 0;
+ }
+
+ throw re;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 3c3b99c..6c79814 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -45,6 +46,7 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -56,7 +58,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
- private static final Shard MIN_SHARD = new Shard(0, 0, true);
+ private static final Shard MIN_SHARD = new Shard( 0, 0, true );
private final EdgeShardSerialization edgeShardSerialization;
private final EdgeColumnFamilies edgeColumnFamilies;
@@ -82,32 +84,29 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope,
- final Optional<Shard> maxShardId, final DirectedEdgeMeta directedEdgeMeta ) {
+ public Iterator<ShardEntryGroup> getShards( final ApplicationScope scope, final Optional<Shard> maxShardId,
+ final DirectedEdgeMeta directedEdgeMeta ) {
- ValidationUtils.validateApplicationScope(scope);
+ ValidationUtils.validateApplicationScope( scope );
Preconditions.checkNotNull( maxShardId, "maxShardId cannot be null" );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
Iterator<Shard> existingShards;
//its a new node, it doesn't need to check cassandra, it won't exist
- if(isNewNode(directedEdgeMeta)){
+ if ( isNewNode( directedEdgeMeta ) ) {
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
- else{
+ else {
existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
}
- if(existingShards == null || !existingShards.hasNext()){
+ if ( existingShards == null || !existingShards.hasNext() ) {
- try {
- edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta ).execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to allocate minimum shard" );
- }
+
+ final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
+ HystrixCassandra.user( batch );
existingShards = Collections.singleton( MIN_SHARD ).iterator();
}
@@ -117,9 +116,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
@Override
- public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup, final DirectedEdgeMeta directedEdgeMeta) {
+ public boolean auditShard( final ApplicationScope scope, final ShardEntryGroup shardEntryGroup,
+ final DirectedEdgeMeta directedEdgeMeta ) {
- ValidationUtils.validateApplicationScope(scope);
+ ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateShardEntryGroup( shardEntryGroup );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
@@ -129,12 +129,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
* Nothing to do, it's been created very recently, we don't create a new one
*/
- if (shardEntryGroup.isCompactionPending()) {
+ if ( shardEntryGroup.isCompactionPending() ) {
return false;
}
//we can't allocate, we have more than 1 write shard currently. We need to compact first
- if(shardEntryGroup.entrySize() != 1){
+ if ( shardEntryGroup.entrySize() != 1 ) {
return false;
}
@@ -145,7 +145,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final Shard shard = shardEntryGroup.getMinShard();
- if (shard.getCreatedTime() >= getMinTime()){
+ if ( shard.getCreatedTime() >= getMinTime() ) {
return false;
}
@@ -154,8 +154,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
* Check out if we have a count for our shard allocation
*/
- final long count =
- nodeShardApproximation.getCount( scope, shard, directedEdgeMeta);
+ final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
if ( count < graphFig.getShardSize() ) {
@@ -163,13 +162,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
}
-
-
/**
* Allocate the shard
*/
- final Iterator<MarkedEdge> edges = directedEdgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+ final Iterator<MarkedEdge> edges = directedEdgeMeta
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
if ( !edges.hasNext() ) {
@@ -184,17 +182,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
final long createTimestamp = timeService.getCurrentTime();
- final Shard newShard = new Shard(marked.getTimestamp(), createTimestamp, false);
+ final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false );
- try {
- this.edgeShardSerialization
- .writeShardMeta( scope, newShard, directedEdgeMeta )
- .execute();
- }
- catch ( ConnectionException e ) {
- throw new GraphRuntimeException( "Unable to write the new edge metadata" );
- }
+ final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
+
+ HystrixCassandra.user( batch );
return true;
@@ -222,28 +215,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
/**
* Return true if the node has been created within our timeout. If this is the case, we dont' need to check
* cassandra, we know it won't exist
- *
- * @param directedEdgeMeta
- * @return
*/
- private boolean isNewNode(DirectedEdgeMeta directedEdgeMeta){
+ private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
/**
* the max time in microseconds we can allow
*/
- final long maxTime = (timeService.getCurrentTime() + graphFig.getShardCacheTimeout() )* 10000;
+ final long maxTime = ( timeService.getCurrentTime() + graphFig.getShardCacheTimeout() ) * 10000;
- for(DirectedEdgeMeta.NodeMeta node: directedEdgeMeta.getNodes()){
+ for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
final long uuidTime = node.getId().getUuid().timestamp();
- if(uuidTime < maxTime){
+ if ( uuidTime < maxTime ) {
return true;
}
}
return false;
-
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
new file mode 100644
index 0000000..8f5171b
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -0,0 +1,337 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph;
+
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runners.model.InitializationError;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.commons.lang.time.StopWatch;
+
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.migration.MigrationException;
+import org.apache.usergrid.persistence.core.migration.MigrationManager;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.common.base.Optional;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import rx.Observable;
+import rx.Subscriber;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+
+@Ignore("A stress test, not part of functional testing")
+public class GraphManagerShardConsistencyIT {
+ private static final Logger log = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
+
+
+ @ClassRule
+ public static CassandraRule rule = new CassandraRule();
+
+
+ protected ApplicationScope scope;
+
+ protected int numWorkers;
+
+ @Before
+ public void setupOrg() {
+
+
+
+ //get the system property of the UUID to use. If one is not set, use the default
+ String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
+
+ scope = new ApplicationScopeImpl( createId( UUID.fromString( uuidString ), "test" ) );
+
+ numWorkers = Integer.parseInt( System.getProperty( "numWorkers", "4" ) );
+// readCount = Integer.parseInt( System.getProperty( "readCount", "20000" ) );
+ }
+
+
+// @Test
+// public void writeThousandsSingleSource() throws InterruptedException, ExecutionException {
+// EdgeGenerator generator = new EdgeGenerator() {
+//
+// private Id sourceId = createId( "source" );
+//
+//
+// @Override
+// public Edge newEdge() {
+// Edge edge = createEdge( sourceId, "test", createId( "target" ) );
+//
+//
+// return edge;
+// }
+//
+//
+// @Override
+// public Observable<Edge> doSearch( final GraphManager manager ) {
+// return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
+// }
+// };
+//
+//
+//
+// doTest( generator );
+// }
+
+
+ @Test
+ public void writeThousandsSingleTarget() throws InterruptedException, ExecutionException, MigrationException {
+
+ final Id targetId = createId("target");
+ final String edgeType = "test";
+
+ EdgeGenerator generator = new EdgeGenerator() {
+
+
+ @Override
+ public Edge newEdge() {
+ Edge edge = createEdge( createId( "source" ), edgeType, targetId );
+
+
+ return edge;
+ }
+
+
+ @Override
+ public Observable<Edge> doSearch( final GraphManager manager ) {
+ return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+ }
+ };
+
+
+ /**
+ * create 3 injectors. This way all the caches are independent of one another. This is the same as
+ * multiple nodes
+ */
+ final List<Injector> injectors = createInjectors(3);
+
+
+ final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+ final long shardSize = graphFig.getShardSize();
+
+ /**
+ * Do 4x shard size so we should have approximately 4 shards
+ */
+ final long numberOfEdges = shardSize * 4;
+
+ final long countPerWorker = numberOfEdges/numWorkers;
+
+ final long writeLimit = countPerWorker;
+
+
+// HystrixCassandra.ASYNC_GROUP.
+
+
+
+
+
+ final List<Future<Boolean>> futures = new ArrayList<>();
+
+ for(Injector injector: injectors) {
+ final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+ futures.addAll( doTest( gmf, generator, writeLimit ) );
+ }
+
+ for(Future<Boolean> future: futures){
+ future.get();
+ }
+
+ //now get all our shards
+ final NodeShardAllocation allocation = getInstance( injectors, NodeShardAllocation.class );
+
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
+
+ int count = 0;
+
+ while(count < 4) {
+
+ //reset our count. Ultimately we'll have 4 groups once our compaction completes
+ count = 0;
+
+ final Iterator<ShardEntryGroup> groups = allocation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+ while(groups.hasNext()){
+ final ShardEntryGroup group = groups.next();
+
+ log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+
+ count++;
+
+ }
+ }
+
+
+
+
+ }
+
+ private <T> T getInstance(final List<Injector> injectors, Class<T> clazz ){
+ return injectors.get( 0 ).getInstance( clazz );
+ }
+
+
+ /**
+ * Create new Guice injector environments and return them
+ * @param count
+ */
+ private List<Injector> createInjectors( int count ) throws MigrationException {
+
+ final List<Injector> injectors = new ArrayList<>(count);
+
+ for(int i = 0; i < count; i++){
+ final Injector injector = Guice.createInjector( new TestGraphModule() );
+ injectors.add( injector );
+ }
+
+
+ final MigrationManager migrationManager = getInstance( injectors, MigrationManager.class );
+
+ migrationManager.migrate();
+
+ return injectors;
+
+
+ }
+
+ /**
+ * Execute the test with the generator
+ */
+ private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount ) throws InterruptedException, ExecutionException {
+
+ ExecutorService executor = Executors.newFixedThreadPool( numWorkers );
+
+ List<Future<Boolean>> futures = new ArrayList<>( numWorkers );
+
+ for ( int i = 0; i < numWorkers; i++ ) {
+ Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount ) );
+
+ futures.add( future );
+ }
+
+
+ return futures;
+ }
+
+
+ private class Worker implements Callable<Boolean> {
+ private final GraphManagerFactory factory;
+ private final EdgeGenerator generator;
+ private final long writeLimit;
+
+
+ private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit) {
+ this.factory = factory;
+ this.generator = generator;
+ this.writeLimit = writeLimit;
+ }
+
+
+ @Override
+ public Boolean call() throws Exception {
+ GraphManager manager = factory.createEdgeManager( scope );
+
+
+ final StopWatch timer = new StopWatch();
+ timer.start();
+
+ for ( long i = 0; i < writeLimit; i++ ) {
+
+ Edge edge = generator.newEdge();
+
+ Edge returned = manager.writeEdge( edge ).toBlocking().last();
+
+
+ assertNotNull( "Returned has a version", returned.getTimestamp() );
+
+
+ if ( i % 1000 == 0 ) {
+ log.info( " Wrote: " + i );
+ }
+ }
+
+ timer.stop();
+ log.info( "Total time to write {} entries {} ms", writeLimit, timer.getTime() );
+
+ return true;
+ }
+ }
+
+
+ private interface EdgeGenerator {
+
+ /**
+ * Create a new edge to persist
+ */
+ public Edge newEdge();
+
+ /**
+ * Perform the search returning an observable edge
+ * @param manager
+ * @return
+ */
+ public Observable<Edge> doSearch( final GraphManager manager );
+ }
+
+
+}
+
+
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
index 3c71ae8..27f8479 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/StorageGraphManagerIT.java
@@ -86,7 +86,7 @@ public class StorageGraphManagerIT extends GraphManagerIT {
};
gmi.setEdgeDeleteSubcriber(subscriber );
- gmi.setEdgeWriteSubcriber( subscriber );
+// gmi.setEdgeWriteSubcriber( subscriber );
gmi.setNodeDelete( subscriber );
return helper;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
index 3bcbfd0..1f6df4c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/EdgeDeleteListenerTest.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -80,7 +79,6 @@ public class EdgeDeleteListenerTest {
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization storageEdgeSerialization;
@@ -220,7 +218,7 @@ public class EdgeDeleteListenerTest {
final UUID foobar = UUIDGenerator.newTimeUUID();
- storageEdgeSerialization.writeEdge( scope, edgeV1, foobar ).execute();
+ storageEdgeSerialization.writeEdge( scope, edgeV1, foobar ).execute();
storageEdgeSerialization.writeEdge( scope, edgeV2, foobar ).execute();
storageEdgeSerialization.writeEdge( scope, edgeV3, foobar ).execute();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index f2a8cee..3b66a31 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -39,7 +39,6 @@ import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -83,7 +82,6 @@ public class NodeDeleteListenerTest {
protected NodeDeleteListener deleteListener;
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization edgeSerialization;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index 91767b0..4b62ad1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -34,7 +34,6 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.cassandra.ITRunner;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -71,7 +70,6 @@ public class EdgeDeleteRepairTest {
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization storageEdgeSerialization;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
index 6be02c4..f14e007 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairTest.java
@@ -35,7 +35,6 @@ import org.apache.usergrid.persistence.core.cassandra.ITRunner;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
@@ -73,7 +72,6 @@ public class EdgeMetaRepairTest {
protected EdgeMetaRepair edgeMetaRepair;
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization storageEdgeSerialization;
@Inject
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
index 6da3506..54eec6b 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/EdgeSerializationChopTest.java
@@ -32,7 +32,6 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.core.cassandra.ITRunner;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -64,7 +63,6 @@ public class EdgeSerializationChopTest {
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization serialization;
protected ApplicationScope scope;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
index 2ff571b..caf1833 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/PermanentSerializationTest.java
@@ -24,7 +24,6 @@ import org.jukito.UseModules;
import org.junit.runner.RunWith;
import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.graph.guice.StorageEdgeSerialization;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import com.google.inject.Inject;
@@ -35,7 +34,6 @@ import com.google.inject.Inject;
public class PermanentSerializationTest extends EdgeSerializationTest {
@Inject
- @StorageEdgeSerialization
protected EdgeSerialization edgeSerialization;
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index fd24f43..61612f6 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -6,6 +6,7 @@ cassandra.hosts=localhost
cassandra.cluster_name=Usergrid
collections.keyspace=Usergrid_Collections
cassandra.timeout=5000
+#cassandra.embedded=true
collections.keyspace.strategy.options=replication_factor:1
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cef3de96/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9565543..a9223f9 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -48,7 +48,7 @@
<guava.version>15.0</guava.version>
<guice.version>3.0</guice.version>
<guicyfig.version>3.2</guicyfig.version>
- <hystrix.version>1.3.15</hystrix.version>
+ <hystrix.version>1.3.16</hystrix.version>
<jackson-2-version>2.3.1</jackson-2-version>
<jackson-smile.verson>1.9.9</jackson-smile.verson>
<jukito.version>1.4-UG</jukito.version>
[3/3] git commit: Finished refactor of low level serialization to
remove impedance mismatch of apis between read and write.
Posted by to...@apache.org.
Finished refactor of low level serialization to remove impedance mismatch of apis between read and write.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c69c1974
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c69c1974
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c69c1974
Branch: refs/heads/USERGRID-188
Commit: c69c1974ade1a0a580ccf633f6a1cb8dc14dd1f0
Parents: cef3de9
Author: Todd Nine <to...@apache.org>
Authored: Fri Aug 15 16:49:25 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Mon Aug 18 13:41:31 2014 -0600
----------------------------------------------------------------------
.../core/astyanax/ColumnNameIterator.java | 24 +-
.../core/hystrix/HystrixCassandra.java | 10 +-
stack/corepersistence/graph/pom.xml | 7 +
.../usergrid/persistence/graph/GraphFig.java | 13 +
.../graph/impl/CollectionIndexObserver.java | 2 +-
.../impl/EdgeSerializationImpl.java | 256 ++++++-
.../impl/shard/DirectedEdgeMeta.java | 65 +-
.../impl/shard/ShardEntryGroup.java | 16 +-
.../impl/shard/ShardGroupCompaction.java | 39 +-
.../impl/shard/ShardedEdgeSerialization.java | 91 ++-
.../NodeShardCounterSerializationImpl.java | 2 +-
.../impl/shard/impl/EdgeSearcher.java | 32 +-
.../shard/impl/NodeShardAllocationImpl.java | 17 +-
.../impl/shard/impl/NodeShardCacheImpl.java | 127 ++--
.../shard/impl/ShardGroupColumnIterator.java | 130 ++++
.../shard/impl/ShardGroupCompactionImpl.java | 242 ++++++-
.../impl/shard/impl/ShardRowIterator.java | 134 ----
.../impl/ShardedEdgeSerializationImpl.java | 707 ++++++++++++++-----
.../impl/shard/impl/ShardsColumnIterator.java | 128 ++++
.../graph/GraphManagerShardConsistencyIT.java | 126 +++-
.../impl/shard/NodeShardAllocationTest.java | 2 +-
.../shard/count/NodeShardApproximationTest.java | 12 +
.../test/resources/usergrid-SHARD.properties | 23 +
23 files changed, 1682 insertions(+), 523 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
index 050157a..af4e1f9 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
@@ -40,20 +40,15 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
private final RowQuery<?, C> rowQuery;
private final ColumnParser<C, T> parser;
+ private final boolean skipFirst;
private Iterator<Column<C>> sourceIterator;
- public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst) {
+ public ColumnNameIterator( RowQuery<?, C> rowQuery, final ColumnParser<C, T> parser, final boolean skipFirst ) {
this.rowQuery = rowQuery.autoPaginate( true );
this.parser = parser;
-
- advanceIterator();
-
- //if we are to skip the first element, we need to advance the iterator
- if ( skipFirst && sourceIterator.hasNext() ) {
- sourceIterator.next();
- }
+ this.skipFirst = skipFirst;
}
@@ -65,6 +60,19 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
@Override
public boolean hasNext() {
+
+ if ( sourceIterator == null ) {
+ advanceIterator();
+
+
+ //if we are to skip the first element, we need to advance the iterator
+ if ( skipFirst && sourceIterator.hasNext() ) {
+ sourceIterator.next();
+ }
+
+ return sourceIterator.hasNext();
+ }
+
//if we've exhausted this iterator, try to advance to the next set
if ( sourceIterator.hasNext() ) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
index 7758651..356850e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
@@ -42,20 +42,22 @@ public class HystrixCassandra {
/**
* Command group used for realtime user commands
*/
- public static final HystrixCommandGroupKey USER_GROUP = HystrixCommandGroupKey.Factory.asKey( "user" );
+ public static final HystrixCommand.Setter
+ USER_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
+ HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
/**
* Command group for asynchronous operations
*/
- public static final HystrixCommandGroupKey ASYNC_GROUP = HystrixCommandGroupKey.Factory.asKey( "async" );
+ public static final HystrixCommand.Setter
+ ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey( "async" ) ).andThreadPoolPropertiesDefaults(
+ HystrixThreadPoolProperties.Setter().withCoreSize( 50 ) );
/**
* Execute a user operation
*/
public static <R> OperationResult<R> user( final Execution<R> execution) {
-
-
return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index eee82fd..8174a56 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -72,6 +72,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.codahale.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>3.0.2</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 45064c9..d434db7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -45,6 +45,12 @@ public interface GraphFig extends GuicyFig {
*/
public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+ public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+
+ public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+
+
+
/**
* The minimum amount of time that can occur (in millis) between shard allocation. Must be at least 2x the cache timeout.
*
@@ -74,6 +80,10 @@ public interface GraphFig extends GuicyFig {
int getRepairConcurrentSize();
+ @Default( ".10" )
+ @Key( SHARD_REPAIR_CHANCE )
+ double getShardRepairChance();
+
@Default("500000")
@Key(SHARD_SIZE)
@@ -94,6 +104,9 @@ public interface GraphFig extends GuicyFig {
long getShardCacheSize();
+ @Default( "2" )
+ @Key( SHARD_CACHE_REFRESH_WORKERS )
+ int getShardCacheRefreshWorkerCount();
@Default( "10000" )
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
index a4a7a70..d792463 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/CollectionIndexObserver.java
@@ -61,7 +61,7 @@ public class CollectionIndexObserver{
//
// //entity exists, write the edge
// if(entity.getEntity().isPresent()){
-// em.writeEdge( edge ).toBlocking().last();
+// em.writeEdgeFromSource( edge ).toBlocking().last();
// }
// //entity does not exist, it's been removed, mark the edge
// else{
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index d3f29a0..4c1ae79 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -20,12 +20,14 @@
package org.apache.usergrid.persistence.graph.serialization.impl;
+import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
import javax.inject.Inject;
import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.graph.GraphFig;
@@ -37,8 +39,10 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -53,7 +57,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
* Serialization for edges. Delegates partitioning to the sharding strategy.
*/
@Singleton
-public class EdgeSerializationImpl implements EdgeSerialization {
+public class EdgeSerializationImpl implements EdgeSerialization {
protected final Keyspace keyspace;
@@ -62,18 +66,23 @@ public class EdgeSerializationImpl implements EdgeSerialization {
protected final EdgeShardStrategy edgeShardStrategy;
protected final EdgeColumnFamilies edgeColumnFamilies;
protected final ShardedEdgeSerialization shardedEdgeSerialization;
+ protected final TimeService timeService;
@Inject
public EdgeSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
final GraphFig graphFig, final EdgeShardStrategy edgeShardStrategy,
final EdgeColumnFamilies edgeColumnFamilies,
- final ShardedEdgeSerialization shardedEdgeSerialization ) {
+ final ShardedEdgeSerialization shardedEdgeSerialization,
+ final TimeService timeService ) {
- checkNotNull( "keyspace required", keyspace );
- checkNotNull( "cassandraConfig required", cassandraConfig );
- checkNotNull( "consistencyFig required", graphFig );
+ checkNotNull( keyspace, "keyspace required" );
+ checkNotNull( cassandraConfig, "cassandraConfig required" );
+ checkNotNull( edgeShardStrategy, "edgeShardStrategy required" );
+ checkNotNull( edgeColumnFamilies, "edgeColumnFamilies required" );
+ checkNotNull( shardedEdgeSerialization, "shardedEdgeSerialization required" );
+ checkNotNull( timeService, "timeService required" );
this.keyspace = keyspace;
@@ -82,18 +91,186 @@ public class EdgeSerializationImpl implements EdgeSerialization {
this.edgeShardStrategy = edgeShardStrategy;
this.edgeColumnFamilies = edgeColumnFamilies;
this.shardedEdgeSerialization = shardedEdgeSerialization;
+ this.timeService = timeService;
}
@Override
public MutationBatch writeEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
- return shardedEdgeSerialization.writeEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+ final long now = timeService.getCurrentTime();
+ final Id sourceNode = markedEdge.getSourceNode();
+ final Id targetNode = markedEdge.getTargetNode();
+ final String edgeType = markedEdge.getType();
+ final long edgeTimestamp = markedEdge.getTimestamp();
+
+ /**
+ * Source write
+ */
+ final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+ final Collection<Shard> sourceWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+ final MutationBatch batch = shardedEdgeSerialization
+ .writeEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+ timestamp );
+
+
+ /**
+ * Source with target type write
+ */
+ final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+ DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+ final Collection<Shard> sourceTargetTypeWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+ .getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .writeEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+ sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+ /**
+ * Target write
+ *
+ */
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+ final Collection<Shard> targetWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .writeEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+ timestamp ) );
+
+
+ /**
+ * Target with source type write
+ */
+
+ final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+ DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+ final Collection<Shard> targetSourceTypeWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+ .getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .writeEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+ targetSourceTypeEdgeMeta, timestamp ) );
+
+
+ /**
+ * Version write
+ */
+
+ final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+ final Collection<Shard> edgeVersionsShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .writeEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+ edgeVersionsMeta, timestamp ) );
+
+
+ return batch;
}
@Override
public MutationBatch deleteEdge( final ApplicationScope scope, final MarkedEdge markedEdge, final UUID timestamp ) {
- return shardedEdgeSerialization.deleteEdge( edgeColumnFamilies, scope, markedEdge, timestamp );
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+
+ final long now = timeService.getCurrentTime();
+ final Id sourceNode = markedEdge.getSourceNode();
+ final Id targetNode = markedEdge.getTargetNode();
+ final String edgeType = markedEdge.getType();
+ final long edgeTimestamp = markedEdge.getTimestamp();
+
+ /**
+         * Source delete
+ */
+ final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNode, edgeType );
+
+ final Collection<Shard> sourceWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceEdgeMeta ).getWriteShards( now );
+
+ final MutationBatch batch = shardedEdgeSerialization
+ .deleteEdgeFromSource( edgeColumnFamilies, scope, markedEdge, sourceWriteShards, sourceEdgeMeta,
+ timestamp );
+
+
+ /**
+         * Source with target type delete
+ */
+ final DirectedEdgeMeta sourceTargetTypeEdgeMeta =
+ DirectedEdgeMeta.fromSourceNodeTargetType( sourceNode, edgeType, targetNode.getType() );
+
+ final Collection<Shard> sourceTargetTypeWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, sourceTargetTypeEdgeMeta )
+ .getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .deleteEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, markedEdge, sourceTargetTypeWriteShards,
+ sourceTargetTypeEdgeMeta, timestamp ) );
+
+
+ /**
+         * Target delete
+ *
+ */
+
+ final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNode, edgeType );
+
+ final Collection<Shard> targetWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetEdgeMeta ).getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .deleteEdgeToTarget( edgeColumnFamilies, scope, markedEdge, targetWriteShards, targetEdgeMeta,
+ timestamp ) );
+
+
+ /**
+         * Target with source type delete
+ */
+
+ final DirectedEdgeMeta targetSourceTypeEdgeMeta =
+ DirectedEdgeMeta.fromTargetNodeSourceType( targetNode, edgeType, sourceNode.getType() );
+
+ final Collection<Shard> targetSourceTypeWriteShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, targetSourceTypeEdgeMeta )
+ .getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .deleteEdgeToTargetWithSourceType( edgeColumnFamilies, scope, markedEdge, targetSourceTypeWriteShards,
+ targetSourceTypeEdgeMeta, timestamp ) );
+
+
+ /**
+         * Version delete
+ */
+
+ final DirectedEdgeMeta edgeVersionsMeta = DirectedEdgeMeta.fromEdge( sourceNode, targetNode, edgeType );
+
+ final Collection<Shard> edgeVersionsShards =
+ edgeShardStrategy.getWriteShards( scope, edgeTimestamp, edgeVersionsMeta ).getWriteShards( now );
+
+ batch.mergeShallow( shardedEdgeSerialization
+ .deleteEdgeVersions( edgeColumnFamilies, scope, markedEdge, edgeVersionsShards,
+ edgeVersionsMeta, timestamp ) );
+
+
+ return batch;
}
@@ -111,9 +288,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final Iterator<ShardEntryGroup> readShards =
- edgeShardStrategy.getReadShards(scope, maxTimestamp, versionMetaData );
+ edgeShardStrategy.getReadShards( scope, maxTimestamp, versionMetaData );
+
+
+ //now create a result iterator with our iterator of read shards
- return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+ return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+ return shardedEdgeSerialization.getEdgeVersions( edgeColumnFamilies, scope, search, readShards );
+ }
+ };
}
@@ -129,13 +314,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final long maxTimestamp = edgeType.getMaxTimestamp();
- final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, type );
- final Iterator<ShardEntryGroup> readShards =
- edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+ final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+ return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+ return shardedEdgeSerialization.getEdgesFromSource( edgeColumnFamilies, scope, edgeType, readShards );
+ }
+ };
}
@@ -151,14 +340,19 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final String targetType = edgeType.getIdType();
final long maxTimestamp = edgeType.getMaxTimestamp();
- final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, type, targetType );
- final Iterator<ShardEntryGroup> readShards =
- edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+ final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return shardedEdgeSerialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+ return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+ return shardedEdgeSerialization
+ .getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, edgeType, readShards );
+ }
+ };
}
@@ -173,15 +367,17 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final long maxTimestamp = edgeType.getMaxTimestamp();
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
- final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromTargetNode( targetId, type );
+ final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- final Iterator<ShardEntryGroup> readShards =
- edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
-
-
- return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+ return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+ return shardedEdgeSerialization.getEdgesToTarget( edgeColumnFamilies, scope, edgeType, readShards );
+ }
+ };
}
@@ -198,14 +394,18 @@ public class EdgeSerializationImpl implements EdgeSerialization {
final long maxTimestamp = edgeType.getMaxTimestamp();
- final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNodeTargetType( targetId, type, sourceType );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, type, sourceType );
- final Iterator<ShardEntryGroup> readShards =
- edgeShardStrategy.getReadShards(scope, maxTimestamp, sourceMeta );
+ final Iterator<ShardEntryGroup> readShards = edgeShardStrategy.getReadShards( scope, maxTimestamp, directedEdgeMeta );
- return shardedEdgeSerialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+ return new ShardGroupColumnIterator<MarkedEdge>( readShards ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator( final Collection<Shard> readShards ) {
+ return shardedEdgeSerialization
+ .getEdgesToTargetBySourceType( edgeColumnFamilies, scope, edgeType, readShards );
+ }
+ };
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 89e46d9..92f2548 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -87,6 +88,36 @@ public abstract class DirectedEdgeMeta {
public NodeType getNodeType() {
return nodeType;
}
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( !( o instanceof NodeMeta ) ) {
+ return false;
+ }
+
+ final NodeMeta nodeMeta = ( NodeMeta ) o;
+
+ if ( !id.equals( nodeMeta.id ) ) {
+ return false;
+ }
+ if ( nodeType != nodeMeta.nodeType ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = id.hashCode();
+ result = 31 * result + nodeType.hashCode();
+ return result;
+ }
}
@@ -125,9 +156,11 @@ public abstract class DirectedEdgeMeta {
*/
public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard> shards,
final long maxValue );
+
+
/**
* Get the type of this directed edge
*/
@@ -192,7 +225,7 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard> shards,
final long maxValue ) {
final Id sourceId = nodes[0].id;
@@ -200,8 +233,7 @@ public abstract class DirectedEdgeMeta {
final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
- return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search,
- Collections.singleton( group ).iterator() );
+ return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards );
}
@@ -233,9 +265,9 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard>shards,
final long maxValue ) {
-
+//
final Id sourceId = nodes[0].id;
final String edgeType = types[0];
final String targetType = types[1];
@@ -243,8 +275,8 @@ public abstract class DirectedEdgeMeta {
final SearchByIdType search =
new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
- return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search,
- Collections.singleton( group ).iterator() );
+ return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards);
+
}
@@ -272,7 +304,7 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard> shards,
final long maxValue ) {
@@ -281,8 +313,7 @@ public abstract class DirectedEdgeMeta {
final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
- return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search,
- Collections.singleton( group ).iterator() );
+ return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards );
}
@@ -311,7 +342,7 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard> shards,
final long maxValue ) {
final Id targetId = nodes[0].id;
@@ -322,8 +353,7 @@ public abstract class DirectedEdgeMeta {
final SearchByIdType search =
new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
- return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search,
- Collections.singleton( group ).iterator() );
+ return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards);
}
@@ -355,7 +385,7 @@ public abstract class DirectedEdgeMeta {
@Override
public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
final EdgeColumnFamilies edgeColumnFamilies,
- final ApplicationScope scope, final ShardEntryGroup group,
+ final ApplicationScope scope, final Collection<Shard> shards,
final long maxValue ) {
final Id sourceId = nodes[0].id;
@@ -365,8 +395,8 @@ public abstract class DirectedEdgeMeta {
final SimpleSearchByEdge search =
new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
- return serialization.getEdgeVersions( edgeColumnFamilies, scope, search,
- Collections.singleton( group ).iterator() );
+ return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards);
+
}
@@ -378,6 +408,7 @@ public abstract class DirectedEdgeMeta {
}
+
/**
* Create a directed edge from the stored meta data
* @param metaType The meta type stored
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 59ea9fb..70569fd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -256,13 +256,13 @@ public class ShardEntryGroup {
}
- /**
- * Helper method to create a shard entry group with a single shard
- */
- public static ShardEntryGroup singletonGroup( final Shard shard, final long delta ) {
- ShardEntryGroup group = new ShardEntryGroup( delta );
- group.addShard( shard );
-
- return group;
+ @Override
+ public String toString() {
+ return "ShardEntryGroup{" +
+ "shards=" + shards +
+ ", delta=" + delta +
+ ", maxCreatedTime=" + maxCreatedTime +
+ ", compactionTarget=" + compactionTarget +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index 13c5596..bf4d3c9 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,6 +22,10 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import java.util.Set;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
import rx.Observable;
@@ -36,8 +40,39 @@ public interface ShardGroupCompaction {
/**
* Execute the compaction task. Will return the number of edges that have
* @param group The shard entry group to compact
- * @return The number of edges that are now compacted into the target shard
+ * @return The shards that were compacted
+ */
+ public Set<Shard> compact(final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group);
+
+ /**
+ * Possibly audit the shard entry group. This is asynchronous and returns immediately
+ * @param group The shard entry group to evaluate
+ * @return The result of the audit, as an {@link AuditResult}
*/
- public Observable<Integer> compact(ShardEntryGroup group);
+ public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group );
+
+
+ public enum AuditResult{
+ /**
+ * We didn't check this shard
+ */
+ NOT_CHECKED,
+ /**
+ * This shard was checked, but nothing was allocated
+ */
+ CHECKED_NO_OP,
+
+ /**
+ * We checked and created a new shard
+ */
+ CHECKED_CREATED,
+
+ /**
+ * The shard group is already compacting
+ */
+ COMPACTING
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
index d1ad18d..51fdf0c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardedEdgeSerialization.java
@@ -19,7 +19,9 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.Set;
import java.util.UUID;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -44,19 +46,84 @@ public interface ShardedEdgeSerialization {
* @param markedEdge The edge to write
* @param timestamp The timestamp to use
*/
- MutationBatch writeEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
- UUID timestamp );
+ MutationBatch writeEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+ /**
+ * Write the edge from source->target
+ */
+ MutationBatch writeEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+ /**
+ * Write the edge from target to source
+ */
+ MutationBatch writeEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+ /**
+ * Write the edge from target to source with source type
+ */
+ MutationBatch writeEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+ /**
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp to use
+ */
+ MutationBatch writeEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
/**
- * EdgeWrite both the source -->target edge and the target<--- source edge into the mutation
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
*
* @param columnFamilies The column families to use
* @param scope The org scope of the graph
* @param markedEdge The edge to write
- * @param timestamp The timestamp of the uuid
+ * @param timestamp The timestamp to use
+ */
+ MutationBatch deleteEdgeFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+ /**
+ * Delete the edge from source->target
+ */
+ MutationBatch deleteEdgeFromSourceWithTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+ /**
+ * Delete the edge from target to source
+ */
+ MutationBatch deleteEdgeToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+
+ /**
+ * Delete the edge from target to source with source type
*/
- MutationBatch deleteEdge( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
- UUID timestamp );
+ MutationBatch deleteEdgeToTargetWithSourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
+ MarkedEdge markedEdge, Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
+
+ /**
+ * EdgeWrite both the source--->Target edge and the target <----- source edge into the mutation
+ *
+ * @param columnFamilies The column families to use
+ * @param scope The org scope of the graph
+ * @param markedEdge The edge to write
+ * @param timestamp The timestamp to use
+ */
+ MutationBatch deleteEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope, MarkedEdge markedEdge,
+ Collection<Shard> shards, DirectedEdgeMeta sourceEdgeMeta, UUID timestamp );
/**
@@ -65,10 +132,10 @@ public interface ShardedEdgeSerialization {
* @param columnFamilies The column families to use
* @param scope The application scope
* @param search The search criteria
- * @param shards The shards to iterate when searching
+ * @param shards The shards to multiget when reading
*/
Iterator<MarkedEdge> getEdgeVersions( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
- SearchByEdge search, Iterator<ShardEntryGroup> shards );
+ SearchByEdge search, Collection<Shard> shards );
/**
* Get an iterator of all edges by edge type originating from source node
@@ -79,7 +146,7 @@ public interface ShardedEdgeSerialization {
* @param shards The shards to iterate when searching
*/
Iterator<MarkedEdge> getEdgesFromSource( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
- SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+ SearchByEdgeType search, Collection<Shard> shards );
/**
@@ -91,7 +158,7 @@ public interface ShardedEdgeSerialization {
* @param shards The shards to iterate when searching
*/
Iterator<MarkedEdge> getEdgesFromSourceByTargetType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
- SearchByIdType search, Iterator<ShardEntryGroup> shards );
+ SearchByIdType search, Collection<Shard> shards );
/**
* Get an iterator of all edges by edge type pointing to the target node. Returns all versions
@@ -102,7 +169,7 @@ public interface ShardedEdgeSerialization {
* @param shards The shards to iterate when searching
*/
Iterator<MarkedEdge> getEdgesToTarget( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
- SearchByEdgeType search, Iterator<ShardEntryGroup> shards );
+ SearchByEdgeType search, Collection<Shard> shards );
/**
@@ -115,5 +182,5 @@ public interface ShardedEdgeSerialization {
* @param shards The shards to iterate when searching
*/
Iterator<MarkedEdge> getEdgesToTargetBySourceType( EdgeColumnFamilies columnFamilies, ApplicationScope scope,
- SearchByIdType search, Iterator<ShardEntryGroup> shards );
+ SearchByIdType search, Collection<Shard> shards );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index d3d92ea..ed3daaf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -127,7 +127,7 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
}
//column not found, return 0
catch ( RuntimeException re ) {
- if(re.getCause() instanceof NotFoundException) {
+ if(re.getCause().getCause() instanceof NotFoundException) {
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index b7cc25d..27862d0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -6,6 +6,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
@@ -29,19 +30,18 @@ import com.netflix.astyanax.util.RangeBuilder;
* @param <C> The column type
* @param <T> The parsed return type
*/
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T>,
- Iterator<List<ScopedRowKey<ApplicationScope, R>>> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T> {
protected final Optional<Edge> last;
protected final long maxTimestamp;
protected final ApplicationScope scope;
- protected final Iterator<ShardEntryGroup> shards;
+ protected final Collection<Shard> shards;
protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
- Preconditions.checkArgument(shards.hasNext(), "Cannot search with no possible shards");
+ Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards");
this.scope = scope;
this.maxTimestamp = maxTimestamp;
@@ -50,19 +50,12 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
}
- @Override
- public boolean hasNext() {
- return shards.hasNext();
- }
+ public List<ScopedRowKey<ApplicationScope, R>> getRowKeys() {
- @Override
- public List<ScopedRowKey<ApplicationScope, R>> next() {
- Collection<Shard> readShards = shards.next().getReadShards();
+ List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(shards.size());
- List<ScopedRowKey<ApplicationScope, R>> rowKeys = new ArrayList<>(readShards.size());
-
- for(Shard shard : readShards){
+ for(Shard shard : shards){
final ScopedRowKey<ApplicationScope, R> rowKey = ScopedRowKey
.fromKey( scope, generateRowKey(shard.getShardIndex() ) );
@@ -75,11 +68,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
}
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-
/**
* Set the range on a search
@@ -93,10 +81,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
builder.setStart( sourceEdge, getSerializer() );
}
- else {
-
- }
}
@@ -113,6 +98,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
}
+
/**
* Get the column's serializer
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6c79814..b919ad7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -42,7 +42,9 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.fasterxml.uuid.impl.UUIDUtil;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -167,7 +169,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
final Iterator<MarkedEdge> edges = directedEdgeMeta
- .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup, Long.MAX_VALUE );
+ .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), Long.MAX_VALUE );
if ( !edges.hasNext() ) {
@@ -218,15 +220,18 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
*/
private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
- /**
- * the max time in microseconds we can allow
- */
- final long maxTime = ( timeService.getCurrentTime() + graphFig.getShardCacheTimeout() ) * 10000;
+ //The timeout is in milliseconds. Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
+ final long uuidDelta = graphFig.getShardCacheTimeout() * 10000;
+
+ final long timeNow = UUIDGenerator.newTimeUUID().timestamp();
for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
+
final long uuidTime = node.getId().getUuid().timestamp();
- if ( uuidTime < maxTime ) {
+ final long uuidTimeDelta = uuidTime + uuidDelta;
+
+ if ( uuidTimeDelta < timeNow ) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d3dd86e..d444eec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -22,10 +22,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
import java.beans.PropertyChangeEvent;
import java.beans.PropertyChangeListener;
import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -49,11 +55,19 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
+import rx.Observable;
+import rx.Subscriber;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
/**
* Simple implementation of the shard. Uses a local Guava shard with a timeout. If a value is not present in the
@@ -69,10 +83,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
*/
private static final int MAX_WEIGHT_PER_ELEMENT = 10000;
+
private final NodeShardAllocation nodeShardAllocation;
private final GraphFig graphFig;
private final TimeService timeservice;
+
+
+ private ListeningScheduledExecutorService refreshExecutors;
private LoadingCache<CacheKey, CacheEntry> graphs;
@@ -93,6 +111,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
this.graphFig = graphFig;
this.timeservice = timeservice;
+
/**
* Add our listener to reconstruct the shard
*/
@@ -102,7 +121,9 @@ public class NodeShardCacheImpl implements NodeShardCache {
final String propertyName = evt.getPropertyName();
if ( propertyName.equals( GraphFig.SHARD_CACHE_SIZE ) || propertyName
- .equals( GraphFig.SHARD_CACHE_TIMEOUT ) ) {
+ .equals( GraphFig.SHARD_CACHE_TIMEOUT ) || propertyName
+ .equals( GraphFig.SHARD_CACHE_REFRESH_WORKERS ) ) {
+
updateCache();
}
@@ -120,7 +141,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
public ShardEntryGroup getWriteShardGroup( final ApplicationScope scope, final long timestamp,
final DirectedEdgeMeta directedEdgeMeta ) {
- ValidationUtils.validateApplicationScope(scope);
+ ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -147,7 +168,7 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
public Iterator<ShardEntryGroup> getReadShardGroup( final ApplicationScope scope, final long maxTimestamp,
final DirectedEdgeMeta directedEdgeMeta ) {
- ValidationUtils.validateApplicationScope(scope);
+ ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateDirectedEdgeMeta( directedEdgeMeta );
final CacheKey key = new CacheKey( scope, directedEdgeMeta );
@@ -175,11 +196,25 @@ public class NodeShardCacheImpl implements NodeShardCache {
* doesn't have to be precise. The algorithm accounts for stale data.
*/
private void updateCache() {
+ if ( this.refreshExecutors != null ) {
+ this.refreshExecutors.shutdown();
+ }
+
+ this.refreshExecutors = MoreExecutors
+ .listeningDecorator( Executors.newScheduledThreadPool( graphFig.getShardCacheRefreshWorkerCount() ) );
+
+
+ this.graphs = CacheBuilder.newBuilder()
+
+ //we want to asynchronously load new values for existing ones, that way we won't have to
+ //wait for a trip to cassandra
+ .refreshAfterWrite( graphFig.getShardCacheTimeout(), TimeUnit.MILLISECONDS )
+
+ //set our weight function, since not all shards are equal
+ .maximumWeight(MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() ).weigher( new ShardWeigher() )
- this.graphs = CacheBuilder.newBuilder().expireAfterWrite( graphFig.getShardCacheSize(), TimeUnit.MILLISECONDS )
- .removalListener( new ShardRemovalListener() )
- .maximumWeight( MAX_WEIGHT_PER_ELEMENT * graphFig.getShardCacheSize() )
- .weigher( new ShardWeigher() ).build( new ShardCacheLoader() );
+ //set our shard loader
+ .build( new ShardCacheLoader() );
}
@@ -296,77 +331,43 @@ public class NodeShardCacheImpl implements NodeShardCache {
@Override
- public CacheEntry load( final CacheKey key ) throws Exception {
+ public CacheEntry load( final CacheKey key ) {
final Iterator<ShardEntryGroup> edges =
nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
- return new CacheEntry( edges );
+ final CacheEntry cacheEntry = new CacheEntry( edges );
+
+ return cacheEntry;
}
- }
- /**
- * Calculates the weight of the entry by geting the size of the cache
- */
- final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
-
@Override
- public int weigh( final CacheKey key, final CacheEntry value ) {
- return value.getCacheSize();
+ public ListenableFuture<CacheEntry> reload( final CacheKey key, final CacheEntry oldValue ) throws Exception {
+ ListenableFutureTask<CacheEntry> task = ListenableFutureTask.create( new Callable<CacheEntry>() {
+ public CacheEntry call() {
+ return load( key );
+ }
+ } );
+ //load via the refresh executor
+ refreshExecutors.execute( task );
+ return task;
}
+
+ //TODO, use RX for sliding window buffering and duplicate removal
}
+
/**
- * On removal from the cache, we want to audit the maximum shard. If it needs to allocate a new shard, we want to
- * do so. IF there's a compaction pending, we want to run the compaction task
+ * Calculates the weight of the entry by getting the size of the cache
*/
- final class ShardRemovalListener implements RemovalListener<CacheKey, CacheEntry> {
+ final class ShardWeigher implements Weigher<CacheKey, CacheEntry> {
@Override
- public void onRemoval( final RemovalNotification<CacheKey, CacheEntry> notification ) {
-
-
- final CacheKey key = notification.getKey();
- final CacheEntry entry = notification.getValue();
-
-
- Iterator<ShardEntryGroup> groups = entry.getShards( Long.MAX_VALUE );
-
-
- /**
- * Start at our max, then
- */
-
- //audit all our groups
- while ( groups.hasNext() ) {
- ShardEntryGroup group = groups.next();
-
- /**
- * We don't have a compaction pending. Run an audit on the shards
- */
- if ( !group.isCompactionPending() ) {
- LOG.debug( "No compaction pending, checking max shard on expiration" );
- /**
- * Check if we should allocate, we may want to
- */
-
-
- nodeShardAllocation.auditShard( key.scope, group, key.directedEdgeMeta );
- continue;
- }
- /**
- * Do the compaction
- */
- if ( group.shouldCompact( timeservice.getCurrentTime() ) ) {
- //launch the compaction
- }
-
- //no op, there's nothing we need to do to this shard
-
- }
+ public int weigh( final CacheKey key, final CacheEntry value ) {
+ return value.getCacheSize();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
new file mode 100644
index 0000000..8779b96
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupColumnIterator.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ *
+ * Iterator to keep iterating over multiple shard groups to stream results
+ *
+ * @param <T> The parsed return type
+ */
+public abstract class ShardGroupColumnIterator<T> implements Iterator<T> {
+
+    /** Shard entry groups still to be read; advance() pulls from this one group at a time. */
+    private final Iterator<ShardEntryGroup> entryGroupIterator;
+
+    /** Results of the most recently loaded group; null until advance() has loaded a group. */
+    private Iterator<T> elements;
+
+    /**
+     * @param entryGroupIterator the shard entry groups whose results are streamed, in iteration order
+     */
+    public ShardGroupColumnIterator( final Iterator<ShardEntryGroup> entryGroupIterator ) {
+        this.entryGroupIterator = entryGroupIterator;
+    }
+
+    /**
+     * True if the current group has more results, or a later group can be loaded that has some.
+     * May load further groups as a side effect.
+     */
+    @Override
+    public boolean hasNext() {
+        if ( elements != null ) {
+            if ( elements.hasNext() ) {
+                return true;
+            }
+
+            //the current group is drained and there are no more groups to load
+            if ( !entryGroupIterator.hasNext() ) {
+                return false;
+            }
+        }
+
+        return advance();
+    }
+
+    /**
+     * @throws NoSuchElementException if no group has any result left
+     */
+    @Override
+    public T next() {
+        if ( hasNext() ) {
+            return elements.next();
+        }
+
+        throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+    }
+
+    /**
+     * Always throws, removal is not supported.
+     */
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported" );
+    }
+
+    /**
+     * Create the iterator over the results stored in the given shards.
+     *
+     * @param readShards the read shards of the group being loaded
+     * @return the results for those shards, never null; when it is empty the next group is tried
+     */
+    protected abstract Iterator<T> getIterator( Collection<Shard> readShards );
+
+    /**
+     * Load groups until one yields a non-empty iterator.
+     *
+     * @return true if a group with results was loaded, false if the groups ran out first
+     */
+    public boolean advance() {
+        while ( entryGroupIterator.hasNext() ) {
+            elements = getIterator( entryGroupIterator.next().getReadShards() );
+
+            if ( elements.hasNext() ) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 48336cd..c566d43 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -22,18 +22,45 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+import java.nio.charset.Charset;
+import java.util.BitSet;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
import com.google.common.base.Preconditions;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import rx.Observable;
+import rx.functions.Action0;
+import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -43,20 +70,56 @@ import rx.Observable;
public class ShardGroupCompactionImpl implements ShardGroupCompaction {
+ private static final Charset CHARSET = Charset.forName( "UTF-8" );
+
+ private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
+
private final TimeService timeService;
+ private final GraphFig graphFig;
+ private final NodeShardAllocation nodeShardAllocation;
+ private final ShardedEdgeSerialization shardedEdgeSerialization;
+ private final EdgeColumnFamilies edgeColumnFamilies;
+ private final EdgeShardSerialization edgeShardSerialization;
+
+
+ private final Random random;
+ private final ShardCompactionTaskTracker shardCompactionTaskTracker;
@Inject
- public ShardGroupCompactionImpl( final TimeService timeService ) {this.timeService = timeService;}
+ public ShardGroupCompactionImpl( final TimeService timeService, final GraphFig graphFig,
+ final NodeShardAllocation nodeShardAllocation,
+ final ShardedEdgeSerialization shardedEdgeSerialization,
+ final EdgeColumnFamilies edgeColumnFamilies,
+ final EdgeShardSerialization edgeShardSerialization ) {
+
+ this.timeService = timeService;
+ this.graphFig = graphFig;
+ this.nodeShardAllocation = nodeShardAllocation;
+ this.shardedEdgeSerialization = shardedEdgeSerialization;
+ this.edgeColumnFamilies = edgeColumnFamilies;
+ this.edgeShardSerialization = edgeShardSerialization;
+
+ this.random = new Random();
+ this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
+ }
@Override
- public Observable<Integer> compact( final ShardEntryGroup group ) {
+ public Set<Shard> compact( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
final long startTime = timeService.getCurrentTime();
- Preconditions.checkNotNull(group, "group cannot be null");
+ Preconditions.checkNotNull( group, "group cannot be null" );
Preconditions.checkArgument( group.isCompactionPending(), "Compaction is pending" );
- Preconditions.checkArgument( group.shouldCompact(startTime ), "Compaction can now be run" );
+ Preconditions.checkArgument( group.shouldCompact( startTime ), "Compaction can now be run" );
+
+ /**
+ * It's already compacting, don't do anything
+ */
+ if (!shardCompactionTaskTracker.shouldStartCompaction( scope, edgeMeta, group )){
+ return Collections.emptySet();
+ }
final Shard targetShard = group.getCompactionTarget();
@@ -64,12 +127,181 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final Collection<Shard> sourceShards = group.getReadShards();
- //now get iterators for each of the source shards, and then copy them to the compaction target shard
+
+ Observable.create( new ObservableIterator<MarkedEdge>( "Shard_Repair" ) {
+ @Override
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, group.getReadShards(), Long.MAX_VALUE );
+ }
+ } ).buffer( graphFig.getScanPageSize() ).doOnNext( new Action1<List<MarkedEdge>>() {
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+
+ }
+ }).doOnNext( new Action1<List<MarkedEdge>>() {
+ @Override
+ public void call( final List<MarkedEdge> markedEdges ) {
+
+ }
+ } );
+
return null;
+ }
+
+
+    /**
+     * Randomly sample this shard group and, when sampled, either audit its shards or start its pending
+     * compaction.
+     *
+     * A call only does work when a fresh random double is not greater than
+     * graphFig.getShardRepairChance(), every other call returns immediately.
+     *
+     * @param scope the application scope passed through to the audit or the compaction
+     * @param edgeMeta the directed edge meta data passed through to the audit or the compaction
+     * @param group the shard entry group to evaluate
+     *
+     * @return one of
+     * <ul>
+     * <li>NOT_CHECKED: the call was not sampled, or a compaction is pending but shouldCompact is false</li>
+     * <li>COMPACTING: a compaction is pending and compact was invoked</li>
+     * <li>CHECKED_CREATED: no compaction is pending and auditShard returned true</li>
+     * <li>CHECKED_NO_OP: no compaction is pending and auditShard returned false</li>
+     * </ul>
+     */
+    @Override
+    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                           final ShardEntryGroup group ) {
+
+        //don't repair unless this call falls inside the configured repair chance
+        if ( random.nextDouble() > graphFig.getShardRepairChance() ) {
+            return AuditResult.NOT_CHECKED;
+        }
+
+
+        if ( group.isCompactionPending() ) {
+
+            //the compaction is pending but the group says it should not run at the current time
+            if ( !group.shouldCompact( timeService.getCurrentTime() ) ) {
+                return AuditResult.NOT_CHECKED;
+            }
+
+            //do the compaction
+            compact( scope, edgeMeta, group );
+            return AuditResult.COMPACTING;
+        }
+
+
+        //we don't have a compaction pending, run an audit on the shards
+        final boolean auditResult = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+
+        //true from the audit maps to CHECKED_CREATED, false to CHECKED_NO_OP
+        return auditResult ? AuditResult.CHECKED_CREATED : AuditResult.CHECKED_NO_OP;
+    }
+
+
+    /**
+     * Tracks which shard group compactions are running so the same compaction is not started twice.
+     *
+     * A group is identified by the first 64 bits of a murmur hash of its scope, edge meta data and
+     * compaction target. The hashes live in a synchronized set rather than a BitSet: a hash can be negative
+     * or very large, a BitSet throws IndexOutOfBoundsException for a negative index and grows to hold any
+     * large one, and a separate get followed by set is not atomic across threads.
+     */
+    private static final class ShardCompactionTaskTracker {
+        private final Set<Long> runningTasks = Collections.synchronizedSet( new HashSet<Long>() );
+
+
+        /**
+         * Atomically record that the compaction of this group is running.
+         *
+         * @param scope the application scope of the group
+         * @param edgeMeta the directed edge meta data of the group
+         * @param group the shard entry group to compact
+         *
+         * @return true if the caller should start the compaction, false if it is already recorded as running
+         */
+        public boolean shouldStartCompaction( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                                              ShardEntryGroup group ) {
+            final long hash = doHash( scope, edgeMeta, group ).asLong();
+
+            //add only returns true for the first caller, so the check and the set are a single step
+            return runningTasks.add( Long.valueOf( hash ) );
+        }
+
+
+        /**
+         * Mark this entry group as complete, a later compaction of it may then be started again.
+         *
+         * @param scope the application scope of the group
+         * @param edgeMeta the directed edge meta data of the group
+         * @param group the shard entry group that finished compacting
+         */
+        public void complete( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+                              ShardEntryGroup group ) {
+            final long hash = doHash( scope, edgeMeta, group ).asLong();
+            runningTasks.remove( Long.valueOf( hash ) );
+        }
+
+
+        /**
+         * Hash the scope, edge meta data and compaction target of the group into a consistent hash code.
+         *
+         * @param scope the application scope, hashed by its application id
+         * @param directedEdgeMeta the meta data, hashed by its nodes and edge types
+         * @param shardEntryGroup the group, hashed by the shard index of its compaction target
+         *
+         * @return the murmur hash of the values above
+         */
+        private HashCode doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
+                                 final ShardEntryGroup shardEntryGroup ) {
+            final Hasher hasher = MURMUR_128.newHasher();
+            addToHash( hasher, scope.getApplication() );
+
+            //add our edge meta data
+            for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
+                addToHash( hasher, nodeMeta.getId() );
+                hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
+            }
+
+            //add our edge types
+            for ( String type : directedEdgeMeta.getTypes() ) {
+                hasher.putString( type, CHARSET );
+            }
+
+            //add our compaction target to the hash
+            final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
+            hasher.putLong( compactionTarget.getShardIndex() );
+
+            return hasher.hash();
+        }
+
+
+        /**
+         * Add the uuid and the type of the id to the hasher.
+         */
+        private void addToHash( final Hasher hasher, final Id id ) {
+            final UUID nodeUuid = id.getUuid();
+            final String nodeType = id.getType();
+
+            hasher.putLong( nodeUuid.getMostSignificantBits() ).putLong( nodeUuid.getLeastSignificantBits() )
+                  .putString( nodeType, CHARSET );
+        }
+    }
+
+    /**
+     * Possible results of trying to start a compaction
+     */
+    private enum StartResult {
+
+        /** Returned if the compaction was started */
+        STARTED,
+
+        /** Returned if we are running the compaction */
+        RUNNING
     }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
deleted file mode 100644
index 0bbb011..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardRowIterator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.query.RowQuery;
-import com.netflix.astyanax.util.RangeBuilder;
-
-
-/**
- * Internal iterator to iterate over multiple row keys
- *
- * @param <R> The row type
- * @param <C> The column type
- * @param <T> The parsed return type
- */
-public class ShardRowIterator<R, C, T> implements Iterator<T> {
-
- private final EdgeSearcher<R, C, T> searcher;
-
- private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
-
- private Iterator<T> currentColumnIterator;
-
- private final Keyspace keyspace;
-
- private final int pageSize;
-
- private final ConsistencyLevel consistencyLevel;
-
-
- public ShardRowIterator( final EdgeSearcher<R, C, T> searcher,
- final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
- final ConsistencyLevel consistencyLevel, final int pageSize ) {
- this.searcher = searcher;
- this.cf = cf;
- this.keyspace = keyspace;
- this.pageSize = pageSize;
- this.consistencyLevel = consistencyLevel;
- }
-
-
- @Override
- public boolean hasNext() {
- //we have more columns to return
- if ( currentColumnIterator != null && currentColumnIterator.hasNext() ) {
- return true;
- }
-
- /**
- * We have another row key, advance to it and re-check
- */
- if ( searcher.hasNext() ) {
- advanceRow();
- return hasNext();
- }
-
- //we have no more columns, and no more row keys, we're done
- return false;
- }
-
-
- @Override
- public T next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "There are no more rows or columns left to advance" );
- }
-
- return currentColumnIterator.next();
- }
-
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException( "Remove is unsupported" );
- }
-
-
- /**
- * Advance our iterator to the next row (assumes the check for row keys is elsewhere)
- */
- private void advanceRow() {
-
- /**
- * If the edge is present, we need to being seeking from this
- */
-
- final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
-
-
- //set the range into the search
- searcher.setRange( rangeBuilder );
-
- /**
- * Get our list of slices
- */
- final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.next();
-
-
- final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
-
- for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
-
-
-
- final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
- keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
- .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
-
-
- final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
-
- columnNameIterators.add( columnNameIterator );
-
- }
-
-
-
- currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
-
-
- }
-}
[2/3] Finished refactor of low level serialization to remove
impedance mismatch of apis between read and write.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 6f7c5ce..030e4a7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -21,6 +21,7 @@
package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+import java.util.Collection;
import java.util.Iterator;
import java.util.UUID;
@@ -47,7 +48,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKeyType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -62,6 +62,10 @@ import com.netflix.astyanax.util.RangeBuilder;
import static com.google.common.base.Preconditions.checkNotNull;
+/**
+ * TODO: Refactor this to use shards only, no shard groups, just collections of shards. The parent caller can aggregate
+ * the results of multiple groups together, this has an impedance mismatch in the API layer.
+ */
@Singleton
public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@@ -94,224 +98,254 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
- public MutationBatch writeEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
- final MarkedEdge markedEdge, final UUID timestamp ) {
+ public MutationBatch writeEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateEdge( markedEdge );
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
+ return new SourceWriteOp( columnFamilies, markedEdge ) {
- final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
- .withTimestamp( timestamp.timestamp() );
-
- final boolean isDeleted = markedEdge.isDeleted();
-
-
- doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
@Override
- public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
- final RowKey rowKey, final DirectedEdge edge ) {
- batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
- }
-
-
- @Override
- public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
- if(!isDeleted) {
- writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
- }
- }
-
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
- @Override
- public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
- final EdgeRowKey rowKey, final long timestamp ) {
- batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( timestamp, isDeleted );
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
}
- } );
-
-
- return batch;
+ }.createBatch( scope, shards, timestamp );
}
@Override
- public MutationBatch deleteEdge( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
- final MarkedEdge markedEdge, final UUID timestamp ) {
+ public MutationBatch writeEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateEdge( markedEdge );
ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- final MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
- .withTimestamp( timestamp.timestamp() );
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
-
- doWrite( columnFamilies, scope, markedEdge, new RowOp<RowKey>() {
@Override
- public void writeEdge( final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
- final RowKey rowKey, final DirectedEdge edge ) {
- batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
- }
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
- @Override
- public void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
- writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
- }
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
- @Override
- public void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
- final EdgeRowKey rowKey, final long timestamp ) {
- batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( timestamp );
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
}
- } );
-
-
- return batch;
+ }.createBatch( scope, shards, timestamp );
}
- /**
- * EdgeWrite the edges internally
- *
- * @param scope The scope to encapsulate
- * @param edge The edge to write
- * @param op The row operation to invoke
- */
- private void doWrite( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope, final MarkedEdge edge,
- final RowOp op ) {
+ @Override
+ public MutationBatch writeEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta targetEdgeMeta, final UUID timestamp ) {
ValidationUtils.validateApplicationScope( scope );
- GraphValidation.validateEdge( edge );
-
- final Id sourceNodeId = edge.getSourceNode();
- final String sourceNodeType = sourceNodeId.getType();
- final Id targetNodeId = edge.getTargetNode();
- final String targetNodeType = targetNodeId.getType();
- final long timestamp = edge.getTimestamp();
- final String type = edge.getType();
-
-
- /**
- * Key in the serializers based on the edge
- */
-
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- /**
- * write edges from source->target
- */
+ return new TargetWriteOp( columnFamilies, markedEdge ) {
- final long time = timeService.getCurrentTime();
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
- final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).putColumn( edge, isDeleted );
- final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceNodeId, type );
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- final ShardEntryGroup sourceRowKeyShard =
- writeEdgeShardStrategy.getWriteShards( scope, timestamp, sourceEdgeMeta );
+ @Override
+ public MutationBatch writeEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceCf =
- columnFamilies.getSourceNodeCfName();
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
- for ( Shard shard : sourceRowKeyShard.getWriteShards(time) ) {
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
- final long shardId = shard.getShardIndex();
- final RowKey sourceRowKey = new RowKey( sourceNodeId, type, shardId );
- op.writeEdge( sourceCf, sourceRowKey, sourceEdge );
- op.countEdge( shard, sourceEdgeMeta );
- }
+ batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+ .putColumn( edge, isDeleted );
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- final DirectedEdgeMeta sourceEdgeTargetTypeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceNodeId,
- type, targetNodeType );
+ @Override
+ public MutationBatch writeEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
- final ShardEntryGroup sourceWithTypeRowKeyShard = writeEdgeShardStrategy
- .getWriteShards( scope, timestamp, sourceEdgeTargetTypeMeta );
+ ValidationUtils.validateApplicationScope( scope );
+ GraphValidation.validateEdge( markedEdge );
+ ValidationUtils.verifyTimeUuid( timestamp, "timestamp" );
- final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetCf =
- columnFamilies.getSourceNodeTargetTypeCfName();
- for ( Shard shard : sourceWithTypeRowKeyShard.getWriteShards(time) ) {
+ return new EdgeVersions( columnFamilies, markedEdge ) {
- final long shardId = shard.getShardIndex();
- final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId, shardId );
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+ final boolean isDeleted ) {
+ batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
+ .putColumn( column, isDeleted );
- op.writeEdge( targetCf, sourceRowKeyType, sourceEdge );
- op.countEdge( shard, sourceEdgeTargetTypeMeta );
- }
+ if ( !isDeleted ) {
+ writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
+ }
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- /**
- * write edges from target<-source
- */
- final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
+ @Override
+ public MutationBatch deleteEdgeFromSource( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
- final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetNodeId, type );
+ return new SourceWriteOp( columnFamilies, markedEdge ) {
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- final ShardEntryGroup targetRowKeyShard =
- writeEdgeShardStrategy.getWriteShards( scope, timestamp, targetEdgeMeta );
- final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> sourceByTargetCf =
- columnFamilies.getTargetNodeCfName();
+ @Override
+ public MutationBatch deleteEdgeFromSourceWithTargetType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
+ return new SourceTargetTypeWriteOp( columnFamilies, markedEdge ) {
- for ( Shard shard : targetRowKeyShard.getWriteShards(time) ) {
- final long shardId = shard.getShardIndex();
- final RowKey targetRowKey = new RowKey( targetNodeId, type, shardId );
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
- op.writeEdge( sourceByTargetCf, targetRowKey, targetEdge );
- op.countEdge( shard, targetEdgeMeta );
- }
+ batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+ .deleteColumn( edge );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- final DirectedEdgeMeta targetEdgeSourceTypeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( targetNodeId, type, sourceNodeType );
+ @Override
+ public MutationBatch deleteEdgeToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+ return new TargetWriteOp( columnFamilies, markedEdge ) {
- final ShardEntryGroup targetWithTypeRowKeyShard = writeEdgeShardStrategy
- .getWriteShards( scope, timestamp, targetEdgeSourceTypeMeta );
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKey rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
- final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> targetBySourceCf =
- columnFamilies.getTargetNodeSourceTypeCfName();
+ batch.withRow( columnFamily, ScopedRowKey.fromKey( scope, rowKey ) ).deleteColumn( edge );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- for ( Shard shard : targetWithTypeRowKeyShard.getWriteShards(time) ) {
+ @Override
+ public MutationBatch deleteEdgeToTargetWithSourceType( final EdgeColumnFamilies columnFamilies,
+ final ApplicationScope scope, final MarkedEdge markedEdge,
+ final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta,
+ final UUID timestamp ) {
- final long shardId = shard.getShardIndex();
+ return new TargetSourceTypeWriteOp( columnFamilies, markedEdge ) {
- final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId, shardId );
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily,
+ final ApplicationScope scope, final RowKeyType rowKey, final DirectedEdge edge,
+ final Shard shard, final boolean isDeleted ) {
+ batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope, rowKey ) )
+ .deleteColumn( edge );
+ }
+ }.createBatch( scope, shards, timestamp );
+ }
- op.writeEdge( targetBySourceCf, targetRowKeyType, targetEdge );
- op.countEdge( shard, targetEdgeSourceTypeMeta );
- }
- /**
- * Always a 0l shard, we're hard limiting 2b timestamps for the same edge
- */
- final EdgeRowKey edgeRowKey = new EdgeRowKey( sourceNodeId, type, targetNodeId, 0l );
+ @Override
+ public MutationBatch deleteEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
+ final MarkedEdge markedEdge, final Collection<Shard> shards,
+ final DirectedEdgeMeta directedEdgeMeta, final UUID timestamp ) {
+ return new EdgeVersions( columnFamilies, markedEdge ) {
- /**
- * Write this in the timestamp log for this edge of source->target
- */
- op.writeVersion( columnFamilies.getGraphEdgeVersions(), edgeRowKey, timestamp );
+ @Override
+ void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
+ final ApplicationScope scope, final EdgeRowKey rowKey, final Long column, final Shard shard,
+ final boolean isDeleted ) {
+ batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope, rowKey ) )
+ .deleteColumn( column );
+ }
+ }.createBatch( scope, shards, timestamp );
}
@Override
public Iterator<MarkedEdge> getEdgeVersions( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
- final SearchByEdge search, final Iterator<ShardEntryGroup> shards ) {
+ final SearchByEdge search, final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdge( search );
@@ -370,7 +404,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
}
};
- return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
graphFig.getScanPageSize() );
}
@@ -378,7 +412,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
@Override
public Iterator<MarkedEdge> getEdgesFromSource( final EdgeColumnFamilies columnFamilies,
final ApplicationScope scope, final SearchByEdgeType edgeType,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdgeType( edgeType );
@@ -392,7 +426,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
- new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+ new SourceEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+ shards ) {
@Override
@@ -417,12 +452,10 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
return new SimpleMarkedEdge( sourceId, type, edge.id, edge.timestamp, marked );
}
-
-
};
- return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
graphFig.getScanPageSize() );
}
@@ -431,7 +464,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
public Iterator<MarkedEdge> getEdgesFromSourceByTargetType( final EdgeColumnFamilies columnFamilies,
final ApplicationScope scope,
final SearchByIdType edgeType,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdgeType( edgeType );
@@ -446,7 +479,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
- new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+ new SourceEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+ shards ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -470,22 +504,16 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
protected MarkedEdge createEdge( final DirectedEdge edge, final boolean marked ) {
return new SimpleMarkedEdge( targetId, type, edge.id, edge.timestamp, marked );
}
-
-
-
};
- return new ShardRowIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ return new ShardsColumnIterator( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
graphFig.getScanPageSize() );
}
-
-
@Override
public Iterator<MarkedEdge> getEdgesToTarget( final EdgeColumnFamilies columnFamilies, final ApplicationScope scope,
- final SearchByEdgeType edgeType,
- final Iterator<ShardEntryGroup> shards ) {
+ final SearchByEdgeType edgeType, final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdgeType( edgeType );
@@ -497,7 +525,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final Serializer<DirectedEdge> serializer = columnFamily.getColumnSerializer();
final TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge> searcher =
- new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+ new TargetEdgeSearcher<RowKey, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+ shards ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
@@ -524,7 +553,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
};
- return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
graphFig.getScanPageSize() );
}
@@ -533,7 +562,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
public Iterator<MarkedEdge> getEdgesToTargetBySourceType( final EdgeColumnFamilies columnFamilies,
final ApplicationScope scope,
final SearchByIdType edgeType,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
ValidationUtils.validateApplicationScope( scope );
GraphValidation.validateSearchByEdgeType( edgeType );
@@ -548,7 +577,8 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
final TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge> searcher =
- new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(), shards ) {
+ new TargetEdgeSearcher<RowKeyType, DirectedEdge, MarkedEdge>( scope, maxTimestamp, edgeType.last(),
+ shards ) {
@Override
protected Serializer<DirectedEdge> getSerializer() {
return serializer;
@@ -573,7 +603,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
}
};
- return new ShardRowIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
+ return new ShardsColumnIterator<>( searcher, columnFamily, keyspace, cassandraConfig.getReadCL(),
graphFig.getScanPageSize() );
}
@@ -581,74 +611,387 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
/**
* Class for performing searched on rows based on source id
*/
- private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+ private static abstract class SourceEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
protected SourceEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
super( scope, maxTimestamp, last, shards );
}
public int compare( final T o1, final T o2 ) {
- int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+ int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
- if(compare == 0){
- compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+ if ( compare == 0 ) {
+ compare = o1.getTargetNode().compareTo( o2.getTargetNode() );
}
return compare;
}
-
-
}
/**
* Class for performing searched on rows based on target id
*/
- private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T>{
+ private static abstract class TargetEdgeSearcher<R, C, T extends Edge> extends EdgeSearcher<R, C, T> {
protected TargetEdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
- final Iterator<ShardEntryGroup> shards ) {
+ final Collection<Shard> shards ) {
super( scope, maxTimestamp, last, shards );
}
public int compare( final T o1, final T o2 ) {
- int compare = Long.compare(o1.getTimestamp(), o2.getTimestamp());
+ int compare = Long.compare( o1.getTimestamp(), o2.getTimestamp() );
- if(compare == 0){
- compare = o1.getTargetNode().compareTo( o2.getTargetNode());
+ if ( compare == 0 ) {
+ compare = o1.getSourceNode().compareTo( o2.getSourceNode() ); // target is shared by every edge in a target search, so break ties on the source node
}
return compare;
}
-
-
}
+
/**
* Simple callback to perform puts and deletes with a common row setup code
*
* @param <R> The row key type
+ * @param <C> The column type
*/
- private static interface RowOp<R> {
+ private abstract class RowOp<R, C> {
+
+
+ /**
+ * Return the column family used for the write
+ */
+ protected abstract MultiTennantColumnFamily<ApplicationScope, R, C> getColumnFamily();
+
+ /**
+ * Get the row key
+ */
+ public abstract R getRowKey( final Shard shard );
+
+ /**
+ * Get the column family value
+ */
+ protected abstract C getDirectedEdge();
+
+ /**
+ * Get the flag on if it's deleted
+ */
+ protected abstract boolean isDeleted();
+
/**
* Write the edge with the given data
*/
- void writeEdge( final MultiTennantColumnFamily<ApplicationScope, R, DirectedEdge> columnFamily, final R rowKey,
- final DirectedEdge edge );
+ abstract void writeEdge( final MutationBatch batch,
+ final MultiTennantColumnFamily<ApplicationScope, R, C> columnFamily,
+ final ApplicationScope scope, final R rowKey, final C column, final Shard shard,
+ final boolean isDeleted );
+
+
+ /**
+ * Create a mutation batch
+ */
+ public MutationBatch createBatch( final ApplicationScope scope, final Collection<Shard> shards,
+ final UUID opTimestamp ) {
+
+ final MutationBatch batch =
+ keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() )
+ .withTimestamp( opTimestamp.timestamp() );
+
+
+ final C column = getDirectedEdge();
+ final MultiTennantColumnFamily<ApplicationScope, R, C> columnFamily = getColumnFamily();
+ final boolean isDeleted = isDeleted();
+
+
+ for ( Shard shard : shards ) {
+ final R rowKey = getRowKey( shard );
+ writeEdge( batch, columnFamily, scope, rowKey, column, shard, isDeleted );
+ }
+
+
+ return batch;
+ }
+ }
+
+
+ /**
+ * Perform a write of the source->target
+ */
+ private abstract class SourceWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily;
+ private final Id sourceNodeId;
+
+
+ private final String type;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Create the source write operation from the marked edge
+ */
+ private SourceWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getSourceNodeCfName();
+
+ this.sourceNodeId = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( markedEdge.getTargetNode(), markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKey getRowKey( final Shard shard ) {
+ return new RowKey( sourceNodeId, type, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Perform a write of the source->target with target type
+ */
+ private abstract class SourceTargetTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily;
+ private final Id sourceNodeId;
+ private final String type;
+ private Id targetId;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Create the source write operation with target type from the marked edge
+ */
+ private SourceTargetTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getSourceNodeTargetTypeCfName();
+
+ this.sourceNodeId = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.targetId = markedEdge.getTargetNode();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( targetId, markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKeyType getRowKey( final Shard shard ) {
+ return new RowKeyType( sourceNodeId, type, targetId, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Perform a write of the target <-- source
+ */
+ private abstract class TargetWriteOp extends RowOp<RowKey, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> columnFamily;
+ private final Id targetNode;
+
+
+ private final String type;
+ private final boolean isDeleted;
+ private final DirectedEdge directedEdge;
+
+
+ /**
+ * Create the target write operation from the marked edge
+ */
+ private TargetWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getTargetNodeCfName();
+
+ this.targetNode = markedEdge.getTargetNode();
+
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( markedEdge.getSourceNode(), markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ApplicationScope, RowKey, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKey getRowKey( final Shard shard ) {
+ return new RowKey( targetNode, type, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Perform a write of the target<--source with source type
+ */
+ private abstract class TargetSourceTypeWriteOp extends RowOp<RowKeyType, DirectedEdge> {
+
+ private final MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> columnFamily;
+ private final Id targetNode;
+
+ private final Id sourceNode;
+
+ final String type;
+
+ final boolean isDeleted;
+ final DirectedEdge directedEdge;
+
/**
- * Perform the count on the edge
+ * Create the target write operation with source type. Row keys are target-first, so the target node source type column family is used
*/
- void countEdge( final Shard shard, final DirectedEdgeMeta directedEdgeMeta);
+ private TargetSourceTypeWriteOp( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getTargetNodeSourceTypeCfName();
+
+ this.targetNode = markedEdge.getTargetNode();
+ this.sourceNode = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.directedEdge = new DirectedEdge( sourceNode, markedEdge.getTimestamp() );
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ApplicationScope, RowKeyType, DirectedEdge> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public RowKeyType getRowKey( final Shard shard ) {
+ return new RowKeyType( targetNode, type, sourceNode, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected DirectedEdge getDirectedEdge() {
+ return directedEdge;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
+ }
+
+
+ /**
+ * Perform a write of the edge versions
+ */
+ private abstract class EdgeVersions extends RowOp<EdgeRowKey, Long> {
+
+ private final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily;
+ private final Id targetNode;
+
+ private final Id sourceNode;
+
+ final String type;
+
+ final boolean isDeleted;
+ final Long edgeVersion;
+
/**
- * Write the edge into the version cf
+ * Create the edge versions write operation from the marked edge
*/
- void writeVersion( final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> columnFamily,
- final EdgeRowKey rowKey, long timestamp );
+ private EdgeVersions( final EdgeColumnFamilies edgeColumnFamilies, final MarkedEdge markedEdge ) {
+ this.columnFamily = edgeColumnFamilies.getGraphEdgeVersions();
+
+ this.targetNode = markedEdge.getTargetNode();
+ this.sourceNode = markedEdge.getSourceNode();
+
+ this.type = markedEdge.getType();
+ this.isDeleted = markedEdge.isDeleted();
+
+ this.edgeVersion = markedEdge.getTimestamp();
+ }
+
+
+ @Override
+ protected MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> getColumnFamily() {
+ return columnFamily;
+ }
+
+
+ @Override
+ public EdgeRowKey getRowKey( final Shard shard ) {
+ return new EdgeRowKey( sourceNode, type, targetNode, shard.getShardIndex() );
+ }
+
+
+ @Override
+ protected Long getDirectedEdge() {
+ return edgeVersion;
+ }
+
+
+ @Override
+ protected boolean isDeleted() {
+ return isDeleted;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
new file mode 100644
index 0000000..0c7e5b5
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -0,0 +1,128 @@
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
+
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.usergrid.persistence.core.astyanax.ColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiKeyColumnNameIterator;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Internal iterator over the columns of every row key supplied by the searcher. The underlying iterator is built lazily on the first hasNext() call.
+ *
+ * @param <R> The row type
+ * @param <C> The column type
+ * @param <T> The parsed return type
+ */
+public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
+
+ private final EdgeSearcher<R, C, T> searcher;
+
+ private final MultiTennantColumnFamily<ApplicationScope, R, C> cf;
+
+ private Iterator<T> currentColumnIterator;
+
+ private final Keyspace keyspace;
+
+ private final int pageSize;
+
+ private final ConsistencyLevel consistencyLevel;
+
+
+ public ShardsColumnIterator( final EdgeSearcher<R, C, T> searcher,
+ final MultiTennantColumnFamily<ApplicationScope, R, C> cf, final Keyspace keyspace,
+ final ConsistencyLevel consistencyLevel, final int pageSize ) {
+ this.searcher = searcher;
+ this.cf = cf;
+ this.keyspace = keyspace;
+ this.pageSize = pageSize;
+ this.consistencyLevel = consistencyLevel;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+
+ /**
+ * Iterator isn't initialized, start it
+ */
+ if(currentColumnIterator == null){
+ startIterator();
+ }
+
+ return currentColumnIterator.hasNext();
+ }
+
+
+ @Override
+ public T next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "There are no more rows or columns left to advance" );
+ }
+
+ return currentColumnIterator.next();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Build one column iterator per row key returned by the searcher and wrap them all in a single MultiKeyColumnNameIterator
+ */
+ private void startIterator() {
+
+ /**
+ * If the edge is present, we need to begin seeking from it
+ */
+
+ final RangeBuilder rangeBuilder = new RangeBuilder().setLimit( pageSize );
+
+
+ //set the range into the search
+ searcher.setRange( rangeBuilder );
+
+ /**
+ * Get the list of row keys to query
+ */
+ final List<ScopedRowKey<ApplicationScope, R>> rowKeys = searcher.getRowKeys();
+
+
+ final List<ColumnNameIterator<C, T>> columnNameIterators = new ArrayList<>( rowKeys.size() );
+
+ for(ScopedRowKey<ApplicationScope, R> rowKey: rowKeys){
+
+
+
+ final RowQuery<ScopedRowKey<ApplicationScope, R>, C> query =
+ keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKey( rowKey )
+ .autoPaginate( true ).withColumnRange( rangeBuilder.build() );
+
+
+ final ColumnNameIterator<C, T> columnNameIterator = new ColumnNameIterator<>( query, searcher, searcher.hasPage() );
+
+ columnNameIterators.add( columnNameIterator );
+
+ }
+
+
+
+ currentColumnIterator = new MultiKeyColumnNameIterator<>(columnNameIterators, searcher, pageSize);
+
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 8f5171b..59bf014 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -21,32 +21,29 @@
package org.apache.usergrid.persistence.graph;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.runners.model.InitializationError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
-import org.apache.cassandra.thrift.Cassandra;
import org.apache.commons.lang.time.StopWatch;
import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.migration.MigrationException;
import org.apache.usergrid.persistence.core.migration.MigrationManager;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -55,16 +52,20 @@ import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
import org.apache.usergrid.persistence.model.entity.Id;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
import com.google.common.base.Optional;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.netflix.config.ConfigurationManager;
import rx.Observable;
-import rx.Subscriber;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -80,23 +81,68 @@ public class GraphManagerShardConsistencyIT {
@ClassRule
public static CassandraRule rule = new CassandraRule();
+ private static final MetricRegistry registry = new MetricRegistry();
+
+ private static final Meter writeMeter = registry.meter( "writeThroughput" );
+
+ private static final Slf4jReporter reporter = Slf4jReporter.forRegistry( registry )
+ .outputTo( log )
+ .convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS )
+ .build();
+
+
protected ApplicationScope scope;
- protected int numWorkers;
+
+ protected Object originalShardSize;
+
+ protected Object originalShardTimeout;
+
+ protected Object originalShardDelta;
@Before
public void setupOrg() {
+ originalShardSize = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_SIZE );
+
+ originalShardTimeout = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_CACHE_TIMEOUT );
+
+ originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
+
+
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
+
+
+ final long cacheTimeout = 10000;
+ //set our cache timeout to 10 seconds
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
+
+
+ final long minDelta = ( long ) (cacheTimeout * 2.5);
+
+ ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
+
+
+
//get the system property of the UUID to use. If one is not set, use the defualt
String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
scope = new ApplicationScopeImpl( createId( UUID.fromString( uuidString ), "test" ) );
- numWorkers = Integer.parseInt( System.getProperty( "numWorkers", "4" ) );
-// readCount = Integer.parseInt( System.getProperty( "readCount", "20000" ) );
+
+
+ reporter.start( 10, TimeUnit.SECONDS );
+ }
+
+
+ @After
+ public void tearDown(){
+ reporter.stop();
+ reporter.report();
}
@@ -131,7 +177,7 @@ public class GraphManagerShardConsistencyIT {
@Test
public void writeThousandsSingleTarget() throws InterruptedException, ExecutionException, MigrationException {
- final Id targetId = createId("target");
+ final Id sourceId = createId("source");
final String edgeType = "test";
EdgeGenerator generator = new EdgeGenerator() {
@@ -139,7 +185,7 @@ public class GraphManagerShardConsistencyIT {
@Override
public Edge newEdge() {
- Edge edge = createEdge( createId( "source" ), edgeType, targetId );
+ Edge edge = createEdge( sourceId, edgeType, createId( "target" ) );
return edge;
@@ -148,33 +194,45 @@ public class GraphManagerShardConsistencyIT {
@Override
public Observable<Edge> doSearch( final GraphManager manager ) {
- return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+ return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
}
};
+ final int numInjectors = 2;
+
/**
* create 3 injectors. This way all the caches are independent of one another. This is the same as
* multiple nodes
*/
- final List<Injector> injectors = createInjectors(3);
+ final List<Injector> injectors = createInjectors(numInjectors);
final GraphFig graphFig = getInstance( injectors, GraphFig.class );
final long shardSize = graphFig.getShardSize();
+
+ //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing power for writes
+ final int numProcessors = Runtime.getRuntime().availableProcessors() /2 ;
+
+ final int numWorkers = numProcessors/numInjectors;
+
+
/**
* Do 4x shard size so we should have approximately 4 shards
*/
final long numberOfEdges = shardSize * 4;
+
final long countPerWorker = numberOfEdges/numWorkers;
final long writeLimit = countPerWorker;
-// HystrixCassandra.ASYNC_GROUP.
+
+ //minimum execution time: the shard min delta plus one cache timeout cycle
+ final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
@@ -185,7 +243,7 @@ public class GraphManagerShardConsistencyIT {
for(Injector injector: injectors) {
final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
- futures.addAll( doTest( gmf, generator, writeLimit ) );
+ futures.addAll( doTest( gmf, generator, numWorkers, writeLimit, minExecutionTime ) );
}
for(Future<Boolean> future: futures){
@@ -193,18 +251,20 @@ public class GraphManagerShardConsistencyIT {
}
//now get all our shards
- final NodeShardAllocation allocation = getInstance( injectors, NodeShardAllocation.class );
+ final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
- final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
+ final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
int count = 0;
- while(count < 4) {
+ while(true) {
//reset our count. Ultimately we'll have 4 groups once our compaction completes
count = 0;
- final Iterator<ShardEntryGroup> groups = allocation.getShards( scope, Optional.<Shard>absent(), directedEdgeMeta );
+
+ //we have to get it from the cache, because this will trigger the compaction process
+ final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
while(groups.hasNext()){
final ShardEntryGroup group = groups.next();
@@ -214,6 +274,13 @@ public class GraphManagerShardConsistencyIT {
count++;
}
+
+ //we're done
+ if(count == 4){
+ break;
+ }
+
+ Thread.sleep(5000);
}
@@ -252,14 +319,14 @@ public class GraphManagerShardConsistencyIT {
/**
* Execute the test with the generator
*/
- private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount ) throws InterruptedException, ExecutionException {
+ private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final int numWorkers, final long writeCount, final long minExecutionTime ) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool( numWorkers );
List<Future<Boolean>> futures = new ArrayList<>( numWorkers );
for ( int i = 0; i < numWorkers; i++ ) {
- Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount ) );
+ Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount, minExecutionTime ) );
futures.add( future );
}
@@ -273,12 +340,14 @@ public class GraphManagerShardConsistencyIT {
private final GraphManagerFactory factory;
private final EdgeGenerator generator;
private final long writeLimit;
+ private final long minExecutionTime;
- private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit) {
+ private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, final long minExecutionTime ) {
this.factory = factory;
this.generator = generator;
this.writeLimit = writeLimit;
+ this.minExecutionTime = minExecutionTime;
}
@@ -287,10 +356,11 @@ public class GraphManagerShardConsistencyIT {
GraphManager manager = factory.createEdgeManager( scope );
- final StopWatch timer = new StopWatch();
- timer.start();
- for ( long i = 0; i < writeLimit; i++ ) {
+ final long startTime = System.currentTimeMillis();
+
+
+ for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime ; i++ ) {
Edge edge = generator.newEdge();
@@ -300,13 +370,13 @@ public class GraphManagerShardConsistencyIT {
assertNotNull( "Returned has a version", returned.getTimestamp() );
+ writeMeter.mark();
+
if ( i % 1000 == 0 ) {
log.info( " Wrote: " + i );
}
}
- timer.stop();
- log.info( "Total time to write {} entries {} ms", writeLimit, timer.getTime() );
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index e4c7627..878674a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -260,7 +260,7 @@ public class NodeShardAllocationTest {
//mock up returning the value
when( shardedEdgeSerialization
.getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
- any( Iterator.class ) ) ).thenReturn( edgeIterator );
+ any( Collection.class ) ) ).thenReturn( edgeIterator );
final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 498d5d1..8e9ed5c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -453,6 +453,12 @@ public class NodeShardApproximationTest {
@Override
+ public double getShardRepairChance() {
+ return 0;
+ }
+
+
+ @Override
public long getShardSize() {
return 0; //To change body of implemented methods use File | Settings | File Templates.
}
@@ -477,6 +483,12 @@ public class NodeShardApproximationTest {
@Override
+ public int getShardCacheRefreshWorkerCount() {
+ return 0;
+ }
+
+
+ @Override
public long getCounterFlushCount() {
return 100000l;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c69c1974/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties b/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
new file mode 100644
index 0000000..6097446
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-SHARD.properties
@@ -0,0 +1,23 @@
+#
+# /*
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing,
+# * software distributed under the License is distributed on an
+# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# * KIND, either express or implied. See the License for the
+# * specific language governing permissions and limitations
+# * under the License.
+# */
+#
+
+# Keep only settings that override the test defaults in here
+usergrid.graph.shard.size=10000