You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/07 18:40:59 UTC
[4/5] incubator-usergrid git commit: First pass at refactoring mark +
sweep
First pass at refactoring mark + sweep
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/45aed6cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/45aed6cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/45aed6cc
Branch: refs/heads/USERGRID-614
Commit: 45aed6ccd5b459aa6c52ad10f1f7f04e1d5cb1f5
Parents: 36b5bad
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 5 17:30:08 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 5 17:30:08 2015 -0600
----------------------------------------------------------------------
.../collection/EntityCollectionManager.java | 14 +
.../cache/CachedEntityCollectionManager.java | 14 +
.../collection/event/EntityDeleted.java | 45 --
.../collection/event/EntityVersionCreated.java | 39 -
.../collection/event/EntityVersionDeleted.java | 46 --
.../collection/guice/CollectionModule.java | 39 -
.../EntityCollectionManagerFactoryImpl.java | 13 +-
.../impl/EntityCollectionManagerImpl.java | 352 ++++-----
.../collection/impl/EntityDeletedTask.java | 137 ----
.../impl/EntityVersionCleanupTask.java | 247 -------
.../impl/EntityVersionCreatedTask.java | 115 ---
.../impl/EntityVersionTaskFactory.java | 60 --
.../mvcc/stage/delete/UniqueCleanup.java | 133 ++++
.../mvcc/stage/delete/VersionCompact.java | 120 ++++
.../MvccLogEntrySerializationStrategy.java | 14 +
.../serialization/impl/LogEntryIterator.java | 114 ---
.../serialization/impl/LogEntryObservable.java | 54 ++
.../impl/MinMaxLogEntryIterator.java | 114 +++
.../MvccLogEntrySerializationProxyImpl.java | 14 +
.../MvccLogEntrySerializationStrategyImpl.java | 57 +-
.../migration/MvccEntityDataMigrationImpl.java | 30 +-
.../impl/EntityVersionCleanupTaskTest.java | 715 -------------------
.../impl/EntityVersionCreatedTaskTest.java | 241 -------
.../mvcc/stage/delete/UniqueCleanupTest.java | 712 ++++++++++++++++++
.../mvcc/stage/delete/VersionCompactTest.java | 238 ++++++
.../impl/LogEntryIteratorTest.java | 132 ----
.../impl/MinMaxLogEntryIteratorTest.java | 134 ++++
...ccLogEntrySerializationStrategyImplTest.java | 85 ++-
.../core/executor/TaskExecutorFactory.java | 95 +++
.../persistence/core/rx/RxTaskScheduler.java | 2 -
.../core/task/NamedTaskExecutorImpl.java | 286 --------
.../usergrid/persistence/core/task/Task.java | 48 --
.../persistence/core/task/TaskExecutor.java | 41 --
.../core/task/NamedTaskExecutorImplTest.java | 271 -------
.../persistence/graph/GraphManager.java | 8 +-
.../persistence/graph/GraphManagerFactory.java | 2 +-
.../persistence/graph/SearchByEdge.java | 6 +
.../persistence/graph/SearchByEdgeType.java | 6 +
.../persistence/graph/guice/GraphModule.java | 11 -
.../graph/impl/GraphManagerImpl.java | 404 ++++-------
.../graph/impl/SimpleSearchByEdge.java | 40 +-
.../graph/impl/SimpleSearchByEdgeType.java | 29 +-
.../shard/impl/ShardGroupCompactionImpl.java | 90 ++-
.../graph/CommittedGraphManagerIT.java | 8 +-
.../persistence/graph/GraphManagerIT.java | 26 +-
.../graph/StorageGraphManagerIT.java | 8 +-
.../impl/shard/ShardGroupCompactionTest.java | 6 +-
47 files changed, 2196 insertions(+), 3219 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 5a329e3..8c27825 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -96,6 +96,20 @@ public interface EntityCollectionManager {
*/
Observable<EntitySet> load( Collection<Id> entityIds );
+ /**
+ * Get all versions of the log entry, from Max to min
+ * @param entityId
+ * @return An observable stream of mvccLog entries
+ */
+ Observable<MvccLogEntry> getVersions(final Id entityId);
+
+ /**
+ * Remove these versions. Must be atomic so that read log entries are removed
+ * @param entries
+ * @return An observable of all successfully compacted log entries
+ */
+ Observable<MvccLogEntry> compact(final Collection<MvccLogEntry> entries);
+
/**
* Returns health of entity data store.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
index fa35580..9412516 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/cache/CachedEntityCollectionManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.FieldSet;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -125,6 +126,19 @@ public class CachedEntityCollectionManager implements EntityCollectionManager {
return targetEntityCollectionManager.load( entityIds );
}
+
+ @Override
+ public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+ return targetEntityCollectionManager.getVersions( entityId ); // delegate like compact(); never hand callers a null Observable
+ }
+
+
+ @Override
+ public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
+ return targetEntityCollectionManager.compact( entries );
+ }
+
+
@Override
public Health getHealth() {
return targetEntityCollectionManager.getHealth();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
deleted file mode 100644
index 0e9b62e..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity is deleted. The delete log entry is not removed until all instances of this listener has completed.
- * If any listener fails with an exception, the entity will not be removed.
- *
- */
-public interface EntityDeleted {
-
-
- /**
- * The event fired when an entity is deleted
- *
- * @param scope The scope of the entity
- * @param entityId The id of the entity
- * @param version the entity version
- */
- public void deleted( final ApplicationScope scope, final Id entityId, final UUID version);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
deleted file mode 100644
index 7f1be1a..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionCreated.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-
-/**
- * Invoked after a new version of an entity has been created.
- * The entity should be a complete view of the entity.
- */
-public interface EntityVersionCreated {
-
- /**
- * The new version of the entity. Note that this should be a fully merged view of the entity.
- * In the case of partial updates, the passed entity should be fully merged with it's previous
- * entries.
- * @param scope The scope of the entity
- * @param entity The fully loaded and merged entity
- */
- public void versionCreated( final ApplicationScope scope, final Entity entity );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
deleted file mode 100644
index 7fd8fe7..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityVersionDeleted.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event;
-
-
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-/**
- *
- * Invoked when an entity version is removed. Note that this is not a deletion of the entity
- * itself, only the version itself.
- *
- */
-public interface EntityVersionDeleted {
-
- /**
- * The version specified was removed.
- *
- * @param scope The scope of the entity
- * @param entityId The entity Id that was removed
- * @param entityVersions The versions that are to be removed
- */
- public void versionDeleted(final ApplicationScope scope, final Id entityId,
- final List<MvccLogEntry> entityVersions);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index b256a44..78c7f37 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -22,27 +22,14 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
-import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGenerator;
import org.apache.usergrid.persistence.collection.mvcc.changelog.ChangeLogGeneratorImpl;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
-import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyImpl;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.multibindings.Multibinder;
/**
@@ -61,15 +48,7 @@ public abstract class CollectionModule extends AbstractModule {
install( new SerializationModule() );
install( new ServiceModule() );
- install ( new FactoryModuleBuilder().build( EntityVersionTaskFactory.class ));
-
// users of this module can add their own implemementations
- // for more information: https://github.com/google/guice/wiki/Multibindings
-
- Multibinder.newSetBinder( binder(), EntityVersionDeleted.class );
- Multibinder.newSetBinder( binder(), EntityVersionCreated.class );
- Multibinder.newSetBinder( binder(), EntityDeleted.class );
-
// create a guice factor for getting our collection manager
bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
@@ -81,24 +60,6 @@ public abstract class CollectionModule extends AbstractModule {
configureMigrationProvider();
}
-//
-// @Provides
-// @Singleton
-// @Inject
-// public WriteStart write (final MvccLogEntrySerializationStrategy logStrategy) {
-// final WriteStart writeStart = new WriteStart( logStrategy, MvccEntity.Status.COMPLETE);
-//
-// return writeStart;
-// }
-
- @Inject
- @Singleton
- @Provides
- @CollectionTaskExecutor
- public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks",
- serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
- }
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index 761c4b5..c4422f7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -27,7 +27,6 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.cache.CachedEntityCollectionManager;
import org.apache.usergrid.persistence.collection.cache.EntityCacheFig;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.RollbackAction;
@@ -41,7 +40,6 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
@@ -71,8 +69,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
private final Keyspace keyspace;
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final TaskExecutor taskExecutor;
private final EntityCacheFig entityCacheFig;
private final MetricsFactory metricsFactory;
private final RxTaskScheduler rxTaskScheduler;
@@ -86,7 +82,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
writeStart, writeVerifyUnique,
writeOptimisticVerify, writeCommit, rollback, markStart, markCommit,
entitySerializationStrategy, uniqueValueSerializationStrategy,
- mvccLogEntrySerializationStrategy, keyspace,entityVersionTaskFactory, taskExecutor, scope, metricsFactory,
+ mvccLogEntrySerializationStrategy, keyspace, scope, metricsFactory,
rxTaskScheduler );
@@ -105,10 +101,7 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace,
- final EntityVersionTaskFactory entityVersionTaskFactory,
- @CollectionTaskExecutor final TaskExecutor taskExecutor, final
- EntityCacheFig entityCacheFig,
+ final Keyspace keyspace, final EntityCacheFig entityCacheFig,
MetricsFactory metricsFactory, final RxTaskScheduler rxTaskScheduler ) {
this.writeStart = writeStart;
@@ -122,8 +115,6 @@ public class EntityCollectionManagerFactoryImpl implements EntityCollectionManag
this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
this.keyspace = keyspace;
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.taskExecutor = taskExecutor;
this.entityCacheFig = entityCacheFig;
this.metricsFactory = metricsFactory;
this.rxTaskScheduler = rxTaskScheduler;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6f10e86..83c2035 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -32,8 +32,8 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntitySet;
import org.apache.usergrid.persistence.collection.FieldSet;
import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
@@ -49,10 +49,9 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
import org.apache.usergrid.persistence.collection.serialization.impl.MutableFieldSet;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.model.entity.Entity;
@@ -60,7 +59,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
@@ -73,13 +71,9 @@ import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.CqlResult;
import com.netflix.astyanax.serializers.StringSerializer;
-import rx.Notification;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.schedulers.Schedulers;
/**
@@ -107,36 +101,31 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final MvccEntitySerializationStrategy entitySerializationStrategy;
private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final TaskExecutor taskExecutor;
private final RxTaskScheduler rxTaskScheduler;
private final Keyspace keyspace;
private final Timer writeTimer;
- private final Meter writeMeter;
private final Timer deleteTimer;
+ private final Timer fieldIdTimer;
+ private final Timer fieldEntityTimer;
private final Timer updateTimer;
private final Timer loadTimer;
private final Timer getLatestTimer;
- private final Meter deleteMeter;
- private final Meter getLatestMeter;
- private final Meter loadMeter;
- private final Meter updateMeter;
private final ApplicationScope applicationScope;
+
@Inject
public EntityCollectionManagerImpl( final WriteStart writeStart, final WriteUniqueVerify writeVerifyUnique,
- final WriteOptimisticVerify writeOptimisticVerify, final WriteCommit
- writeCommit,
- final RollbackAction rollback, final MarkStart markStart,
- final MarkCommit markCommit, final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit, final RollbackAction rollback,
+ final MarkStart markStart, final MarkCommit markCommit,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
- final Keyspace keyspace, final EntityVersionTaskFactory entityVersionTaskFactory,
- @CollectionTaskExecutor final TaskExecutor taskExecutor, @Assisted final ApplicationScope applicationScope,
+ final Keyspace keyspace, @Assisted final ApplicationScope applicationScope,
final MetricsFactory metricsFactory,
final RxTaskScheduler rxTaskScheduler ) {
@@ -158,21 +147,16 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.keyspace = keyspace;
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.taskExecutor = taskExecutor;
this.applicationScope = applicationScope;
this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
- this.writeTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"write.timer");
- this.writeMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "write.meter");
- this.deleteTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "delete.timer");
- this.deleteMeter= metricsFactory.getMeter(EntityCollectionManagerImpl.class, "delete.meter");
- this.updateTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class, "update.timer");
- this.updateMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "update.meter");
- this.loadTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"load.timer");
- this.loadMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "load.meter");
- this.getLatestTimer = metricsFactory.getTimer(EntityCollectionManagerImpl.class,"latest.timer");
- this.getLatestMeter = metricsFactory.getMeter(EntityCollectionManagerImpl.class, "latest.meter");
+ this.writeTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "write" );
+ this.deleteTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "delete" );
+ this.fieldIdTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldId" );
+ this.fieldEntityTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "fieldEntity" );
+ this.updateTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "update" );
+ this.loadTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "load" );
+ this.getLatestTimer = metricsFactory.getTimer( EntityCollectionManagerImpl.class, "latest" );
}
@@ -192,34 +176,10 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> observable = stageRunner( writeData, writeStart );
- // execute all validation stages concurrently. Needs refactored when this is done.
- // https://github.com/Netflix/RxJava/issues/627
- // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
- // writeVerifyUnique, writeOptimisticVerify );
- final Timer.Context timer = writeTimer.time();
- return observable.map(writeCommit).doOnNext(new Action1<Entity>() {
- @Override
- public void call(final Entity entity) {
- //TODO fire the created task first then the entityVersioncleanup
- taskExecutor.submit( entityVersionTaskFactory.getCreatedTask( applicationScope, entity ));
- taskExecutor.submit( entityVersionTaskFactory.getCleanupTask( applicationScope, entityId,
- entity.getVersion(), false ));
- //post-processing to come later. leave it empty for now.
- }
- }).doOnError( rollback )
- .doOnEach( new Action1<Notification<? super Entity>>() {
- @Override
- public void call( Notification<? super Entity> notification ) {
- writeMeter.mark();
- }
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ final Observable<Entity> write = observable.map( writeCommit );
+
+ return ObservableTimer.time( write, writeTimer );
}
@@ -230,36 +190,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id is required in this stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
- final Timer.Context timer = deleteTimer.time();
- Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) )
- .map(markStart)
- .doOnNext( markCommit )
- .map(new Func1<CollectionIoEvent<MvccEntity>, Id>() {
-
- @Override
- public Id call(final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent) {
- MvccEntity entity = mvccEntityCollectionIoEvent.getEvent();
- Task<Void> task = entityVersionTaskFactory
- .getDeleteTask( applicationScope, entity.getId(), entity.getVersion() );
- taskExecutor.submit(task);
- return entity.getId();
- }
- }
- )
- .doOnNext(new Action1<Id>() {
- @Override
- public void call(Id id) {
- deleteMeter.mark();
- }
- })
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ Observable<Id> o = Observable.just( new CollectionIoEvent<Id>( applicationScope, entityId ) ).map( markStart )
+ .doOnNext( markCommit ).map( entityEvent -> entityEvent.getEvent().getId() );
- return o;
+
+ return ObservableTimer.time( o, deleteTimer );
}
@@ -270,35 +205,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in load stage" );
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage" );
- final Timer.Context timer = loadTimer.time();
- return load( Collections.singleton( entityId ) ).flatMap(new Func1<EntitySet, Observable<Entity>>() {
- @Override
- public Observable<Entity> call(final EntitySet entitySet) {
- final MvccEntity entity = entitySet.getEntity(entityId);
-
- if (entity == null || !entity.getEntity().isPresent()) {
- return Observable.empty();
- }
+ final Observable<Entity> entityObservable= load( Collections.singleton( entityId ) ).flatMap( entitySet -> {
+ final MvccEntity entity = entitySet.getEntity( entityId );
- return Observable.just( entity.getEntity().get() );
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ return Observable.empty();
}
- })
- .doOnNext( new Action1<Entity>() {
- @Override
- public void call( Entity entity ) {
- loadMeter.mark();
- }
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
- }
+ return Observable.just( entity.getEntity().get() );
+ } );
+ return ObservableTimer.time( entityObservable, loadTimer );
+ }
@Override
@@ -306,15 +225,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityIds, "entityIds cannot be null" );
- final Timer.Context timer = loadTimer.time();
-
- return Observable.create( new Observable.OnSubscribe<EntitySet>() {
+ final Observable<EntitySet> entitySetObservable = Observable.create( new Observable.OnSubscribe<EntitySet>() {
@Override
public void call( final Subscriber<? super EntitySet> subscriber ) {
try {
final EntitySet results =
- entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+ entitySerializationStrategy.load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
subscriber.onNext( results );
subscriber.onCompleted();
@@ -323,148 +240,144 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } )
- .doOnNext(new Action1<EntitySet>() {
- @Override
- public void call(EntitySet entitySet) {
- loadMeter.mark();
- }
- })
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ } );
+
+
+ return ObservableTimer.time( entitySetObservable, loadTimer );
}
@Override
- public Observable<Id> getIdField(final String type, final Field field ) {
+ public Observable<MvccLogEntry> getVersions( final Id entityId ) {
+// mvccLogEntrySerializationStrategy.load( )
+ return null;
+ }
+
+
+ @Override
+ public Observable<MvccLogEntry> compact( final Collection<MvccLogEntry> entries ) {
+ return null;
+ }
+
+
+ @Override
+ public Observable<Id> getIdField( final String type, final Field field ) {
final List<Field> fields = Collections.singletonList( field );
- return rx.Observable.from( fields ).map( new Func1<Field, Id>() {
- @Override
- public Id call( Field field ) {
- try {
- final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
- final UniqueValue value = set.getValue( field.getName() );
- return value == null ? null : value.getEntityId();
- }
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
- }
+ final Observable<Id> idObservable = Observable.from( fields ).map( field1 -> {
+ try {
+ final UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields );
+ final UniqueValue value = set.getValue( field1.getName() );
+ return value == null ? null : value.getEntityId();
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
}
} );
+
+ return ObservableTimer.time( idObservable, fieldIdTimer );
}
/**
* Retrieves all entities that correspond to each field given in the Collection.
- * @param fields
- * @return
*/
@Override
- public Observable<FieldSet> getEntitiesFromFields(final String type, final Collection<Field> fields ) {
- return rx.Observable.just(fields).map( new Func1<Collection<Field>, FieldSet>() {
- @Override
- public FieldSet call( Collection<Field> fields ) {
- try {
+ public Observable<FieldSet> getEntitiesFromFields( final String type, final Collection<Field> fields ) {
+ final Observable<FieldSet> fieldSetObservable = Observable.just( fields ).map( fields1 -> {
+ try {
- final UUID startTime = UUIDGenerator.newTimeUUID();
+ final UUID startTime = UUIDGenerator.newTimeUUID();
- //Get back set of unique values that correspond to collection of fields
- UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope,type, fields );
-
- //Short circut if we don't have any uniqueValues from the given fields.
- if(!set.iterator().hasNext()){
- return new MutableFieldSet( 0 );
- }
+ //Get back set of unique values that correspond to collection of fields
+ UniqueValueSet set = uniqueValueSerializationStrategy.load( applicationScope, type, fields1 );
+ //Short circuit if we don't have any uniqueValues from the given fields.
+ if ( !set.iterator().hasNext() ) {
+ return new MutableFieldSet( 0 );
+ }
- //loop through each field, and construct an entity load
- List<Id> entityIds = new ArrayList<>(fields.size());
- List<UniqueValue> uniqueValues = new ArrayList<>(fields.size());
- for(final Field expectedField: fields) {
+ //loop through each field, and construct an entity load
+ List<Id> entityIds = new ArrayList<>( fields1.size() );
+ List<UniqueValue> uniqueValues = new ArrayList<>( fields1.size() );
- UniqueValue value = set.getValue(expectedField.getName());
+ for ( final Field expectedField : fields1 ) {
- if(value ==null){
- logger.debug( "Field does not correspond to a unique value" );
- }
+ UniqueValue value = set.getValue( expectedField.getName() );
- entityIds.add(value.getEntityId());
- uniqueValues.add(value);
+ if ( value == null ) {
+ logger.debug( "Field does not correspond to a unique value" );
}
- //Load a entity for each entityId we retrieved.
- final EntitySet entitySet = entitySerializationStrategy.load(applicationScope, entityIds, startTime);
-
- //now loop through and ensure the entities are there.
- final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
-
- final MutableFieldSet response = new MutableFieldSet(fields.size());
+ entityIds.add( value.getEntityId() );
+ uniqueValues.add( value );
+ }
- for(final UniqueValue expectedUnique: uniqueValues) {
- final MvccEntity entity = entitySet.getEntity(expectedUnique.getEntityId());
+ //Load an entity for each entityId we retrieved.
+ final EntitySet entitySet =
+ entitySerializationStrategy.load( applicationScope, entityIds, startTime );
- //bad unique value, delete this, it's inconsistent
- if(entity == null || !entity.getEntity().isPresent()){
- final MutationBatch valueDelete = uniqueValueSerializationStrategy.delete(applicationScope, expectedUnique);
- deleteBatch.mergeShallow(valueDelete);
- continue;
- }
+ //now loop through and ensure the entities are there.
+ final MutationBatch deleteBatch = keyspace.prepareMutationBatch();
+ final MutableFieldSet response = new MutableFieldSet( fields1.size() );
- //else add it to our result set
- response.addEntity(expectedUnique.getField(),entity);
+ for ( final UniqueValue expectedUnique : uniqueValues ) {
+ final MvccEntity entity = entitySet.getEntity( expectedUnique.getEntityId() );
+ //bad unique value, delete this, it's inconsistent
+ if ( entity == null || !entity.getEntity().isPresent() ) {
+ final MutationBatch valueDelete =
+ uniqueValueSerializationStrategy.delete( applicationScope, expectedUnique );
+ deleteBatch.mergeShallow( valueDelete );
+ continue;
}
- //TODO: explore making this an Async process
- //We'll repair it again if we have to
- deleteBatch.execute();
- return response;
+ //else add it to our result set
+ response.addEntity( expectedUnique.getField(), entity );
+ }
+ //TODO: explore making this an Async process
+ //We'll repair it again if we have to
+ deleteBatch.execute();
- }
- catch ( ConnectionException e ) {
- logger.error( "Failed to getIdField", e );
- throw new RuntimeException( e );
- }
+ return response;
+ }
+ catch ( ConnectionException e ) {
+ logger.error( "Failed to getIdField", e );
+ throw new RuntimeException( e );
}
} );
- }
+ return ObservableTimer.time( fieldSetObservable, fieldEntityTimer );
+ }
// fire the stages
public Observable<CollectionIoEvent<MvccEntity>> stageRunner( CollectionIoEvent<Entity> writeData,
WriteStart writeState ) {
- return Observable.just( writeData ).map( writeState ).doOnNext( new Action1<CollectionIoEvent<MvccEntity>>() {
+ return Observable.just( writeData ).map( writeState ).flatMap( mvccEntityCollectionIoEvent -> {
- @Override
- public void call( final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent ) {
+ Observable<CollectionIoEvent<MvccEntity>> uniqueObservable =
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+ .doOnNext( writeVerifyUnique );
- Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( writeVerifyUnique );
+ // optimistic verification
+ Observable<CollectionIoEvent<MvccEntity>> optimisticObservable =
+ Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
+ .doOnNext( writeOptimisticVerify );
- // optimistic verification
- Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.just( mvccEntityCollectionIoEvent ).subscribeOn( rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( writeOptimisticVerify );
+ final Observable<CollectionIoEvent<MvccEntity>> zip = Observable.zip( uniqueObservable, optimisticObservable,
+ ( unique, optimistic ) -> optimistic );
+ return zip;
- //wait for both to finish
- Observable.merge( unique, optimistic ).toBlocking().last();
- }
- } );
+ } );
}
@@ -478,7 +391,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
public void call( final Subscriber<? super VersionSet> subscriber ) {
try {
final VersionSet logEntries = mvccLogEntrySerializationStrategy
- .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
+ .load( applicationScope, entityIds, UUIDGenerator.newTimeUUID() );
subscriber.onNext( logEntries );
subscriber.onCompleted();
@@ -487,13 +400,12 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
subscriber.onError( e );
}
}
- } )
- .doOnCompleted( new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- } );
+ } ).doOnCompleted( new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ } );
}
@@ -502,11 +414,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
try {
ColumnFamily<String, String> CF_SYSTEM_LOCAL =
- new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
- StringSerializer.get() );
+ new ColumnFamily<String, String>( "system.local", StringSerializer.get(), StringSerializer.get(),
+ StringSerializer.get() );
OperationResult<CqlResult<String, String>> result =
- keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
+ keyspace.prepareQuery( CF_SYSTEM_LOCAL ).withCql( "SELECT now() FROM system.local;" ).execute();
if ( result.getResult().getRows().size() == 1 ) {
return Health.GREEN;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
deleted file mode 100644
index 1753d26..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.MutationBatch;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires Cleanup Task
- */
-public class EntityDeletedTask implements Task<Void> {
- private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class);
-
- private final EntityVersionTaskFactory entityVersionTaskFactory;
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private final MvccEntitySerializationStrategy entitySerializationStrategy;
- private final Set<EntityDeleted> listeners;
- private final ApplicationScope collectionScope;
- private final Id entityId;
- private final UUID version;
-
-
- @Inject
- public EntityDeletedTask(
- EntityVersionTaskFactory entityVersionTaskFactory,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final Set<EntityDeleted> listeners, // MUST be a set or Guice will not inject
- @Assisted final ApplicationScope collectionScope,
- @Assisted final Id entityId,
- @Assisted final UUID version) {
-
- this.entityVersionTaskFactory = entityVersionTaskFactory;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.entitySerializationStrategy = entitySerializationStrategy;
- this.listeners = listeners;
- this.collectionScope = collectionScope;
- this.entityId = entityId;
- this.version = version;
- }
-
-
- @Override
- public void exceptionThrown(Throwable throwable) {
- LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { collectionScope, entityId, version }, throwable );
- }
-
-
- @Override
- public Void rejected() {
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
-
- entityVersionTaskFactory.getCleanupTask( collectionScope, entityId, version, true ).call();
-
- fireEvents();
- final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
- final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
-
- entityDelete.execute();
- logDelete.execute();
-//
- return null;
- }
-
-
- private void fireEvents() {
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().deleted( collectionScope, entityId,version );
- return;
- }
-
- LOG.debug( "Started firing {} listeners", listenerSize );
-
- //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
- Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.deleted( collectionScope, entityId, version );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
- LOG.debug( "Finished firing {} listeners", listenerSize );
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
deleted file mode 100644
index b5f9085..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
-import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.serialization.impl.LogEntryIterator;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.observables.BlockingObservable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
- * retained, the range is exclusive.
- */
-public class EntityVersionCleanupTask implements Task<Void> {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVersionCleanupTask.class );
-
- private final Set<EntityVersionDeleted> listeners;
-
- private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
- private UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
- private final Keyspace keyspace;
-
- private final SerializationFig serializationFig;
-
- private final ApplicationScope scope;
- private final Id entityId;
- private final UUID version;
- private final boolean includeVersion;
-
-
- @Inject
- public EntityVersionCleanupTask(
- final SerializationFig serializationFig,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final Keyspace keyspace,
- final Set<EntityVersionDeleted> listeners, // MUST be a set or Guice will not inject
- @Assisted final ApplicationScope scope,
- @Assisted final Id entityId,
- @Assisted final UUID version,
- @Assisted final boolean includeVersion) {
-
- this.serializationFig = serializationFig;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
- this.keyspace = keyspace;
- this.listeners = listeners;
- this.scope = scope;
- this.entityId = entityId;
- this.version = version;
-
- this.includeVersion = includeVersion;
- }
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- logger.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { scope, entityId, version }, throwable );
- }
-
-
- @Override
- public Void rejected() {
- //Our task was rejected meaning our queue was full. We need this operation to run,
- // so we'll run it in our current thread
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
- //TODO Refactor this logic into a a class that can be invoked from anywhere
- //iterate all unique values
- final BlockingObservable<Long> uniqueValueCleanup =
- Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
- @Override
- protected Iterator<UniqueValue> getIterator() {
- return uniqueValueSerializationStrategy.getAllUniqueFields( scope, entityId );
- }
- } )
-
- //skip current versions
- .skipWhile( new Func1<UniqueValue, Boolean>() {
- @Override
- public Boolean call( final UniqueValue uniqueValue ) {
- return !includeVersion && version.equals( uniqueValue.getEntityVersion() );
- }
- } )
- //buffer our buffer size, then roll them all up in a single batch mutation
- .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<UniqueValue>>() {
- @Override
- public void call( final List<UniqueValue> uniqueValues ) {
- final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
- for ( UniqueValue value : uniqueValues ) {
- uniqueCleanupBatch.mergeShallow( uniqueValueSerializationStrategy.delete( scope, value ) );
- }
-
- try {
- uniqueCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
- }
- } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
-
- //start calling the listeners for remove log entries
- BlockingObservable<Long> versionsDeletedObservable =
-
- Observable.create( new ObservableIterator<MvccLogEntry>( "Log entry iterator" ) {
- @Override
- protected Iterator<MvccLogEntry> getIterator() {
-
- return new LogEntryIterator( logEntrySerializationStrategy, scope, entityId, version,
- serializationFig.getBufferSize() );
- }
- } )
- //skip current version
- .skipWhile( new Func1<MvccLogEntry, Boolean>() {
- @Override
- public Boolean call( final MvccLogEntry mvccLogEntry ) {
- return !includeVersion && version.equals( mvccLogEntry.getVersion() );
- }
- } )
- //buffer them for efficiency
- .buffer( serializationFig.getBufferSize() ).doOnNext( new Action1<List<MvccLogEntry>>() {
- @Override
- public void call( final List<MvccLogEntry> mvccEntities ) {
-
- fireEvents( mvccEntities );
-
- final MutationBatch logCleanupBatch = keyspace.prepareMutationBatch();
-
-
- for ( MvccLogEntry entry : mvccEntities ) {
- logCleanupBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, entry.getVersion() ));
- }
-
- try {
- logCleanupBatch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute batch mutation", e );
- }
- }
- } ).subscribeOn( Schedulers.io() ).countLong().toBlocking();
-
- //wait or this to complete
- final Long removedCount = uniqueValueCleanup.last();
-
- logger.debug( "Removed unique values for {} entities of entity {}", removedCount, entityId );
-
- final Long versionCleanupCount = versionsDeletedObservable.last();
-
- logger.debug( "Removed {} previous entity versions of entity {}", versionCleanupCount, entityId );
-
- return null;
- }
-
-
- private void fireEvents( final List<MvccLogEntry> versions ) {
-
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().versionDeleted( scope, entityId, versions );
- return;
- }
-
- logger.debug( "Started firing {} listeners", listenerSize );
-
- //if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
-
-
- //if we have more than 1, run them on the rx scheduler for a max of 10 operations at a time
- Observable.from(listeners).flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.versionDeleted( scope, entityId, versions );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
-
- logger.debug( "Finished firing {} listeners", listenerSize );
- }
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
deleted file mode 100644
index fbbcdbd..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCreatedTask.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.task.Task;
-import org.apache.usergrid.persistence.model.entity.Entity;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Fires events so that all EntityVersionCreated handlers area called.
- */
-public class EntityVersionCreatedTask implements Task<Void> {
- private static final Logger logger = LoggerFactory.getLogger( EntityVersionCreatedTask.class );
-
- private Set<EntityVersionCreated> listeners;
- private final ApplicationScope collectionScope;
- private final Entity entity;
-
-
- @Inject
- public EntityVersionCreatedTask( @Assisted final ApplicationScope collectionScope,
- final Set<EntityVersionCreated> listeners,
- @Assisted final Entity entity ) {
-
- this.listeners = listeners;
- this.collectionScope = collectionScope;
- this.entity = entity;
- }
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- logger.error( "Unable to run update task for collection {} with entity {} and version {}",
- new Object[] { collectionScope, entity}, throwable );
- }
-
-
- @Override
- public Void rejected() {
-
- // Our task was rejected meaning our queue was full.
- // We need this operation to run, so we'll run it in our current thread
- try {
- call();
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Exception thrown in call task", e );
- }
-
- return null;
- }
-
-
- @Override
- public Void call() throws Exception {
-
- fireEvents();
- return null;
- }
-
-
- private void fireEvents() {
-
- final int listenerSize = listeners.size();
-
- if ( listenerSize == 0 ) {
- return;
- }
-
- if ( listenerSize == 1 ) {
- listeners.iterator().next().versionCreated( collectionScope, entity );
- return;
- }
-
- logger.debug( "Started firing {} listeners", listenerSize );
-
-
- Observable.from( listeners )
- .flatMap( currentListener -> Observable.just( currentListener ).doOnNext( listener -> {
- listener.versionCreated( collectionScope, entity );
- } ).subscribeOn( Schedulers.io() ), 10 ).toBlocking().last();
-
-
- logger.debug( "Finished firing {} listeners", listenerSize );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
deleted file mode 100644
index 51a4607..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionTaskFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.impl;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface EntityVersionTaskFactory {
-
- /**
- * Get a task for cleaning up latent entity data. If includeVersion = true, the passed version will be cleaned up as well
- * Otherwise this is a V-1 operation
- *
- * @param scope
- * @param entityId
- * @param version
- * @param includeVersion
- * @return
- */
- EntityVersionCleanupTask getCleanupTask( final ApplicationScope scope, final Id entityId, final UUID version,
- final boolean includeVersion );
-
- /**
- * Get an entityVersionCreatedTask
- * @param scope
- * @param entity
- * @return
- */
- EntityVersionCreatedTask getCreatedTask( final ApplicationScope scope, final Entity entity );
-
- /**
- * Get an entity deleted task
- * @param collectionScope
- * @param entityId
- * @param version
- * @return
- */
- EntityDeletedTask getDeleteTask( final ApplicationScope collectionScope, final Id entityId, final UUID version );
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
new file mode 100644
index 0000000..0034f03
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.rx.ObservableIterator;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Runs on an entity that has just been mark committed, and removes all unique values <= this entity
+ */
+public class UniqueCleanup
+    implements Observable.Transformer<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( UniqueCleanup.class );
+
+    private final Timer uniqueCleanupTimer;
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final Keyspace keyspace;
+    private final SerializationFig serializationFig;
+
+    @Inject
+    public UniqueCleanup( final SerializationFig serializationFig,
+                          final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                          final Keyspace keyspace, final MetricsFactory metricsFactory ) {
+
+        this.serializationFig = serializationFig;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.keyspace = keyspace;
+        this.uniqueCleanupTimer = metricsFactory.getTimer( UniqueCleanup.class, "uniquecleanup" );
+    }
+
+    /**
+     * Deletes the unique value entries of each incoming entity event, then re-emits that event.
+     *
+     * @param collectionIoEventObservable the stream of entity events to clean up after
+     * @return the output stream passed through ObservableTimer.time with the "uniquecleanup" timer
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccEntity>> call(
+        final Observable<CollectionIoEvent<MvccEntity>> collectionIoEventObservable ) {
+
+        //flatMap (not doOnNext) so the per-entity cleanup observable is returned and subscribed; an
+        //observable that is only assigned to a local is never run, so nothing would be deleted
+        final Observable<CollectionIoEvent<MvccEntity>> outputObservable =
+            collectionIoEventObservable.flatMap( mvccEntityCollectionIoEvent -> {
+                final Id entityId = mvccEntityCollectionIoEvent.getEvent().getId();
+                final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
+                final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
+
+                //TODO Refactor this logic into a class that can be invoked from anywhere
+                //iterate all unique values
+                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+                        @Override
+                        protected Iterator<UniqueValue> getIterator() {
+                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+                        }
+                    } )
+                    //skip the leading values whose version is > the specified version
+                    .skipWhile( uniqueValue -> {
+                        final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+                        return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
+                    } )
+                    //buffer our buffer size, then roll them all up in a single batch mutation
+                    .buffer( serializationFig.getBufferSize() )
+                    //roll them up
+                    .doOnNext( uniqueValues -> {
+                        final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+                        for ( UniqueValue value : uniqueValues ) {
+                            uniqueCleanupBatch
+                                .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+                        }
+
+                        try {
+                            uniqueCleanupBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to execute batch mutation", e );
+                        }
+                    } )
+                    //emit the original event exactly once, after the last batch has been executed
+                    .lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+
+                return uniqueValueCleanup;
+            } );
+
+        return ObservableTimer.time( outputObservable, uniqueCleanupTimer );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
new file mode 100644
index 0000000..0945827
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/VersionCompact.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.mvcc.stage.delete;
+
+
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import rx.Observable;
+
+
+/**
+ * Compact all versions on the input observable by removing them from the log, and from the
+ * stored entity versions
+ */
+public class VersionCompact
+    implements Observable.Transformer<CollectionIoEvent<MvccLogEntry>, CollectionIoEvent<MvccLogEntry>> {
+
+    private static final Logger logger = LoggerFactory.getLogger( VersionCompact.class );
+
+    private final MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private final MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
+    private final SerializationFig serializationFig;
+    private final Keyspace keyspace;
+    private final Timer compactTimer;
+
+
+    @Inject
+    public VersionCompact( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                           final SerializationFig serializationFig, final Keyspace keyspace,
+                           final MetricsFactory metricsFactory,
+                           final MvccEntitySerializationStrategy mvccEntitySerializationStrategy ) {
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.serializationFig = serializationFig;
+        this.keyspace = keyspace;
+        this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
+        this.compactTimer = metricsFactory.getTimer( VersionCompact.class, "compact" );
+    }
+
+
+    /**
+     * Removes the log entry and the entity data for every incoming log entry event, one batch per buffer.
+     *
+     * @param collectionIoEventObservable the log entry events whose versions should be removed
+     * @return the input events, re-emitted per buffer after that buffer's batch mutation has executed
+     */
+    @Override
+    public Observable<CollectionIoEvent<MvccLogEntry>> call(
+        final Observable<CollectionIoEvent<MvccLogEntry>> collectionIoEventObservable ) {
+
+        final Observable<CollectionIoEvent<MvccLogEntry>> entryBuffer =
+            collectionIoEventObservable.buffer( serializationFig.getBufferSize() ).flatMap(
+                buffer -> Observable.from( buffer ).collect( () -> keyspace.prepareMutationBatch(),
+                    ( ( mutationBatch, mvccLogEntryCollectionIoEvent ) -> {
+                        final ApplicationScope scope = mvccLogEntryCollectionIoEvent.getEntityCollection();
+                        final MvccLogEntry mvccLogEntry = mvccLogEntryCollectionIoEvent.getEvent();
+                        final Id entityId = mvccLogEntry.getEntityId();
+                        final UUID version = mvccLogEntry.getVersion();
+
+                        //delete from our log
+                        mutationBatch.mergeShallow( logEntrySerializationStrategy.delete( scope, entityId, version ) );
+
+                        //merge our entity delete in
+                        mutationBatch
+                            .mergeShallow( mvccEntitySerializationStrategy.delete( scope, entityId, version ) );
+
+                        if ( logger.isDebugEnabled() ) {
+                            logger.debug(
+                                "Deleting log entry and version data for entity id {} and version {} in app scope {}",
+                                new Object[] { entityId, version, scope } );
+                        }
+                    } ) )
+                    //execute the accumulated log and entity deletes for this buffer
+                    .doOnNext( mutationBatch -> {
+                        try {
+                            mutationBatch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            //keep the cause so the underlying connection failure is not lost
+                            throw new RuntimeException( "Unable to perform batch mutation", e );
+                        }
+                    } ).flatMap( batches -> Observable.from( buffer ) ) );
+
+        return ObservableTimer.time( entryBuffer, compactTimer );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/45aed6cc/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
index 92669a7..5e249ae 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/MvccLogEntrySerializationStrategy.java
@@ -71,6 +71,20 @@ public interface MvccLogEntrySerializationStrategy extends Migration, VersionedD
*/
List<MvccLogEntry> load( ApplicationScope applicationScope, Id entityId, UUID version, int maxSize );
+
+
+ /**
+ * Load a list, from lowest to highest, of the stage with versions >= minVersion, up to maxSize elements
+ *
+ * @param applicationScope The applicationScope to load the entity from
+ * @param entityId The entity id to load
+ * @param minVersion The min version to seek from. Null is allowed
+ * @param maxSize The maximum size to return. If you receive this size, there may be more versions to load.
+ *
+ * @return A list of log entries up to max size ordered from min(UUID) => max(UUID)
+ */
+ List<MvccLogEntry> loadReversed( ApplicationScope applicationScope, Id entityId, UUID minVersion, int maxSize );
+
/**
* MarkCommit the stage from the applicationScope with the given entityId and version
*