You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2014/10/09 02:07:22 UTC

git commit: latest

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-events 9bd46b5a5 -> 23ff82e05


latest


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/23ff82e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/23ff82e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/23ff82e0

Branch: refs/heads/two-dot-o-events
Commit: 23ff82e05aa38ca56d16016d65ef8d7316618299
Parents: 9bd46b5
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Oct 8 18:07:06 2014 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Oct 8 18:07:06 2014 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/GuiceModule.java   |  12 +--
 .../usergrid/event/EntityDeletedImpl.java       |  54 ++++++++++
 .../event/EntityVersionDeletedImpl.java         |  71 +++++++++++++
 .../collection/event/EntityDeleted.java         |   4 +-
 .../event/impl/EntityDeletedImpl.java           |  62 ------------
 .../collection/impl/EntityDeletedTask.java      | 100 +++++++++++++++++++
 .../impl/EntityVersionCleanupTask.java          |  15 ---
 7 files changed, 233 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 686ed8e..201d14b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -19,9 +19,10 @@ package org.apache.usergrid.corepersistence;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.event.EntityDeletedImpl;
+import org.apache.usergrid.event.EntityVersionDeletedImpl;
 import org.apache.usergrid.persistence.collection.event.EntityDeleted;
 import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
-import org.apache.usergrid.persistence.collection.event.impl.EntityDeletedImpl;
 import org.apache.usergrid.persistence.collection.guice.CollectionModule;
 import org.apache.usergrid.persistence.core.guice.CommonModule;
 import org.apache.usergrid.persistence.graph.guice.GraphModule;
