You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/01 22:01:02 UTC
[03/19] git commit: Added events and task cleanup
Added events and task cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c0b78b9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c0b78b9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c0b78b9f
Branch: refs/heads/two-dot-o-rebuildable-index
Commit: c0b78b9fab07cbc2f5975d89664ff8a523b98b9a
Parents: 4c72f5c
Author: Todd Nine <to...@apache.org>
Authored: Thu Sep 25 12:09:11 2014 -0600
Committer: Todd Nine <to...@apache.org>
Committed: Thu Sep 25 12:09:11 2014 -0600
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 12 +-
.../collection/event/EntityDeleted.java | 25 ++++
.../collection/event/EntityVersionCreated.java | 25 ++++
.../collection/event/EntityVersionDeleted.java | 28 +++++
.../collection/event/EntityVersionRemoved.java | 26 -----
.../impl/EntityCollectionManagerImpl.java | 12 ++
.../impl/EntityVersionCleanupTask.java | 115 +++++++++++++++++++
.../serialization/impl/VersionIterator.java | 111 ++++++++++++++++++
.../core/task/NamedTaskExecutorImpl.java | 26 +++--
.../usergrid/persistence/core/task/Task.java | 2 +-
.../persistence/core/task/TaskExecutor.java | 5 +-
.../persistence/graph/event/EdgeDeleted.java | 8 ++
12 files changed, 349 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 84bcea6..ee3a5d1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -27,14 +27,14 @@ import rx.Observable;
/**
*
- *
- * @author: tnine
+ * The operations for performing changes on an entity
*
*/
public interface EntityCollectionManager {
/**
- * Write the entity in the entity collection.
+ * Write the entity in the entity collection. This is an entire entity, it's contents will
+ * completely overwrite the previous values, if it exists.
*
* @param entity The entity to update
*/
@@ -52,10 +52,10 @@ public interface EntityCollectionManager {
public Observable<Entity> load( Id entityId );
- //TODO add partial update
-
/**
- * Takes the change and reloads an entity with all changes applied.
+ * Takes the change and reloads an entity with all changes applied in this entity applied.
+ * The resulting entity from calling load will be the previous version of this entity + the entity
+ * in this object applied to it.
* @param entity
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
new file mode 100644
index 0000000..1c075f1
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed.
+ * If any listener fails with an exception, the entity will not be removed.
+ *
+ */
+public interface EntityDeleted {
+
+
+ /**
+ * The event fired when an entity is deleted
+ *
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ */
+ public void deleted( final CollectionScope scope, final Id entityId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
new file mode 100644
index 0000000..9d7761c
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
@@ -0,0 +1,25 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Entity;
+
+
+/**
+ *
+ * Invoked after a new version of an entity has been created. The entity should be a complete
+ * view of the entity.
+ *
+ */
+public interface EntityVersionCreated {
+
+
+ /**
+ * The new version of the entity. Note that this should be a fully merged view of the entity.
+ * In the case of partial updates, the passed entity should be fully merged with it's previous entries
+ * @param scope The scope of the entity
+ * @param entity The fully loaded and merged entity
+ */
+ public void versionCreated( final CollectionScope scope, final Entity entity );
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
new file mode 100644
index 0000000..3b76e84
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
@@ -0,0 +1,28 @@
+package org.apache.usergrid.persistence.collection.event;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ *
+ * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
+ * only the version itself.
+ *
+ */
+public interface EntityVersionDeleted {
+
+
+ /**
+ * The version specified was removed.
+ *
+ * @param scope The scope of the entity
+ * @param entityId The entity Id that was removed
+ * @param entityVersion The version that was removed
+ */
+ public void versionDeleted(final CollectionScope scope, final Id entityId, final UUID entityVersion);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
deleted file mode 100644
index dca575d..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionRemoved.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.CollectionScope;
-
-
-/**
- *
- * Invoked when an entity version is removed. Note that this is not a deletion of the entity itself,
- * only the version itself.
- *
- */
-public interface EntityVersionRemoved {
-
-
- /**
- * The version specified was removed.
- * @param scope
- * @param entityId
- * @param entityVersion
- */
- public void versionRemoved(final CollectionScope scope, final UUID entityId, final UUID entityVersion);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 96a7ed2..a6fc31b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -19,11 +19,16 @@
package org.apache.usergrid.persistence.collection.impl;
+import java.util.List;
+import java.util.UUID;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.Write;
import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
@@ -38,6 +43,8 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimist
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.service.UUIDService;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -83,6 +90,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MarkStart markStart;
private final MarkCommit markCommit;
+ private final TaskExecutor taskExecutor;
+
@Inject
public EntityCollectionManagerImpl( final UUIDService uuidService, @Write final WriteStart writeStart,
@WriteUpdate final WriteStart writeUpdate,
@@ -90,6 +99,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback, final Load load,
final MarkStart markStart, final MarkCommit markCommit,
+ final TaskExecutor taskExecutor,
@Assisted final CollectionScope collectionScope) {
Preconditions.checkNotNull( uuidService, "uuidService must be defined" );
@@ -109,6 +119,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.uuidService = uuidService;
this.collectionScope = collectionScope;
+ this.taskExecutor = taskExecutor;
}
@@ -240,4 +251,5 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
new file mode 100644
index 0000000..11e2da9
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -0,0 +1,115 @@
+package org.apache.usergrid.persistence.collection.impl;
+
+
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.impl.VersionIterator;
+import org.apache.usergrid.persistence.core.entity.EntityVersion;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
+ * retained, the range is exclusive.
+ */
+class EntityVersionCleanupTask implements Task<CollectionIoEvent<EntityVersion>, CollectionIoEvent<EntityVersion>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
+
+
+ private final CollectionIoEvent<EntityVersion> collectionIoEvent;
+ private final List<EntityVersionDeleted> listeners;
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+
+
+ private EntityVersionCleanupTask( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final CollectionIoEvent<EntityVersion> collectionIoEvent,
+ final List<EntityVersionDeleted> listeners ) {
+ this.collectionIoEvent = collectionIoEvent;
+ this.listeners = listeners;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ }
+
+
+ @Override
+ public CollectionIoEvent<EntityVersion> getId() {
+ return collectionIoEvent;
+ }
+
+
+ @Override
+ public void exceptionThrown( final Throwable throwable ) {
+ LOG.error( "Unable to run update task for event {}", collectionIoEvent, throwable );
+ }
+
+
+ @Override
+ public void rejected() {
+ //Our task was rejected meaning our queue was full. We need this operation to run,
+ // so we'll run it in our current thread
+
+ try {
+ call();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Exception thrown in call task", e );
+ }
+ }
+
+
+ @Override
+ public CollectionIoEvent<EntityVersion> call() throws Exception {
+
+ final CollectionScope scope = collectionIoEvent.getEntityCollection();
+ final Id entityId = collectionIoEvent.getEvent().getId();
+ final UUID maxVersion = collectionIoEvent.getEvent().getVersion();
+
+
+ VersionIterator versionIterator =
+ new VersionIterator( logEntrySerializationStrategy, scope, entityId, maxVersion, 1000 );
+
+
+ UUID currentVersion = null;
+
+ //for every entry, we want to clean it up with listeners
+
+ while ( versionIterator.hasNext() ) {
+
+ currentVersion = versionIterator.next();
+
+
+ //execute all the listeners
+ for ( EntityVersionDeleted listener : listeners ) {
+ listener.versionDeleted( scope, entityId, currentVersion );
+ }
+
+ //we do multiple invocations on purpose. Our log is our source of versions, only delete from it
+ //after every successful invocation of listeners and entity removal
+ entitySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+ logEntrySerializationStrategy.delete( scope, entityId, currentVersion ).execute();
+
+
+ }
+
+
+ return collectionIoEvent;
+ }
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
new file mode 100644
index 0000000..323f12d
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/VersionIterator.java
@@ -0,0 +1,111 @@
+package org.apache.usergrid.persistence.collection.serialization.impl;
+
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccLogEntry;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+
+/**
+ * Iterator that will iterate all versions of the entity from the log from < the specified maxVersion
+ *
+ */
+public class VersionIterator implements Iterator<UUID> {
+
+
+ private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private final CollectionScope scope;
+ private final Id entityId;
+ private final int pageSize;
+
+
+ private Iterator<MvccLogEntry> elementItr;
+ private UUID nextStart;
+
+
+ /**
+ *
+ * @param logEntrySerializationStrategy The serialization strategy to get the log entries
+ * @param scope The scope of the entity
+ * @param entityId The id of the entity
+ * @param maxVersion The max version of the entity. Iterator will iterate from max to min starting with the version < max
+ * @param pageSize The fetch size to get when querying the serialization strategy
+ */
+ public VersionIterator( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final CollectionScope scope, final Id entityId, final UUID maxVersion,
+ final int pageSize ) {
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.scope = scope;
+ this.entityId = entityId;
+ this.nextStart = maxVersion;
+ this.pageSize = pageSize;
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ if ( elementItr == null || !elementItr.hasNext() && nextStart != null ) {
+ try {
+ advance();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to query cassandra", e );
+ }
+ }
+
+ return elementItr.hasNext();
+ }
+
+
+ @Override
+ public UUID next() {
+ if ( !hasNext() ) {
+ throw new NoSuchElementException( "No more elements exist" );
+ }
+
+ return elementItr.next().getVersion();
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Remove is unsupported" );
+ }
+
+
+ /**
+ * Advance our iterator
+ */
+ public void advance() throws ConnectionException {
+
+ final int requestedSize = pageSize + 1;
+
+ //loop through even entry that's < this one and remove it
+ List<MvccLogEntry> results = logEntrySerializationStrategy.load( scope, entityId, nextStart, requestedSize );
+
+ //we always remove the first version if it's equal since it's returned
+ if ( results.size() > 0 && results.get( 0 ).getVersion().equals( nextStart ) ) {
+ results.remove( 0 );
+ }
+
+
+ //we have results, set our next start
+ if ( results.size() == requestedSize ) {
+ nextStart = results.get( results.size() - 1 ).getVersion();
+ }
+ //nothing left to do
+ else {
+ nextStart = null;
+ }
+
+ elementItr = results.iterator();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
index 8184937..40ee5cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
@@ -32,6 +32,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
private final ListeningExecutorService executorService;
+ private final String name;
+ private final int poolSize;
+ private final int queueLength;
+
/**
* @param name The name of this instance of the task executor
@@ -44,11 +48,14 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
Preconditions.checkArgument( poolSize > 0, "poolSize must be > than 0" );
Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
+ this.name = name;
+ this.poolSize = poolSize;
+ this.queueLength = queueLength;
final BlockingQueue<Runnable> queue =
queueLength > 0 ? new ArrayBlockingQueue<Runnable>( queueLength ) : new SynchronousQueue<Runnable>();
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( name, poolSize, queue ) );
+ executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
}
@@ -105,11 +112,11 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* Create a thread pool that will reject work if our audit tasks become overwhelmed
*/
- private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+ private final class MaxSizeThreadPool extends ThreadPoolExecutor {
- public MaxSizeThreadPool( final String name, final int workerSize, BlockingQueue<Runnable> queue ) {
+ public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
- super( 1, workerSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( name ),
+ super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
new RejectedHandler() );
}
}
@@ -118,15 +125,10 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* Thread factory that will name and count threads for easier debugging
*/
- private static final class CountingThreadFactory implements ThreadFactory {
+ private final class CountingThreadFactory implements ThreadFactory {
private final AtomicLong threadCounter = new AtomicLong();
- private final String name;
-
-
- private CountingThreadFactory( final String name ) {this.name = name;}
-
@Override
public Thread newThread( final Runnable r ) {
@@ -144,12 +146,12 @@ public class NamedTaskExecutorImpl implements TaskExecutor {
/**
* The handler that will handle rejected executions and signal the interface
*/
- private static final class RejectedHandler implements RejectedExecutionHandler {
+ private final class RejectedHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "Audit queue full, rejecting audit task {}", r );
+ LOG.warn( "{} task queue full, rejecting task {}", name, r );
throw new RejectedExecutionException( "Unable to run task, queue full" );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
index 8b1ed22..518b461 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
@@ -27,7 +27,7 @@ public interface Task<V, I> extends Callable<V> {
* Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
* If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
* task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instanc\\\\\\\\\\\\\\\\\\\\\\hjn ) do so.
+ * request and process later (lazy repair for instance ) do so.
*
*/
void rejected();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index b5491bc..619ac14 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -1,6 +1,9 @@
package org.apache.usergrid.persistence.core.task;
+import com.google.common.util.concurrent.ListenableFuture;
+
+
/**
* An interface for execution of tasks
*/
@@ -10,5 +13,5 @@ public interface TaskExecutor {
* Submit the task asynchronously
* @param task
*/
- public <V, I> com.google.common.util.concurrent.ListenableFuture<V> submit( Task<V, I> task );
+ public <V, I> ListenableFuture<V> submit( Task<V, I> task );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c0b78b9f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
new file mode 100644
index 0000000..631de59
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/event/EdgeDeleted.java
@@ -0,0 +1,8 @@
+package org.apache.usergrid.persistence.graph.event;
+
+
+/**
+ *
+ *
+ */
+public interface EdgeDeleted {}