You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:19 UTC
[05/14] incubator-usergrid git commit: First pass at refactoring mark + sweep
First pass at refactoring mark + sweep
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/3e2afe23
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/3e2afe23
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/3e2afe23
Branch: refs/heads/USERGRID-641
Commit: 3e2afe23bb14761b395acbe1b6577ef024b0c3dd
Parents: 36b5bad
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 5 17:30:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Sun May 10 04:59:16 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 2 +-
.../corepersistence/CpEntityManagerFactory.java | 2 +-
.../corepersistence/CpRelationManager.java | 4 +-
.../migration/AppInfoMigrationPlugin.java | 3 +-
.../collection/EntityCollectionManager.java | 20 +-
.../cache/CachedEntityCollectionManager.java | 18 +-
.../collection/event/EntityDeleted.java | 45 --
.../collection/event/EntityVersionCreated.java | 39 -
.../collection/event/EntityVersionDeleted.java | 46 --
.../collection/guice/CollectionModule.java | 39 -
.../EntityCollectionManagerFactoryImpl.java | 31 +-
.../impl/EntityCollectionManagerImpl.java | 386 ++++------
.../collection/impl/EntityDeletedTask.java | 137 ----
.../impl/EntityVersionCleanupTask.java | 247 -------
.../impl/EntityVersionCreatedTask.java | 115 ---
.../impl/EntityVersionTaskFactory.java | 60 --
.../mvcc/stage/delete/UniqueCleanup.java | 133 ++++
.../mvcc/stage/delete/VersionCompact.java | 120 ++++
.../MvccLogEntrySerializationStrategy.java | 14 +
.../serialization/SerializationFig.java | 30 +-
.../serialization/impl/LogEntryIterator.java | 114 ---
.../impl/MinMaxLogEntryIterator.java | 121 ++++
.../MvccLogEntrySerializationProxyImpl.java | 14 +
.../MvccLogEntrySerializationStrategyImpl.java | 83 ++-
.../migration/MvccEntityDataMigrationImpl.java | 30 +-
.../collection/EntityCollectionManagerIT.java | 215 ++++--
.../impl/EntityVersionCleanupTaskTest.java | 715 -------------------
.../impl/EntityVersionCreatedTaskTest.java | 241 -------
.../mvcc/stage/delete/UniqueCleanupTest.java | 712 ++++++++++++++++++
.../mvcc/stage/delete/VersionCompactTest.java | 238 ++++++
.../impl/LogEntryIteratorTest.java | 132 ----
.../impl/MinMaxLogEntryIteratorTest.java | 131 ++++
...ccLogEntrySerializationStrategyImplTest.java | 132 +++-
.../collection/util/LogEntryMock.java | 103 +--
.../src/test/resources/log4j.properties | 1 +
.../core/executor/TaskExecutorFactory.java | 95 +++
.../persistence/core/rx/RxTaskScheduler.java | 2 -
.../core/task/NamedTaskExecutorImpl.java | 286 --------
.../usergrid/persistence/core/task/Task.java | 48 --
.../persistence/core/task/TaskExecutor.java | 41 --
.../core/task/NamedTaskExecutorImplTest.java | 271 -------
.../usergrid/persistence/graph/GraphFig.java | 26 +-
.../persistence/graph/GraphManager.java | 8 +-
.../persistence/graph/GraphManagerFactory.java | 2 +-
.../persistence/graph/SearchByEdge.java | 6 +
.../persistence/graph/SearchByEdgeType.java | 6 +
.../persistence/graph/guice/GraphModule.java | 11 -
.../graph/impl/GraphManagerImpl.java | 404 ++++-------
.../graph/impl/SimpleSearchByEdge.java | 40 +-
.../graph/impl/SimpleSearchByEdgeType.java | 29 +-
.../shard/impl/ShardGroupCompactionImpl.java | 90 ++-
.../graph/CommittedGraphManagerIT.java | 8 +-
.../persistence/graph/GraphManagerIT.java | 26 +-
.../graph/StorageGraphManagerIT.java | 8 +-
.../impl/shard/ShardGroupCompactionTest.java | 6 +-
55 files changed, 2464 insertions(+), 3422 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 460fc11..8bcc73a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -664,7 +664,7 @@ public class CpEntityManager implements EntityManager {
//delete it asynchronously
indexService.queueEntityDelete( applicationScope, entityId );
- return ecm.delete( entityId );
+ return ecm.mark( entityId );
}
else {
return Observable.empty();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 6c375ef..e7ad682 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -369,7 +369,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
final ApplicationEntityIndex aei = entityIndexFactory.createApplicationEntityIndex(applicationScope);
final GraphManager managementGraphManager = managerCache.getGraphManager(managementAppScope);
- final Observable deleteNodeGraph = managementGraphManager.deleteNode(applicationId, Long.MAX_VALUE);
+ final Observable deleteNodeGraph = managementGraphManager.markNode( applicationId, Long.MAX_VALUE );
final Observable deleteAppFromIndex = aei.deleteApplication();
return Observable.concat(copyConnections, deleteNodeGraph, deleteAppFromIndex)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 6adeefc..df3fa82 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -528,7 +528,7 @@ public class CpRelationManager implements RelationManager {
//run our delete
final Edge collectionToItemEdge = createCollectionEdge( cpHeadEntity.getId(), collName, memberEntity.getId() );
- gm.deleteEdge( collectionToItemEdge ).toBlocking().last();
+ gm.markEdge( collectionToItemEdge ).toBlocking().last();
/**
@@ -782,7 +782,7 @@ public class CpRelationManager implements RelationManager {
//delete all the edges
final Edge lastEdge =
- gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.deleteEdge( returnedEdge ) ).toBlocking()
+ gm.loadEdgeVersions( search ).flatMap( returnedEdge -> gm.markEdge( returnedEdge ) ).toBlocking()
.lastOrDefault( null );
if ( lastEdge != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
index 30955af..97b87b3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/AppInfoMigrationPlugin.java
@@ -42,7 +42,6 @@ import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.functions.Func1;
import java.util.HashMap;
import java.util.Map;
@@ -200,7 +199,7 @@ public class AppInfoMigrationPlugin implements MigrationPlugin {
final ApplicationScope systemAppScope = getApplicationScope(CpNamingUtils.SYSTEM_APP_ID );
final EntityCollectionManager systemCollectionManager =
entityCollectionManagerFactory.createCollectionManager( systemAppScope );
- systemCollectionManager.delete(new SimpleId(uuid, "appinfos")).toBlocking().last();
+ systemCollectionManager.mark( new SimpleId( uuid, "appinfos" ) ).toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 5a329e3..9de8f41 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -46,11 +46,12 @@ public interface EntityCollectionManager {
/**
- * @param entityId MarkCommit the entity as deleted
+ * @param entityId MarkCommit the entity as deleted. Will not actually remove it from cassandra. This operation will
+ * also remove all unique properties for this entity
*
* @return The observable of the id after the operation has completed
*/
- Observable<Id> delete( Id entityId );
+ Observable<Id> mark( Id entityId );
/**
* @param entityId The entity id to load.
@@ -96,6 +97,21 @@ public interface EntityCollectionManager {
*/
Observable<EntitySet> load( Collection<Id> entityIds );
+ /**
+ * Get all versions of the log entry, from Max to min
+ * @param entityId
+ * @return An observable stream of mvccLog entries
+ */
+ Observable<MvccLogEntry> getVersions(final Id entityId);
+
+ /**
+ * Delete these versions from cassandra. Must be atomic so that read log entries are only removed. Entity data
+ * and log entry will be deleted
+ * @param entries
+ * @return Any observable of all successfully compacted log entries
+ */
+ Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries );
+
/**
* Returns health of entity data store.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index fa35580..7a04b8d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.FieldSet;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -84,8 +85,8 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
@Override
- public Observable<Id> delete( final Id entityId ) {
- return targetEntityCollectionManager.delete( entityId ).doOnNext( new Action1<Id>() {
+ public Observable<Id> mark( final Id entityId ) {
+ return targetEntityCollectionManager.mark( entityId ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id id ) {
entityCache.invalidate( id );
@@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
return targetEntityCollectionManager.load( entityIds );
}
+
+ @Override
+ public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+ return targetEntityCollectionManager.getVersions( entityId );
+ }
+
+
+ @Override
+ public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+ return targetEntityCollectionManager.delete( entries );
+ }
+
+
@Override
public Health getHealth() {
return targetEntityCollectionManager.getHealth();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
deleted file mode 100644
index 0e9b62e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed.
- * If any listener fails with an exception, the entity will not be removed.
- *
- */
-public interface EntityDeleted {
-
-
- /**
- * The event fired when an entity is deleted
- *
- * @param scope The scope of the entity
- * @param entityId The id of the entity
- * @param version the entity version
- */
- public void deleted( final ApplicationScope scope, final Id entityId, final UUID version);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
deleted file mode 100644
index 7f1be1a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Invoked after a new version of an entity has been created.
- * The entity should be a complete view of the entity.
- */
-public interface EntityVersionCreated {
-
- /**
- * The new version of the entity. Note that this should be a fully merged view of the entity.
- * In the case of partial updates, the passed entity should be fully merged with it's previous
- * entries.
- * @param scope The scope of the entity
- * @param entity The fully loaded and merged entity
- */
- public void versionCreated( final ApplicationScope scope, final Entity entity );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
deleted file mode 100644
index 7fd8fe7..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity version is removed. Note that this is not a deletion of the entity
- * itself, only the version itself.
- *
- */
-public interface EntityVersionDeleted {
-
- /**
- * The version specified was removed.
- *
- * @param scope The scope of the entity
- * @param entityId The entity Id that was removed
- * @param entityVersions The versions that are to be removed
- */
- public void versionDeleted(final ApplicationScope scope, final Id entityId,
- final List<MvccLogEntry> entityVersions);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index b256a44..78c7f37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.multibindings.Multibinder;
/**
@@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule {
install( new SerializationModule() );
install( new ServiceModule() );
- install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
-
// users of this module can add their own implemementations
- // for more information: https://github.com/google/guice/wiki/Multibindings
-
- Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
- Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
- Multibinder.newSetBinder( binder(), EntityDeleted.class );
-
// create a guice factor for getting our collection manager
bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
@@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule {
configureMigrationProvider();
}
-//
-// @Provides
-// @Singleton
-// @Inject
-// public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
-// final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-//
-// return writeStart;
-// }
-
- @Inject
- @Singleton
- @Provides
- @CollectionTaskExecutor
- public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks",
- serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
- }
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 761c4b5..50a4bfc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -27,9 +27,10 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -37,11 +38,11 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -67,12 +68,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final RollbackAction rollback;
private final MarkStart markStart;
private final MarkCommit markCommit;
+ private final UniqueCleanup uniqueCleanup;
+ private final VersionCompact versionCompact;
+ private final SerializationFig serializationFig;
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final TaskExecutor taskExecutor;
private final EntityCacheFig entityCacheFig;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
@@ -84,10 +86,11 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
//create the target EM that will perform logic
final EntityCollectionManager target = new EntityCollectionManagerImpl(
writeStart, writeVerifyUnique,
- writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
+ writeOptimisticVerify, writeCommit, rollback, markStart, markCommit, uniqueCleanup, versionCompact,
entitySerializationStrategy, uniqueValueSerializationStrategy,
- mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory,
- rxTaskScheduler );
+ mvccLogEntrySerializationStrategy, keyspace,
+ metricsFactory, serializationFig,
+ rxTaskScheduler, scope );
final EntityCollectionManager proxy = new CachedEntityCollectionManager(entityCacheFig, target );
@@ -102,13 +105,12 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit, final RollbackAction rollback,
final MarkStart markStart, final MarkCommit markCommit,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+ final SerializationFig serializationFig, final
+ MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace,
- final EntityVersionTaskFactory entityVersionTaskFactory,
- @CollectionTaskExecutor final TaskExecutor taskExecutor, final
- EntityCacheFig entityCacheFig,
+ final Keyspace keyspace, final EntityCacheFig entityCacheFig,
MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
this.writeStart = writeStart;
@@ -118,12 +120,13 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.rollback = rollback;
this.markStart = markStart;
this.markCommit = markCommit;
+ this.uniqueCleanup = uniqueCleanup;
+ this.versionCompact = versionCompact;
+ this.serializationFig = serializationFig;
this.entitySerializationStrategy = entitySerializationStrategy;
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
this.keyspace = keyspace;
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.taskExecutor = taskExecutor;
this.entityCacheFig = entityCacheFig;
this.metricsFactory = metricsFactory;
this.rxTaskScheduler = rxTaskScheduler;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6f10e86..7a32d72 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.collection.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
@@ -32,11 +33,13 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.FieldSet;
import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.UniqueCleanup;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteOptimisticVerify;
@@ -44,15 +47,17 @@ import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteUniqueVerify;
import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.MinMaxLogEntryIterator;
import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -60,7 +65,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -73,13 +77,9 @@ import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
-import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
/**
@@ -97,6 +97,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final WriteOptimisticVerify writeOptimisticVerify;
private final WriteCommit writeCommit;
private final RollbackAction rollback;
+ private final UniqueCleanup uniqueCleanup;
+ private final VersionCompact versionCompact;
//delete stages
@@ -107,41 +109,39 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final TaskExecutor taskExecutor;
+ private final SerializationFig serializationFig;
- private final RxTaskScheduler rxTaskScheduler;
private final Keyspace keyspace;
private final Timer writeTimer;
- private final Meter writeMeter;
private final Timer deleteTimer;
+ private final Timer fieldIdTimer;
+ private final Timer fieldEntityTimer;
private final Timer updateTimer;
private final Timer loadTimer;
private final Timer getLatestTimer;
- private final Meter deleteMeter;
- private final Meter getLatestMeter;
- private final Meter loadMeter;
- private final Meter updateMeter;
private final ApplicationScope applicationScope;
+ private final RxTaskScheduler rxTaskScheduler;
@Inject
public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit
- writeCommit,
- final RollbackAction rollback, final MarkStart markStart,
- final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit, final RollbackAction rollback,
+ final MarkStart markStart, final MarkCommit markCommit,
+ final UniqueCleanup uniqueCleanup, final VersionCompact versionCompact,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory,
- @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope,
- final MetricsFactory metricsFactory,
-
- final RxTaskScheduler rxTaskScheduler ) {
+ final Keyspace keyspace, final MetricsFactory metricsFactory,
+ final SerializationFig serializationFig, final RxTaskScheduler rxTaskScheduler,
+ @Assisted final ApplicationScope applicationScope ) {
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
+ this.uniqueCleanup = uniqueCleanup;
+ this.versionCompact = versionCompact;
+ this.serializationFig = serializationFig;
this.rxTaskScheduler = rxTaskScheduler;
ValidationUtils.validateApplicationScope( applicationScope );
@@ -158,21 +158,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.keyspace = keyspace;
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.taskExecutor = taskExecutor;
this.applicationScope = applicationScope;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
- this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
- this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
- this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
- this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter");
- this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer");
- this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter");
- this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
- this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter");
- this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
- this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter");
+ this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" );
+ this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" );
+ this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" );
+ this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" );
+ this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" );
+ this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "load" );
+ this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" );
}
@@ -192,74 +187,26 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- // execute all validation stages concurrently. Needs refactored when this is done.
- // https://github.com/Netflix/RxJava/issues/627
- // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
- // writeVerifyUnique, writeOptimisticVerify );
- final Timer.Context timer = writeTimer.time();
- return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
- @Override
- public void call(final Entity entity) {
- //TODO fire the created task first then the entityVersioncleanup
- taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity ));
- taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId,
- entity.getVersion(), false ));
- //post-processing to come later. leave it empty for now.
- }
- }).doOnError( rollback )
- .doOnEach( new Action1<Notification<? super Entity>>() {
- @Override
- public void call( Notification<? super Entity> notification ) {
- writeMeter.mark();
- }
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ final Observable<Entity> write = observable.map( writeCommit );
+
+ return ObservableTimer.time( write, writeTimer );
}
@Override
- public Observable<Id> delete( final Id entityId ) {
+ public Observable<Id> mark( final Id entityId ) {
Preconditions.checkNotNull( entityId, "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
- final Timer.Context timer = deleteTimer.time();
- Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) )
- .map(markStart)
- .doOnNext( markCommit )
- .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
- @Override
- public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
- MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
- Task<Void> task = entityVersionTaskFactory
- .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() );
- taskExecutor.submit(task);
- return entity.getId();
- }
- }
- )
- .doOnNext(new Action1<Id>() {
- @Override
- public void call(Id id) {
- deleteMeter.mark();
- }
- })
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ Observable<Id> o = Observable.just( new CollectionIoEvent<>( applicationScope, entityId ) ).map( markStart )
+ .doOnNext( markCommit ).compose( uniqueCleanup ).map(
+ entityEvent -> entityEvent.getEvent().getId() );
+
- return o;
+ return ObservableTimer.time( o, deleteTimer );
}
@@ -270,35 +217,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- final Timer.Context timer = loadTimer.time();
- return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
- @Override
- public Observable<Entity> call(final EntitySet entitySet) {
- final MvccEntity entity = entitySet.getEntity(entityId);
-
- if (entity == null || !entity.getEntity().isPresent()) {
- return Observable.empty();
- }
+ final Observable<Entity> entityObservable = load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+ final MvccEntity entity = entitySet.getEntity( entityId );
- return Observable.just( entity.getEntity().get() );
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ return Observable.empty();
}
- })
- .doOnNext( new Action1<Entity>() {
- @Override
- public void call( Entity entity ) {
- loadMeter.mark();
- }
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
- }
+ return Observable.just( entity.getEntity().get() );
+ } );
+ return ObservableTimer.time( entityObservable, loadTimer );
+ }
@Override
@@ -306,15 +237,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
- final Timer.Context timer = loadTimer.time();
-
- return Observable.create( new Observable.OnSubscribe<EntitySet>() {
+ final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
@Override
public void call( final Subscriber<? super EntitySet> subscriber ) {
try {
final EntitySet results =
- entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+ entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
subscriber.onNext( results );
subscriber.onCompleted();
@@ -323,162 +252,167 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } )
- .doOnNext(new Action1<EntitySet>() {
- @Override
- public void call(EntitySet entitySet) {
- loadMeter.mark();
- }
- })
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ } );
+
+
+ return ObservableTimer.time( entitySetObservable, loadTimer );
}
@Override
- public Observable<Id> getIdField(final String type, final Field field ) {
- final List<Field> fields = Collections.singletonList( field );
- return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
+ public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+ ValidationUtils.verifyIdentity( entityId );
+
+ return Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
@Override
- public Id call( Field field ) {
- try {
- final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
- final UniqueValue value = set.getValue( field.getName() );
- return value == null ? null : value.getEntityId();
- }
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
- }
+ protected Iterator<MvccLogEntry> getIterator() {
+ return new MinMaxLogEntryIterator( mvccLogEntrySerializationStrategy, applicationScope, entityId,
+ serializationFig.getBufferSize() );
}
} );
}
+ @Override
+ public Observable<MvccLogEntry> delete( final Collection<MvccLogEntry> entries ) {
+ Preconditions.checkNotNull( entries, "entries must not be null" );
+
+
+ return Observable.from( entries ).map( logEntry -> new CollectionIoEvent<>( applicationScope, logEntry ) )
+ .compose( versionCompact ).map( event -> event.getEvent() );
+ }
+
+
+ @Override
+ public Observable<Id> getIdField( final String type, final Field field ) {
+ final List<Field> fields = Collections.singletonList( field );
+ final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
+ try {
+ final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
+ final UniqueValue value = set.getValue( field1.getName() );
+ return value == null ? null : value.getEntityId();
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
+ }
+ } );
+
+ return ObservableTimer.time( idObservable, fieldIdTimer );
+ }
+
+
/**
* Retrieves all entities that correspond to each field given in the Collection.
- * @param fields
- * @return
*/
@Override
- public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) {
- return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
- @Override
- public FieldSet call( Collection<Field> fields ) {
- try {
-
- final UUID startTime = UUIDGenerator.newTimeUUID();
+ public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) {
+ final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> {
+ try {
- //Get back set of unique values that correspond to collection of fields
- UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type, fields );
+ final UUID startTime = UUIDGenerator.newTimeUUID();
- //Short circut if we don't have any uniqueValues from the given fields.
- if(!set.iterator().hasNext()){
- return new MutableFieldSet( 0 );
- }
+ //Get back set of unique values that correspond to collection of fields
+ UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 );
+ //Short circuit if we don't have any uniqueValues from the given fields.
+ if ( !set.iterator().hasNext() ) {
+ return new MutableFieldSet( 0 );
+ }
- //loop through each field, and construct an entity load
- List<Id> entityIds = new ArrayList<>(fields.size());
- List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
- for(final Field expectedField: fields) {
+ //loop through each field, and construct an entity load
+ List<Id> entityIds = new ArrayList<>( fields1.size() );
+ List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
- UniqueValue value = set.getValue(expectedField.getName());
+ for ( final Field expectedField : fields1 ) {
- if(value ==null){
- logger.debug( "Field does not correspond to a unique value" );
- }
+ UniqueValue value = set.getValue( expectedField.getName() );
- entityIds.add(value.getEntityId());
- uniqueValues.add(value);
+ if ( value == null ) {
+ logger.debug( "Field does not correspond to a unique value" );
}
- //Load a entity for each entityId we retrieved.
- final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime);
-
- //now loop through and ensure the entities are there.
- final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
-
- final MutableFieldSet response = new MutableFieldSet(fields.size());
+ entityIds.add( value.getEntityId() );
+ uniqueValues.add( value );
+ }
- for(final UniqueValue expectedUnique: uniqueValues) {
- final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+ //Load an entity for each entityId we retrieved.
+ final EntitySet entitySet = entitySerializationStrategy.load( applicationScope, entityIds, startTime );
- //bad unique value, delete this, it's inconsistent
- if(entity == null || !entity.getEntity().isPresent()){
- final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique);
- deleteBatch.mergeShallow(valueDelete);
- continue;
- }
+ //now loop through and ensure the entities are there.
+ final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+ final MutableFieldSet response = new MutableFieldSet( fields1.size() );
- //else add it to our result set
- response.addEntity(expectedUnique.getField(),entity);
+ for ( final UniqueValue expectedUnique : uniqueValues ) {
+ final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
+ //bad unique value, delete this, it's inconsistent
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ final MutationBatch valueDelete =
+ uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+ deleteBatch.mergeShallow( valueDelete );
+ continue;
}
- //TODO: explore making this an Async process
- //We'll repair it again if we have to
- deleteBatch.execute();
- return response;
+ //else add it to our result set
+ response.addEntity( expectedUnique.getField(), entity );
+ }
+ //TODO: explore making this an Async process
+ //We'll repair it again if we have to
+ deleteBatch.execute();
- }
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
- }
+ return response;
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
}
} );
- }
+ return ObservableTimer.time( fieldSetObservable, fieldEntityTimer );
+ }
// fire the stages
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
- return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
-
- @Override
- public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
- Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( writeVerifyUnique );
+ Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+ .doOnNext( writeVerifyUnique );
- // optimistic verification
- Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( writeOptimisticVerify );
+ // optimistic verification
+ Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+ .doOnNext( writeOptimisticVerify );
+ final Observable<CollectionIoEvent<MvccEntity>> zip =
+ Observable.zip( uniqueObservable, optimisticObservable, ( unique, optimistic ) -> optimistic );
- //wait for both to finish
- Observable.merge( unique, optimistic ).toBlocking().last();
- }
- } );
+ return zip;
+ } );
}
@Override
public Observable<VersionSet> getLatestVersion( final Collection<Id> entityIds ) {
- final Timer.Context timer = getLatestTimer.time();
- return Observable.create( new Observable.OnSubscribe<VersionSet>() {
+
+ final Observable<VersionSet> observable = Observable.create( new Observable.OnSubscribe<VersionSet>() {
@Override
public void call( final Subscriber<? super VersionSet> subscriber ) {
try {
final VersionSet logEntries = mvccLogEntrySerializationStrategy
- .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+ .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
subscriber.onNext( logEntries );
subscriber.onCompleted();
@@ -487,13 +421,9 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ } );
+
+ return ObservableTimer.time( observable, getLatestTimer );
}
@@ -502,11 +432,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
try {
ColumnFamily<String, String> CF_SYSTEM_LOCAL =
- new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
- StringSerializer.get() );
+ new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+ StringSerializer.get() );
OperationResult<CqlResult<String, String>> result =
- keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
+ keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
if ( result.getResult().getRows().size() == 1 ) {
return Health.GREEN;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
deleted file mode 100644
index 1753d26..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires Cleanup Task
- */
-public class EntityDeletedTask implements Task<Void> {
- private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class);
-
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private final MvccEntitySerializationStrategy entitySerializationStrategy;
- private final Set<EntityDeleted> listeners;
- private final ApplicationScope collectionScope;
- private final Id entityId;
- private final UUID version;
-
-
- @Inject
- public EntityDeletedTask(
- EntityVersionTaskFactory entityVersionTaskFactory,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject
- @Assisted final ApplicationScope collectionScope,
- @Assisted final Id entityId,
- @Assisted final UUID version) {
-
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.entitySerializationStrategy = entitySerializationStrategy;
- this.listeners = listeners;
- this.collectionScope = collectionScope;
- this.entityId = entityId;
- this.version = version;
- }
-
-
- @Override
- public void exceptionThrown(Throwable throwable) {
- LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { collectionScope, entityId, version }, throwable );
- }
-
-
- @Override
- public Void rejected() {
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
-
- entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
-
- fireEvents();
- final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
- final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
-
- entityDelete.execute();
- logDelete.execute();
-//
- return null;
- }
-
-
- private void fireEvents() {
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().deleted( collectionScope, entityId,version );
- return;
- }
-
- LOG.debug( "Started firing {} listeners", listenerSize );
-
- //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
- Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.deleted( collectionScope, entityId, version );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
- LOG.debug( "Finished firing {} listeners", listenerSize );
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
deleted file mode 100644
index b5f9085..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.BlockingObservable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
- * retained, the range is exclusive.
- */
-public class EntityVersionCleanupTask implements Task<Void> {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
-
- private final Set<EntityVersionDeleted> listeners;
-
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
- private final Keyspace keyspace;
-
- private final SerializationFig serializationFig;
-
- private final ApplicationScope scope;
- private final Id entityId;
- private final UUID version;
- private final boolean includeVersion;
-
-
- @Inject
- public EntityVersionCleanupTask(
- final SerializationFig serializationFig,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final Keyspace keyspace,
- final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject
- @Assisted final ApplicationScope scope,
- @Assisted final Id entityId,
- @Assisted final UUID version,
- @Assisted final boolean includeVersion) {
-
- this.serializationFig = serializationFig;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
- this.keyspace = keyspace;
- this.listeners = listeners;
- this.scope = scope;
- this.entityId = entityId;
- this.version = version;
-
- this.includeVersion = includeVersion;
- }
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- logger.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { scope, entityId, version }, throwable );
- }
-
-
- @Override
- public Void rejected() {
- //Our task was rejected meaning our queue was full. We need this operation to run,
- // so we'll run it in our current thread
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
- //TODO Refactor this logic into a a class that can be invoked from anywhere
- //iterate all unique values
- final BlockingObservable<Long> uniqueValueCleanup =
- Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
- @Override
- protected Iterator<UniqueValue> getIterator() {
- return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
- }
- } )
-
- //skip current versions
- .skipWhile( new Func1<UniqueValue, Boolean>() {
- @Override
- public Boolean call( final UniqueValue uniqueValue ) {
- return !includeVersion && version.equals( uniqueValue.getEntityVersion() );
- }
- } )
- //buffer our buffer size, then roll them all up in a single batch mutation
- .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
- @Override
- public void call( final List<UniqueValue> uniqueValues ) {
- final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
- for ( UniqueValue value : uniqueValues ) {
- uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
- }
-
- try {
- uniqueCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
- }
- } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-
- //start calling the listeners for remove log entries
- BlockingObservable<Long> versionsDeletedObservable =
-
- Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
- @Override
- protected Iterator<MvccLogEntry> getIterator() {
-
- return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
- serializationFig.getBufferSize() );
- }
- } )
- //skip current version
- .skipWhile( new Func1<MvccLogEntry, Boolean>() {
- @Override
- public Boolean call( final MvccLogEntry mvccLogEntry ) {
- return !includeVersion && version.equals( mvccLogEntry.getVersion() );
- }
- } )
- //buffer them for efficiency
- .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
- @Override
- public void call( final List<MvccLogEntry> mvccEntities ) {
-
- fireEvents( mvccEntities );
-
- final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
-
-
- for ( MvccLogEntry entry : mvccEntities ) {
- logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
- }
-
- try {
- logCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
- }
- } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
- //wait or this to complete
- final Long removedCount = uniqueValueCleanup.last();
-
- logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
-
- final Long versionCleanupCount = versionsDeletedObservable.last();
-
- logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
-
- return null;
- }
-
-
- private void fireEvents( final List<MvccLogEntry> versions ) {
-
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().versionDeleted( scope, entityId, versions );
- return;
- }
-
- logger.debug( "Started firing {} listeners", listenerSize );
-
- //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-
-
- //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
- Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.versionDeleted( scope, entityId, versions );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-
- logger.debug( "Finished firing {} listeners", listenerSize );
- }
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
deleted file mode 100644
index fbbcdbd..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires events so that all EntityVersionCreated handlers area called.
- */
-public class EntityVersionCreatedTask implements Task<Void> {
- private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class );
-
- private Set<EntityVersionCreated> listeners;
- private final ApplicationScope collectionScope;
- private final Entity entity;
-
-
- @Inject
- public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope,
- final Set<EntityVersionCreated> listeners,
- @Assisted final Entity entity ) {
-
- this.listeners = listeners;
- this.collectionScope = collectionScope;
- this.entity = entity;
- }
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- logger.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { collectionScope, entity}, throwable );
- }
-
-
- @Override
- public Void rejected() {
-
- // Our task was rejected meaning our queue was full.
- // We need this operation to run, so we'll run it in our current thread
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
-
- fireEvents();
- return null;
- }
-
-
- private void fireEvents() {
-
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().versionCreated( collectionScope, entity );
- return;
- }
-
- logger.debug( "Started firing {} listeners", listenerSize );
-
-
- Observable.from( listeners )
- .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.versionCreated( collectionScope, entity );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
- logger.debug( "Finished firing {} listeners", listenerSize );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
deleted file mode 100644
index 51a4607..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface EntityVersionTaskFactory {
-
- /**
- * Get a task for cleaning up latent entity data. If includeVersion = true, the passed version will be cleaned up as well
- * Otherwise this is a V-1 operation
- *
- * @param scope
- * @param entityId
- * @param version
- * @param includeVersion
- * @return
- */
- EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version,
- final boolean includeVersion );
-
- /**
- * Get an entityVersionCreatedTask
- * @param scope
- * @param entity
- * @return
- */
- EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity );
-
- /**
- * Get an entity deleted task
- * @param collectionScope
- * @param entityId
- * @param version
- * @return
- */
- EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version );
-
-}