You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:01 UTC
[02/19] git commit: Migrated graph over to new task executor
Migrated graph over to new task executor
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/4c72f5c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/4c72f5c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/4c72f5c6
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: 4c72f5c636bae9b16df7578302538c7b4d9600e7
Parents: 66e508a
Author: Todd Nine <to...@apache.org>
Authored: Wed Sep 24 17:39:07 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Wed Sep 24 17:39:07 2014 -0600
----------------------------------------------------------------------
.../collection/event/EntityVersionRemoved.java | 26 +++
.../collection/guice/CollectionModule.java | 17 +-
.../serialization/SerializationFig.java | 32 ++-
.../core/astyanax/AstyanaxKeyspaceProvider.java | 2 +
.../persistence/core/guice/CommonModule.java | 2 +
.../core/task/NamedTaskExecutorImpl.java | 14 +-
.../persistence/core/task/TaskExecutor.java | 2 +-
.../usergrid/persistence/graph/GraphFig.java | 2 +
.../persistence/graph/guice/GraphModule.java | 15 ++
.../shard/impl/ShardGroupCompactionImpl.java | 202 ++++++++++++-------
.../impl/shard/ShardGroupCompactionTest.java | 5 +-
11 files changed, 227 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
new file mode 100644
index 0000000..dca575d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
@@ -0,0 +1,26 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+
+
+/**
+ *
+ * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
+ * only of the specified version.
+ *
+ */
+public interface EntityVersionRemoved {
+
+
+ /**
+ * The version specified was removed.
+ * @param scope
+ * @param entityId
+ * @param entityVersion
+ */
+ public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 84f69db..306f6e0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -36,6 +36,8 @@ import org.apache.usergrid.persistence.collection.serialization.SerializationFig
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.UUIDService;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
@@ -79,8 +81,7 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@Write
-
- public WriteStart write (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
return writeStart;
@@ -90,13 +91,21 @@ public class CollectionModule extends AbstractModule {
@Singleton
@Inject
@WriteUpdate
-
- public WriteStart writeUpdate (MvccLogEntrySerializationStrategy logStrategy, UUIDService uuidService) {
+ public WriteStart writeUpdate (final MvccLogEntrySerializationStrategy logStrategy) {
final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.PARTIAL );
return writeStart;
}
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
+ return new NamedTaskExecutorImpl( "collectiontasks", serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
index 81302a6..7e69a19 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/SerializationFig.java
@@ -15,25 +15,45 @@ public interface SerializationFig extends GuicyFig {
/**
* Time to live timeout in seconds.
+ *
* @return Timeout in seconds.
*/
- @Key( "collection.stage.transient.timeout" )
- @Default( "5" )
+ @Key("collection.stage.transient.timeout")
+ @Default("5")
int getTimeout();
/**
* Number of history items to return for delete.
+ *
 * @return The number of history items.
*/
- @Key( "collection.delete.history.size" )
- @Default( "100" )
+ @Key("collection.delete.history.size")
+ @Default("100")
int getHistorySize();
/**
* Number of items to buffer.
+ *
 * @return The number of items to buffer.
*/
- @Key( "collection.buffer.size" )
- @Default( "10" )
+ @Key("collection.buffer.size")
+ @Default("10")
int getBufferSize();
+
+
+ /**
+ * The number of threads to have in the task pool
+ */
+ @Key( "collection.task.pool.threadsize" )
+ @Default( "20" )
+ int getTaskPoolThreadSize();
+
+
+
+ /**
+ * The size of the queue used by the task pool
+ */
+ @Key( "collection.task.pool.queuesize" )
+ @Default( "20" )
+ int getTaskPoolQueueSize();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
index 7caeaeb..8bd5a9f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/AstyanaxKeyspaceProvider.java
@@ -24,6 +24,7 @@ import java.util.Set;
import com.google.inject.Inject;
import com.google.inject.Provider;
+import com.google.inject.Singleton;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
@@ -41,6 +42,7 @@ import com.netflix.astyanax.thrift.ThriftFamilyFactory;
*
* @author tnine
*/
+@Singleton
public class AstyanaxKeyspaceProvider implements Provider<Keyspace> {
private final CassandraFig cassandraFig;
private final CassandraConfig cassandraConfig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a4cc98a..5f461bb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -63,4 +63,6 @@ public class CommonModule extends AbstractModule {
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8ba73a0..8184937 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -53,7 +53,7 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
- public <V, I> void submit( final Task<V, I> task ) {
+ public <V, I> ListenableFuture<V> submit( final Task<V, I> task ) {
final ListenableFuture<V> future;
@@ -67,8 +67,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
}
catch ( RejectedExecutionException ree ) {
task.rejected();
- return;
+ return Futures.immediateCancelledFuture();
}
+
+ return future;
}
@@ -147,16 +149,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-
- // ListenableFutureTask<Task<?, ?>> future = ( ListenableFutureTask<Task<?, ?>> ) r;
- //
- // future.
- // final Task<?, ?> task = ( Task<?, ?> ) r;
LOG.warn( "Audit queue full, rejecting audit task {}", r );
throw new RejectedExecutionException( "Unable to run task, queue full" );
- // LOG.warn( "Audit queue full, rejecting audit task {}", task );
- // task.rejected();
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index e60da83..b5491bc 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -10,5 +10,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> void submit(Task<V, I> task);
+ public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 0a6ecfa..894e74a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -90,6 +90,8 @@ public interface GraphFig extends GuicyFig {
public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+
+
@Default("1000")
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f0e954b..608f8ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,6 +24,8 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.consistency.TimeServiceImpl;
import org.apache.usergrid.persistence.core.migration.Migration;
+import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -62,7 +64,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.Sizeb
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
import com.google.inject.Key;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.google.inject.multibindings.Multibinder;
@@ -144,6 +149,16 @@ public class GraphModule extends AbstractModule {
migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
}
+
+
+ @Inject
+ @Singleton
+ @Provides
+ public TaskExecutor graphTaskExecutor(final GraphFig graphFig){
+ return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() );
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 5076424..be7fbe4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -30,7 +30,6 @@ import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
@@ -46,6 +45,8 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -68,8 +69,6 @@ import com.google.common.hash.PrimitiveSink;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.netflix.astyanax.Keyspace;
@@ -91,7 +90,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
private static final HashFunction MURMUR_128 = Hashing.murmur3_128();
- private final ListeningExecutorService executorService;
+ private final TaskExecutor taskExecutor;
private final TimeService timeService;
private final GraphFig graphFig;
private final NodeShardAllocation nodeShardAllocation;
@@ -110,7 +109,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
final NodeShardAllocation nodeShardAllocation,
final ShardedEdgeSerialization shardedEdgeSerialization,
final EdgeColumnFamilies edgeColumnFamilies, final Keyspace keyspace,
- final EdgeShardSerialization edgeShardSerialization ) {
+ final EdgeShardSerialization edgeShardSerialization,
+ final TaskExecutor taskExecutor ) {
this.timeService = timeService;
this.graphFig = graphFig;
@@ -124,8 +124,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
this.shardCompactionTaskTracker = new ShardCompactionTaskTracker();
this.shardAuditTaskTracker = new ShardAuditTaskTracker();
- executorService = MoreExecutors.listeningDecorator(
- new MaxSizeThreadPool( graphFig.getShardAuditWorkerCount(), graphFig.getShardAuditWorkerQueueSize() ) );
+ this.taskExecutor = taskExecutor;
}
@@ -232,7 +231,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
/**
- * We didn't move anything this pass, mark the shard as compacted. If we move something, it means that we missed it on the first pass
+ * We didn't move anything this pass, mark the shard as compacted. If we move something,
+ * it means that we missed it on the first pass
* or someone is still not writing to the target shard only.
*/
if ( edgeCount == 0 ) {
@@ -293,91 +293,153 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
* Try and submit. During back pressure, we may not be able to submit, that's ok. Better to drop than to
* hose the system
*/
- ListenableFuture<AuditResult> future = executorService.submit( new Callable<AuditResult>() {
+ ListenableFuture<AuditResult> future = taskExecutor.submit( new ShardAuditTask( scope, edgeMeta, group ) );
+
+ /**
+ * Log our success or failures for debugging purposes
+ */
+ Futures.addCallback( future, new FutureCallback<AuditResult>() {
@Override
- public AuditResult call() throws Exception {
+ public void onSuccess( @Nullable final AuditResult result ) {
+ LOG.debug( "Successfully completed audit of task {}", result );
+ }
- /**
- * We don't have a compaction pending. Run an audit on the shards
- */
- if ( !group.isCompactionPending() ) {
+ @Override
+ public void onFailure( final Throwable t ) {
+ LOG.error( "Unable to perform audit. Exception is ", t );
+ }
+ } );
- /**
- * Check if we should allocate, we may want to
- */
+ return future;
+ }
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.CHECKED_NO_OP;
- }
- try {
+ private final class ShardAuditTask implements Task<AuditResult, ShardAuditKey> {
- final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
- if ( !created ) {
- return AuditResult.CHECKED_NO_OP;
- }
- }
- finally {
- shardAuditTaskTracker.complete( scope, edgeMeta, group );
- }
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
- return AuditResult.CHECKED_CREATED;
- }
+ public ShardAuditTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public ShardAuditKey getId() {
+ return new ShardAuditKey( scope, edgeMeta, group );
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ // Supply the task id for the {} placeholder; the trailing throwable is logged as the exception
+ LOG.error( "Unable to execute audit for shard of {}", getId(), throwable );
+ }
+
+
+ @Override
+ public void rejected() {
+ //ignore, if this happens we don't care, we're saturated, we can check later
+ LOG.error( "Rejected audit for shard of {}", getId() );
+ }
- //check our taskmanager
+ @Override
+ public AuditResult call() throws Exception {
+ /**
+ * We don't have a compaction pending. Run an audit on the shards
+ */
+ if ( !group.isCompactionPending() ) {
/**
- * Do the compaction
+ * Check if we should allocate, we may want to
*/
- if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
- /**
- * It's already compacting, don't do anything
- */
- if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
- return AuditResult.COMPACTING;
- }
- /**
- * We use a finally b/c we always want to remove the task track
- */
- try {
- CompactionResult result = compact( scope, edgeMeta, group );
- LOG.info( "Compaction result for compaction of scope {} with edge meta data of {} and shard group {} is {}", new Object[]{scope, edgeMeta, group, result} );
- }
- finally {
- shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardAuditTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.CHECKED_NO_OP;
+ }
+
+ try {
+
+ final boolean created = nodeShardAllocation.auditShard( scope, group, edgeMeta );
+ if ( !created ) {
+ return AuditResult.CHECKED_NO_OP;
}
- return AuditResult.COMPACTED;
+ }
+ finally {
+ shardAuditTaskTracker.complete( scope, edgeMeta, group );
}
- //no op, there's nothing we need to do to this shard
- return AuditResult.NOT_CHECKED;
- }
- } );
- /**
- * Log our success or failures for debugging purposes
- */
- Futures.addCallback( future, new FutureCallback<AuditResult>() {
- @Override
- public void onSuccess( @Nullable final AuditResult result ) {
- LOG.debug( "Successfully completed audit of task {}", result );
+ return AuditResult.CHECKED_CREATED;
}
+ //check our taskmanager
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to perform audit. Exception is ", t );
+
+ /**
+ * Do the compaction
+ */
+ if ( group.shouldCompact( timeService.getCurrentTime() ) ) {
+ /**
+ * It's already compacting, don't do anything
+ */
+ if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+ return AuditResult.COMPACTING;
+ }
+
+ /**
+ * We use a finally b/c we always want to remove the task track
+ */
+ try {
+ CompactionResult result = compact( scope, edgeMeta, group );
+ LOG.info(
+ "Compaction result for compaction of scope {} with edge meta data of {} and shard group " +
+ "{} is {}",
+ new Object[] { scope, edgeMeta, group, result } );
+ }
+ finally {
+ shardCompactionTaskTracker.complete( scope, edgeMeta, group );
+ }
+ return AuditResult.COMPACTED;
}
- } );
- return future;
+ //no op, there's nothing we need to do to this shard
+ return AuditResult.NOT_CHECKED;
+ }
+ }
+
+
+ private static final class ShardAuditKey {
+ private final ApplicationScope scope;
+ private final DirectedEdgeMeta edgeMeta;
+ private final ShardEntryGroup group;
+
+
+ private ShardAuditKey( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
+ final ShardEntryGroup group ) {
+ this.scope = scope;
+ this.edgeMeta = edgeMeta;
+ this.group = group;
+ }
+
+
+ @Override
+ public String toString() {
+ return "ShardAuditKey{" +
+ "scope=" + scope +
+ ", edgeMeta=" + edgeMeta +
+ ", group=" + group +
+ '}';
+ }
}
@@ -531,7 +593,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
}
-
public static final class CompactionResult {
public final long copiedEdges;
@@ -541,7 +602,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
public final Shard compactedShard;
-
private CompactionResult( final long copiedEdges, final Shard targetShard, final Set<Shard> sourceShards,
final Set<Shard> removedShards, final Shard compactedShard ) {
this.copiedEdges = copiedEdges;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/4c72f5c6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
index 1513e85..9f0792d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -35,6 +35,7 @@ import org.mockito.Matchers;
import org.apache.usergrid.persistence.core.consistency.TimeService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
@@ -84,6 +85,8 @@ public class ShardGroupCompactionTest {
final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+ final TaskExecutor taskExecutor = mock(TaskExecutor.class);
+
final long delta = 10000;
final long createTime = 20000;
@@ -100,7 +103,7 @@ public class ShardGroupCompactionTest {
ShardGroupCompactionImpl compaction =
new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
- edgeColumnFamilies, keyspace, edgeShardSerialization );
+ edgeColumnFamilies, keyspace, edgeShardSerialization, taskExecutor );
DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );