You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/09 02:07:22 UTC
git commit: latest
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-events 9bd46b5a5 -> 23ff82e05
latest
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23ff82e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23ff82e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23ff82e0
Branch: refs/heads/two-dot-o-events
Commit: 23ff82e05aa38ca56d16016d65ef8d7316618299
Parents: 9bd46b5
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 8 18:07:06 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 8 18:07:06 2014 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/GuiceModule.java | 12 +--
.../usergrid/event/EntityDeletedImpl.java | 54 ++++++++++
.../event/EntityVersionDeletedImpl.java | 71 +++++++++++++
.../collection/event/EntityDeleted.java | 4 +-
.../event/impl/EntityDeletedImpl.java | 62 ------------
.../collection/impl/EntityDeletedTask.java | 100 +++++++++++++++++++
.../impl/EntityVersionCleanupTask.java | 15 ---
7 files changed, 233 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 686ed8e..201d14b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -19,9 +19,10 @@ package org.apache.usergrid.corepersistence;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.event.EntityDeletedImpl;
+import org.apache.usergrid.event.EntityVersionDeletedImpl;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.event.impl.EntityDeletedImpl;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
@@ -51,12 +52,11 @@ public class GuiceModule extends AbstractModule {
bind(CpEntityDeleteListener.class).asEagerSingleton();
bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
- Multibinder<EntityDeleted> uriBinder = Multibinder.newSetBinder(binder(), EntityDeleted.class);
- uriBinder.addBinding().to( EntityDeletedImpl.class );
- Multibinder<EntityVersionDeleted> versionBinder = Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
-
-
+ Multibinder<EntityDeleted> entityBinder = Multibinder.newSetBinder(binder(), EntityDeleted.class);
+ entityBinder.addBinding().to( EntityDeletedImpl.class );
+ Multibinder<EntityVersionDeleted> versionBinder = Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
+ versionBinder.addBinding().to(EntityVersionDeletedImpl.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java b/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
new file mode 100644
index 0000000..ce39c5d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.event;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import java.util.UUID;
+
+/**
+ * {@link EntityDeleted} listener that purges the most current version of an entity from the index.
+ *
+ * <p>On notification it builds an {@link IndexScope} from the collection scope's owner and name,
+ * queues a de-index of the given entity version on the {@link EntityIndexBatch} and executes the
+ * batch.
+ *
+ * <p>NOTE(review): {@code GuiceModule} binds this class through a Multibinder, but the constructor
+ * carries no injection annotation -- confirm how instances are meant to be created.
+ */
+public class EntityDeletedImpl implements EntityDeleted {
+
+    private final EntityIndexBatch entityIndex;
+
+    /**
+     * @param entityIndex batch on which the de-index operation is queued and executed
+     */
+    public EntityDeletedImpl(EntityIndexBatch entityIndex) {
+        this.entityIndex = entityIndex;
+    }
+
+    /**
+     * Removes one version of an entity from the index.
+     *
+     * @param scope    collection scope of the entity; its owner and name form the index scope
+     * @param entityId id of the deleted entity
+     * @param version  entity version to de-index
+     */
+    @Override
+    public void deleted(CollectionScope scope, Id entityId, UUID version) {
+        IndexScope indexScope = new IndexScopeImpl(
+                new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
+                scope.getName()
+        );
+        entityIndex.deindex(indexScope, entityId, version);
+        // EntityVersionDeletedImpl calls execute() after queuing its deindex operations on the
+        // same batch type; do the same here so the removal is actually submitted.
+        entityIndex.execute();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java b/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
new file mode 100644
index 0000000..e48fa7c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.event;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+
+/**
+ * {@link EntityVersionDeleted} listener that purges old entity versions from the index.
+ *
+ * <p>The deleted versions are processed in chunks of {@code serializationFig.getBufferSize()}:
+ * every version in a chunk is queued for de-indexing on the {@link EntityIndexBatch}, and the
+ * batch is executed once per chunk.
+ *
+ * <p>NOTE(review): {@code GuiceModule} binds this class through a Multibinder, but the constructor
+ * carries no injection annotation -- confirm how instances are meant to be created.
+ */
+public class EntityVersionDeletedImpl implements EntityVersionDeleted {
+
+    private final EntityIndexBatch entityIndexBatch;
+    private final SerializationFig serializationFig;
+
+    /**
+     * @param entityIndexBatch batch on which de-index operations are queued and executed
+     * @param serializationFig configuration supplying the chunk (buffer) size
+     */
+    public EntityVersionDeletedImpl(EntityIndexBatch entityIndexBatch, SerializationFig serializationFig) {
+        this.entityIndexBatch = entityIndexBatch;
+        this.serializationFig = serializationFig;
+    }
+
+    /**
+     * De-indexes every version in {@code entityVersions} for the given entity. The work is
+     * subscribed on the RxJava io scheduler, and this method blocks until it has completed.
+     *
+     * @param scope          collection scope of the entity; its owner and name form the index scope
+     * @param entityId       id of the entity whose versions were deleted
+     * @param entityVersions the deleted versions; a null or empty list is a no-op
+     */
+    @Override
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+        // Nothing to purge. This also keeps toBlocking().last() below from being called on a
+        // sequence that emits no items, which makes it throw instead of returning.
+        if (entityVersions == null || entityVersions.isEmpty()) {
+            return;
+        }
+        final IndexScope indexScope = new IndexScopeImpl(
+                new SimpleId(scope.getOwner().getUuid(), scope.getOwner().getType()),
+                scope.getName()
+        );
+        rx.Observable.from(entityVersions)
+                .subscribeOn(Schedulers.io())
+                .buffer(serializationFig.getBufferSize())
+                .map(new Func1<List<MvccEntity>, List<MvccEntity>>() {
+                    @Override
+                    public List<MvccEntity> call(List<MvccEntity> entityList) {
+                        for (MvccEntity entity : entityList) {
+                            entityIndexBatch.deindex(indexScope, entityId, entity.getVersion());
+                        }
+                        // One execute per chunk rather than per version.
+                        entityIndexBatch.execute();
+                        return entityList;
+                    }
+                })
+                // Block the caller until the final chunk has been processed.
+                .toBlocking().last();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
index 39b266d..0e2b8a2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -38,8 +38,8 @@ public interface EntityDeleted {
*
* @param scope The scope of the entity
* @param entityId The id of the entity
- * @param inclusiveVersionToDeleteFrom the entity version
+ * @param version the entity version
*/
- public void deleted( final CollectionScope scope, final Id entityId, final UUID inclusiveVersionToDeleteFrom);
+ public void deleted( final CollectionScope scope, final Id entityId, final UUID version);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
deleted file mode 100644
index f2b398f..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event.impl;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.util.UUID;
-
-/**
- * {@link EntityDeleted} listener that removes the stored data for a deleted entity version.
- * On notification it loads the entity at the requested version through the
- * {@link MvccEntitySerializationStrategy}, builds one delete mutation for the entity and one
- * for its log entry, and executes both.
- */
-public class EntityDeletedImpl implements EntityDeleted {
-
- private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedImpl.class);
-
- private MvccEntitySerializationStrategy mvccEntitySerializationStrategy; // loads the entity and builds its delete mutation
- private MvccLogEntrySerializationStrategy logEntrySerializationStrategy; // builds the log-entry delete mutation
-/**
- * Stores the two serialization strategies used by {@link #deleted}.
- *
- * @param mvccEntitySerializationStrategy strategy used to load and delete the entity
- * @param logEntrySerializationStrategy strategy used to delete the entity's log entry
- */
- public EntityDeletedImpl(MvccEntitySerializationStrategy mvccEntitySerializationStrategy, MvccLogEntrySerializationStrategy logEntrySerializationStrategy){
-
- this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
- this.logEntrySerializationStrategy = logEntrySerializationStrategy;
- }
-/**
- * Deletes the entity and its log entry for the given version.
- *
- * @param scope the scope of the entity
- * @param entityId the id of the entity
- * @param inclusiveVersionToDeleteFrom version used to load the entity and to delete the log entry
- */
- @Override
- public void deleted(CollectionScope scope, Id entityId, UUID inclusiveVersionToDeleteFrom) {
- //TODO: clean up cass versions
- MvccEntity mvccEntity = mvccEntitySerializationStrategy.load(scope,entityId,inclusiveVersionToDeleteFrom); // NOTE(review): dereferenced below without a null check -- confirm load() never returns null
- final MutationBatch entityDelete = mvccEntitySerializationStrategy.delete(scope, entityId, mvccEntity.getVersion()); // uses the version reported by the loaded entity
- final MutationBatch logDelete = logEntrySerializationStrategy.delete(scope, entityId, inclusiveVersionToDeleteFrom);
- try {
- entityDelete.execute();
- logDelete.execute(); // not reached if the entity delete above throws
- }catch (ConnectionException ce){ // the failure is logged and not rethrown
- LOG.error("Error deleing from", ce);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
new file mode 100644
index 0000000..8abb456
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -0,0 +1,100 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Fires Cleanup Task
+ */
+public class EntityDeletedTask implements Task<Void> {
+ private EntityVersionCleanupFactory entityVersionCleanupFactory;
+ private MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+ private MvccEntitySerializationStrategy entitySerializationStrategy;
+ private CollectionScope collectionScope;
+ private Id entityId;
+ private UUID version;
+ private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class);
+
+ public EntityDeletedTask(EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final SerializationFig serializationFig,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final Keyspace keyspace,
+ final CollectionScope scope,
+ final List<EntityVersionDeleted> listeners,
+ CollectionScope collectionScope,
+ @Assisted Id entityId, @Assisted UUID version){
+ this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+ this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.collectionScope = collectionScope;
+ this.entityId = entityId;
+ this.version = version;
+ }
+
+ @Override
+ public void exceptionThrown(Throwable throwable) {
+ LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+ new Object[] { collectionScope, entityId, version }, throwable );
+ }
+
+ @Override
+ public Void rejected() {
+ try {
+ call();
+ }
+ catch ( Exception e ) {
+ throw new RuntimeException( "Exception thrown in call task", e );
+ }
+
+ return null;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ entityVersionCleanupFactory.getTask(entityId,version).call();
+ fireEvents();
+ final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
+ final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
+ entityDelete.execute();
+ logDelete.execute();
+ return null;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 0d954c7..259e931 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -111,7 +111,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
public Void rejected() {
//Our task was rejected meaning our queue was full. We need this operation to run,
// so we'll run it in our current thread
-
try {
call();
}
@@ -140,8 +139,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
new Action1<List<MvccEntity>>() {
@Override
public void call(final List<MvccEntity> mvccEntities) {
-
-
final MutationBatch batch = keyspace.prepareMutationBatch();
final MutationBatch entityBatch = keyspace.prepareMutationBatch();
final MutationBatch logBatch = keyspace.prepareMutationBatch();
@@ -152,32 +149,22 @@ public class EntityVersionCleanupTask implements Task<Void> {
}
final UUID entityVersion = mvccEntity.getVersion();
-
final Entity entity = mvccEntity.getEntity().get();
//remove all unique fields from the index
for (final Field field : entity.getFields()) {
-
if (!field.isUnique()) {
continue;
}
-
final UniqueValue unique = new UniqueValueImpl(scope, field, entityId, entityVersion);
-
final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(unique);
-
batch.mergeShallow(deleteMutation);
}
final MutationBatch entityDelete = entitySerializationStrategy.delete(scope, entityId, mvccEntity.getVersion());
-
entityBatch.mergeShallow(entityDelete);
-
final MutationBatch logDelete = logEntrySerializationStrategy.delete(scope, entityId, version);
-
logBatch.mergeShallow(logDelete);
-
-
}
try {
@@ -214,8 +201,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
}
-
-
private void fireEvents( final List<MvccEntity> versions ) {
final int listenerSize = listeners.size();