You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/10/27 21:03:13 UTC
[3/6] git commit: Minor formatting changes only.
Minor formatting changes only.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/05429c51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/05429c51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/05429c51
Branch: refs/heads/two-dot-o-events
Commit: 05429c518cfbdf79f1ae4579909a882fdc9c50a2
Parents: 57c28ba
Author: Dave Johnson <dm...@apigee.com>
Authored: Mon Oct 27 10:31:24 2014 -0400
Committer: Dave Johnson <dm...@apigee.com>
Committed: Mon Oct 27 10:31:24 2014 -0400
----------------------------------------------------------------------
.../persistence/collection/EntitySet.java | 4 +-
.../persistence/collection/VersionSet.java | 22 +--
.../collection/impl/EntityDeletedTask.java | 52 +++---
.../impl/EntityVersionCleanupTask.java | 167 ++++++++++---------
4 files changed, 115 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05429c51/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
index 1e811ae..35b6a12 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntitySet.java
@@ -1,4 +1,4 @@
-package org.apache.usergrid.persistence.collection;/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,7 +16,7 @@ package org.apache.usergrid.persistence.collection;/*
* specific language governing permissions and limitations
* under the License.
*/
-
+package org.apache.usergrid.persistence.collection;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05429c51/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/VersionSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/VersionSet.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/VersionSet.java
index 8ee9cdc..77520a3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/VersionSet.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/VersionSet.java
@@ -1,24 +1,4 @@
/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.persistence.collection;/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,7 +16,7 @@ package org.apache.usergrid.persistence.collection;/*
* specific language governing permissions and limitations
* under the License.
*/
-
+package org.apache.usergrid.persistence.collection;
import org.apache.usergrid.persistence.model.entity.Id;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05429c51/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
index 52962c9..0755025 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityDeletedTask.java
@@ -1,23 +1,21 @@
/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-
package org.apache.usergrid.persistence.collection.impl;
import com.google.inject.Inject;
@@ -40,6 +38,7 @@ import rx.schedulers.Schedulers;
import java.util.List;
import java.util.UUID;
+
/**
* Fires Cleanup Task
*/
@@ -54,12 +53,14 @@ public class EntityDeletedTask implements Task<Void> {
private static final Logger LOG = LoggerFactory.getLogger(EntityDeletedTask.class);
@Inject
- public EntityDeletedTask(EntityVersionCleanupFactory entityVersionCleanupFactory,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final List<EntityDeleted> listeners,
- @Assisted final CollectionScope collectionScope,
- @Assisted final Id entityId, @Assisted final UUID version){
+ public EntityDeletedTask( EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final List<EntityDeleted> listeners,
+ @Assisted final CollectionScope collectionScope,
+ @Assisted final Id entityId,
+ @Assisted final UUID version) {
+
this.entityVersionCleanupFactory = entityVersionCleanupFactory;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
this.entitySerializationStrategy = entitySerializationStrategy;
@@ -88,13 +89,16 @@ public class EntityDeletedTask implements Task<Void> {
}
@Override
- public Void call() throws Exception {
- entityVersionCleanupFactory.getTask(collectionScope,entityId,version).call();
+ public Void call() throws Exception {
+
+ entityVersionCleanupFactory.getTask( collectionScope, entityId, version, listeners ).call();
+
fireEvents();
final MutationBatch entityDelete = entitySerializationStrategy.delete(collectionScope, entityId, version);
final MutationBatch logDelete = logEntrySerializationStrategy.delete(collectionScope, entityId, version);
entityDelete.execute();
logDelete.execute();
+
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/05429c51/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
index 83bad2f..ca66192 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityVersionCleanupTask.java
@@ -53,8 +53,8 @@ import rx.schedulers.Schedulers;
/**
- * Cleans up previous versions from the specified version. Note that this means the version passed in the io event is
- * retained, the range is exclusive.
+ * Cleans up previous versions from the specified version. Note that this means the version
+ * passed in the io event is retained, the range is exclusive.
*/
public class EntityVersionCleanupTask implements Task<Void> {
@@ -77,13 +77,13 @@ public class EntityVersionCleanupTask implements Task<Void> {
@Inject
public EntityVersionCleanupTask( final SerializationFig serializationFig,
- final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
- final MvccEntitySerializationStrategy entitySerializationStrategy,
- final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
- final Keyspace keyspace,
- @Assisted final CollectionScope scope,
- final List<EntityVersionDeleted> listeners,
- @Assisted final Id entityId,@Assisted final UUID version ) {
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final Keyspace keyspace,
+ @Assisted final CollectionScope scope,
+ final List<EntityVersionDeleted> listeners,
+ @Assisted final Id entityId,@Assisted final UUID version ) {
this.serializationFig = serializationFig;
this.logEntrySerializationStrategy = logEntrySerializationStrategy;
@@ -124,71 +124,72 @@ public class EntityVersionCleanupTask implements Task<Void> {
//TODO Refactor this logic into a a class that can be invoked from anywhere
//load every entity we have history of
Observable<List<MvccEntity>> deleteFieldsObservable =
- Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
- @Override
- protected Iterator<MvccEntity> getIterator() {
- Iterator<MvccEntity> entities = entitySerializationStrategy.load(scope, entityId, version, serializationFig.getBufferSize());
- return entities;
- }
- }) //buffer them for efficiency
- .skip(1)
- .buffer(serializationFig.getBufferSize()).doOnNext(
- new Action1<List<MvccEntity>>() {
- @Override
- public void call(final List<MvccEntity> mvccEntities) {
- final MutationBatch batch = keyspace.prepareMutationBatch();
- final MutationBatch entityBatch = keyspace.prepareMutationBatch();
- final MutationBatch logBatch = keyspace.prepareMutationBatch();
-
- for (MvccEntity mvccEntity : mvccEntities) {
- if (!mvccEntity.getEntity().isPresent()) {
- continue;
- }
-
- final UUID entityVersion = mvccEntity.getVersion();
- final Entity entity = mvccEntity.getEntity().get();
-
- //remove all unique fields from the index
- for (final Field field : entity.getFields()) {
- if (!field.isUnique()) {
- continue;
- }
- final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
- final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(scope,unique);
- batch.mergeShallow(deleteMutation);
- }
-
- final MutationBatch entityDelete = entitySerializationStrategy.delete(scope, entityId, mvccEntity.getVersion());
- entityBatch.mergeShallow(entityDelete);
- final MutationBatch logDelete = logEntrySerializationStrategy.delete(scope, entityId, version);
- logBatch.mergeShallow(logDelete);
- }
-
- try {
- batch.execute();
- } catch (ConnectionException e1) {
- throw new RuntimeException("Unable to execute " +
- "unique value " +
- "delete", e1);
- }
- fireEvents(mvccEntities);
- try {
- entityBatch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to delete entities in cleanup", e);
- }
-
- try {
- logBatch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException("Unable to delete entities from the log", e);
- }
+ Observable.create(new ObservableIterator<MvccEntity>("deleteColumns") {
+ @Override
+ protected Iterator<MvccEntity> getIterator() {
+ Iterator<MvccEntity> entities = entitySerializationStrategy
+ .load(scope, entityId, version, serializationFig.getBufferSize());
+ return entities;
+ }
+ }) //buffer them for efficiency
+ .skip(1)
+ .buffer(serializationFig.getBufferSize()).doOnNext(
+ new Action1<List<MvccEntity>>() {
+ @Override
+ public void call(final List<MvccEntity> mvccEntities) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch entityBatch = keyspace.prepareMutationBatch();
+ final MutationBatch logBatch = keyspace.prepareMutationBatch();
+
+ for (MvccEntity mvccEntity : mvccEntities) {
+ if (!mvccEntity.getEntity().isPresent()) {
+ continue;
+ }
+
+ final UUID entityVersion = mvccEntity.getVersion();
+ final Entity entity = mvccEntity.getEntity().get();
+ //remove all unique fields from the index
+ for (final Field field : entity.getFields()) {
+ if (!field.isUnique()) {
+ continue;
}
+ final UniqueValue unique = new UniqueValueImpl( field, entityId, entityVersion);
+ final MutationBatch deleteMutation = uniqueValueSerializationStrategy.delete(scope,unique);
+ batch.mergeShallow(deleteMutation);
}
+ final MutationBatch entityDelete = entitySerializationStrategy
+ .delete(scope, entityId, mvccEntity.getVersion());
+ entityBatch.mergeShallow(entityDelete);
+ final MutationBatch logDelete = logEntrySerializationStrategy
+ .delete(scope, entityId, version);
+ logBatch.mergeShallow(logDelete);
+ }
+
+ try {
+ batch.execute();
+ } catch (ConnectionException e1) {
+ throw new RuntimeException("Unable to execute " +
+ "unique value " +
+ "delete", e1);
+ }
+ fireEvents(mvccEntities);
+ try {
+ entityBatch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Unable to delete entities in cleanup", e);
+ }
+
+ try {
+ logBatch.execute();
+ } catch (ConnectionException e) {
+ throw new RuntimeException("Unable to delete entities from the log", e);
+ }
- );
+ }
+ }
+ );
final int removedCount = deleteFieldsObservable.count().toBlocking().last();
@@ -216,20 +217,20 @@ public class EntityVersionCleanupTask implements Task<Void> {
//if we have more than 1, run them on the rx scheduler for a max of 8 operations at a time
Observable.from( listeners )
- .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
-
- @Override
- public Observable<EntityVersionDeleted> call(
- final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
-
- return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
- @Override
- public void call( final EntityVersionDeleted listener ) {
- listener.versionDeleted( scope, entityId, versions );
- }
- } );
- }
- }, Schedulers.io() ).toBlocking().last();
+ .parallel( new Func1<Observable<EntityVersionDeleted>, Observable<EntityVersionDeleted>>() {
+
+ @Override
+ public Observable<EntityVersionDeleted> call(
+ final Observable<EntityVersionDeleted> entityVersionDeletedObservable ) {
+
+ return entityVersionDeletedObservable.doOnNext( new Action1<EntityVersionDeleted>() {
+ @Override
+ public void call( final EntityVersionDeleted listener ) {
+ listener.versionDeleted( scope, entityId, versions );
+ }
+ } );
+ }
+ }, Schedulers.io() ).toBlocking().last();
LOG.debug( "Finished firing {} listeners", listenerSize );
}