@@ -51,12 +52,11 @@ public class GuiceModule  extends AbstractModule {
         bind(CpEntityDeleteListener.class).asEagerSingleton();
         bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
 
-        Multibinder<EntityDeleted> uriBinder = Multibinder.newSetBinder(binder(), EntityDeleted.class);
-        uriBinder.addBinding().to( EntityDeletedImpl.class );
-        Multibinder<EntityVersionDeleted> versionBinder = Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
-
-
 
+        Multibinder<EntityDeleted> entityBinder =  Multibinder.newSetBinder(binder(), EntityDeleted.class);
+        entityBinder.addBinding().to( EntityDeletedImpl.class );
+        Multibinder<EntityVersionDeleted> versionBinder = Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
+        versionBinder.addBinding().to(EntityVersionDeletedImpl.class);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java b/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
new file mode 100644
index 0000000..ce39c5d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/event/EntityDeletedImpl.java
@@ -0,0 +1,54 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.event;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityDeleted;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+
+import java.util.UUID;
+
+/**
+ * {@link EntityDeleted} listener that removes a deleted entity version from the index.
+ *
+ * <p>The {@link IndexScope} is built from the owner (uuid and type) and the name of the
+ * supplied {@link CollectionScope}. The deindex operation is placed on the supplied
+ * {@link EntityIndexBatch} and the batch is then executed.
+ *
+ * <p>NOTE(review): GuiceModule binds this class through a Multibinder, but the constructor
+ * carries no {@code @Inject} annotation here - confirm Guice can construct it.
+ */
+public class EntityDeletedImpl implements EntityDeleted {
+
+    private final EntityIndexBatch entityIndexBatch;
+
+    /**
+     * @param entityIndex the index batch used to deindex deleted entity versions
+     */
+    public EntityDeletedImpl(EntityIndexBatch entityIndex){
+        this.entityIndexBatch = entityIndex;
+    }
+
+    /**
+     * Removes the given version of the entity from the index.
+     *
+     * @param scope    the collection scope the entity belongs to
+     * @param entityId the id of the deleted entity
+     * @param version  the entity version to deindex
+     */
+    @Override
+    public void deleted(CollectionScope scope, Id entityId, UUID version) {
+        IndexScope indexScope = new IndexScopeImpl(
+                new SimpleId(scope.getOwner().getUuid(),scope.getOwner().getType()),
+                scope.getName()
+        );
+        entityIndexBatch.deindex(indexScope,entityId,version);
+        // Execute the batch so the deindex is applied, mirroring what
+        // EntityVersionDeletedImpl does after its deindex calls.
+        entityIndexBatch.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java b/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
new file mode 100644
index 0000000..e48fa7c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/event/EntityVersionDeletedImpl.java
@@ -0,0 +1,71 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.event;
+
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.entity.SimpleId;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
+
+import java.util.List;
+
+/**
+ * {@link EntityVersionDeleted} listener that removes old entity versions from the index.
+ *
+ * <p>Versions are processed in chunks of {@code serializationFig.getBufferSize()}; every
+ * version in a chunk is deindexed on the supplied {@link EntityIndexBatch}, which is then
+ * executed once per chunk.
+ */
+public class EntityVersionDeletedImpl implements EntityVersionDeleted{
+
+
+    private final EntityIndexBatch entityIndexBatch;
+    private final SerializationFig serializationFig;
+
+    /**
+     * @param entityIndexBatch the index batch used to deindex entity versions
+     * @param serializationFig configuration supplying the chunk (buffer) size
+     */
+    public EntityVersionDeletedImpl(EntityIndexBatch entityIndexBatch, SerializationFig serializationFig){
+        this.entityIndexBatch = entityIndexBatch;
+        this.serializationFig = serializationFig;
+    }
+
+    /**
+     * Deindexes every supplied version of the entity and blocks until all chunks are done.
+     *
+     * @param scope          the collection scope the entity belongs to
+     * @param entityId       the id of the entity whose versions were deleted
+     * @param entityVersions the deleted versions; a null or empty list is a no-op
+     */
+    @Override
+    public void versionDeleted(final CollectionScope scope, final Id entityId, final List<MvccEntity> entityVersions) {
+        // With no versions the stream emits nothing and the blocking last() below would
+        // throw NoSuchElementException, so return early: there is nothing to deindex.
+        if (entityVersions == null || entityVersions.isEmpty()) {
+            return;
+        }
+
+        final IndexScope indexScope = new IndexScopeImpl(
+                new SimpleId(scope.getOwner().getUuid(),scope.getOwner().getType()),
+                scope.getName()
+        );
+        rx.Observable.from(entityVersions)
+                .subscribeOn(Schedulers.io())
+                .buffer(serializationFig.getBufferSize())
+                .map(new Func1<List<MvccEntity>,List<MvccEntity>>() {
+                    @Override
+                    public List<MvccEntity> call(List<MvccEntity> entityList) {
+                        for(MvccEntity entity : entityList){
+                             entityIndexBatch.deindex(indexScope,entityId,entity.getVersion());
+                        }
+                        // one execute per chunk
+                        entityIndexBatch.execute();
+                        return entityList;
+                    }
+                }).toBlocking().last(); // block the caller until the final chunk has been processed
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
index 39b266d..0e2b8a2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/EntityDeleted.java
@@ -38,8 +38,8 @@ public interface EntityDeleted {
      *
      * @param scope The scope of the entity
      * @param entityId The id of the entity
-     * @param inclusiveVersionToDeleteFrom the entity version
+     * @param version the entity version
      */
-    public void deleted( final CollectionScope scope, final Id entityId, final UUID inclusiveVersionToDeleteFrom);
+    public void deleted( final CollectionScope scope, final Id entityId, final UUID version);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
deleted file mode 100644
index f2b398f..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/event/impl/EntityDeletedImpl.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.event.impl;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.event.EntityDeleted;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.util.UUID;
-
-
-public class EntityDeletedImpl implements EntityDeleted {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedImpl.class);
-
-    private MvccEntitySerializationStrategy mvccEntitySerializationStrategy;
-    private MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
-
-    public EntityDeletedImpl(MvccEntitySerializationStrategy mvccEntitySerializationStrategy, MvccLogEntrySerializationStrategy logEntrySerializationStrategy){
-
-        this.mvccEntitySerializationStrategy = mvccEntitySerializationStrategy;
-        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
-    }
-
-    @Override
-    public void deleted(CollectionScope scope, Id entityId, UUID inclusiveVersionToDeleteFrom) {
-        //TODO: clean up cass versions
-        MvccEntity mvccEntity = mvccEntitySerializationStrategy.load(scope,entityId,inclusiveVersionToDeleteFrom);
-        final MutationBatch entityDelete = mvccEntitySerializationStrategy.delete(scope, entityId, mvccEntity.getVersion());
-        final MutationBatch logDelete = logEntrySerializationStrategy.delete(scope, entityId, inclusiveVersionToDeleteFrom);
-        try {
-            entityDelete.execute();
-            logDelete.execute();
-        }catch (ConnectionException ce){
-            LOG.error("Error deleing from", ce);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
new file mode 100644
index 0000000..8abb456
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -0,0 +1,100 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.core.task.Task;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Task that handles the deletion of a single entity version.
+ *
+ * <p>{@link #call()} runs the version cleanup task obtained from the
+ * {@link EntityVersionCleanupFactory}, calls {@code fireEvents()}, and then deletes the
+ * entity data and the log entry for the given version.
+ *
+ * <p>NOTE(review): {@code fireEvents()} is called in {@link #call()} but is not defined in
+ * this class as shown - confirm where it is meant to come from.
+ */
+public class EntityDeletedTask implements Task<Void> {
+    private EntityVersionCleanupFactory entityVersionCleanupFactory;
+    private MvccLogEntrySerializationStrategy logEntrySerializationStrategy;
+    private MvccEntitySerializationStrategy entitySerializationStrategy;
+    private CollectionScope collectionScope;
+    private Id entityId;
+    private UUID version;
+    private static final Logger LOG =  LoggerFactory.getLogger(EntityDeletedTask.class);
+
+    /**
+     * Stores the cleanup factory, the two serialization strategies, {@code collectionScope},
+     * {@code entityId} and {@code version}.
+     *
+     * <p>NOTE(review): {@code serializationFig}, {@code uniqueValueSerializationStrategy},
+     * {@code keyspace}, {@code scope} and {@code listeners} are accepted but not stored or
+     * used. Both {@code scope} and {@code collectionScope} are {@link CollectionScope}
+     * parameters and only the latter is kept - confirm which one is intended.
+     */
+    public EntityDeletedTask(EntityVersionCleanupFactory entityVersionCleanupFactory,
+                             final SerializationFig serializationFig,
+                             final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+                             final MvccEntitySerializationStrategy entitySerializationStrategy,
+                             final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                             final Keyspace keyspace,
+                             final CollectionScope scope,
+                             final List<EntityVersionDeleted> listeners,
+                             CollectionScope collectionScope,
+                             @Assisted Id entityId, @Assisted UUID version){
+        this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+        this.logEntrySerializationStrategy = logEntrySerializationStrategy;
+        this.entitySerializationStrategy = entitySerializationStrategy;
+        this.collectionScope = collectionScope;
+        this.entityId = entityId;
+        this.version = version;
+    }
+
+    /**
+     * Logs the failure together with the collection scope, entity id and version.
+     * NOTE(review): the message says "update task" although this is the delete task.
+     */
+    @Override
+    public void exceptionThrown(Throwable throwable) {
+        LOG.error( "Unable to run update task for collection {} with entity {} and version {}",
+                new Object[] { collectionScope, entityId, version }, throwable );
+    }
+
+    /**
+     * Runs {@link #call()} directly in the invoking thread and wraps any exception it
+     * throws in a {@link RuntimeException}.
+     *
+     * @return always {@code null}
+     */
+    @Override
+    public Void rejected() {
+        try {
+            call();
+        }
+        catch ( Exception e ) {
+            throw new RuntimeException( "Exception thrown in call task", e );
+        }
+
+        return null;
+    }
+
+    /**
+     * Runs the version cleanup task for this entity id and version, calls
+     * {@code fireEvents()}, then builds and executes the entity delete and the log entry
+     * delete for the same version (entity first, then log).
+     *
+     * @return always {@code null}
+     * @throws Exception if the cleanup task or either mutation fails
+     */
+    @Override
+    public Void call() throws Exception {
+        entityVersionCleanupFactory.getTask(entityId,version).call();
+        fireEvents();
+        final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
+        final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
+        entityDelete.execute();
+        logDelete.execute();
+        return null;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/23ff82e0/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 0d954c7..259e931 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -111,7 +111,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
     public Void rejected() {
         //Our task was rejected meaning our queue was full.  We need this operation to run,
         // so we'll run it in our current thread
-
         try {
             call();
         }
@@ -140,8 +139,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
                         new Action1<List<MvccEntity>>() {
                             @Override
                             public void call(final List<MvccEntity> mvccEntities) {
-
-
                                 final MutationBatch batch = keyspace.prepareMutationBatch();
                                 final MutationBatch entityBatch = keyspace.prepareMutationBatch();
                                 final MutationBatch logBatch = keyspace.prepareMutationBatch();
@@ -152,32 +149,22 @@ public class EntityVersionCleanupTask implements Task<Void> {
                                     }
 
                                     final UUID entityVersion = mvccEntity.getVersion();
-
                                     final Entity entity = mvccEntity.getEntity().get();
 
                                     //remove all unique fields from the index
                                     for (final Field field : entity.getFields()) {
-
                                         if (!field.isUnique()) {
                                             continue;
                                         }
-
                                         final UniqueValue unique = new UniqueValueImpl(scope, field, entityId, entityVersion);
-
                                         final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(unique);
-
                                         batch.mergeShallow(deleteMutation);
                                     }
 
                                     final MutationBatch entityDelete = entitySerializationStrategy.delete(scope, entityId, mvccEntity.getVersion());
-
                                     entityBatch.mergeShallow(entityDelete);
-
                                     final MutationBatch logDelete = logEntrySerializationStrategy.delete(scope, entityId, version);
-
                                     logBatch.mergeShallow(logDelete);
-
-
                                 }
 
                                 try {
@@ -214,8 +201,6 @@ public class EntityVersionCleanupTask implements Task<Void> {
     }
 
 
-
-
     private void fireEvents( final List<MvccEntity> versions ) {
 
         final int listenerSize = listeners.size();