You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2014/11/13 01:05:17 UTC
[1/4] incubator-usergrid git commit: Finished migrations. Also added
utility functions for streaming data.
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o a143ddfea -> 43b0ba64a
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index fb4800c..beaeab4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.corepersistence;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
+import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.core.guice.CommonModule;
@@ -52,6 +53,7 @@ public class GuiceModule extends AbstractModule {
bind(CpEntityIndexDeleteListener.class).asEagerSingleton();
Multibinder<DataMigration> dataMigrationMultibinder = Multibinder.newSetBinder( binder(), DataMigration.class );
+ dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
index 9f4d4d8..580d128 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/NamingUtils.java
@@ -45,6 +45,11 @@ public class NamingUtils {
public static final UUID DEFAULT_APPLICATION_ID =
UUID.fromString("b6768a08-b5d5-11e3-a495-11ddb1de66c9");
+ /**
+ * The name of the map that holds our entity id->type mapping
+ */
+ public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
+
/**
* Get the application scope from the given uuid
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
new file mode 100644
index 0000000..36d2e60
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.migration;
+
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.NamingUtils;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
+
+import com.google.inject.Inject;
+
+import rx.functions.Action1;
+
+
+/**
+ * Migration that writes each entity's UUID -> entity type mapping into our map data
+ */
+public class EntityTypeMappingMigration implements DataMigration {
+
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityTypeMappingMigration.class );
+
+ private final GraphManagerFactory graphManagerFactory;
+ private final MapManagerFactory mapManagerFactory;
+
+
+ @Inject
+ public EntityTypeMappingMigration( final GraphManagerFactory graphManagerFactory,
+ final MapManagerFactory mapManagerFactory ) {
+ this.graphManagerFactory = graphManagerFactory;
+ this.mapManagerFactory = mapManagerFactory;
+ }
+
+
+ @Override
+ public void migrate( final ProgressObserver observer ) throws Throwable {
+
+ final AtomicLong atomicLong = new AtomicLong();
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( graphManagerFactory )
+ .doOnNext( new Action1<AllEntitiesInSystemObservable.EntityData>() {
+
+
+ @Override
+ public void call( final AllEntitiesInSystemObservable.EntityData entityData ) {
+
+ final MapScope ms = new MapScopeImpl( entityData.entityId,
+ NamingUtils.TYPES_BY_UUID_MAP );
+
+
+ final MapManager mapManager = mapManagerFactory.createMapManager( ms );
+
+ final UUID entityUuid = entityData.entityId.getUuid();
+ final String entityType = entityData.entityId.getType();
+
+ mapManager.putString( entityUuid.toString(), entityType );
+
+ if ( atomicLong.incrementAndGet() % 100 == 0 ) {
+ updateStatus( atomicLong, observer );
+ }
+ }
+ } ).toBlocking().lastOrDefault( null );
+ }
+
+
+ private void updateStatus( final AtomicLong counter, final ProgressObserver observer ) {
+
+ observer.update( getVersion(), String.format( "Updated %d entities", counter.get() ) );
+ }
+
+
+ @Override
+ public int getVersion() {
+ return Versions.VERSION_1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index 7c26f0c..c2573e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -1,21 +1,21 @@
/*
*
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing,
- * * software distributed under the License is distributed on an
- * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * * KIND, either express or implied. See the License for the
- * * specific language governing permissions and limitations
- * * under the License.
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
*
*/
@@ -23,24 +23,25 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
-import org.apache.usergrid.corepersistence.rx.ApplicationObservable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable;
-import org.apache.usergrid.corepersistence.rx.TargetIdObservable;
import org.apache.usergrid.persistence.core.guice.CurrentImpl;
-import org.apache.usergrid.persistence.core.guice.PreviousImpl;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action1;
@@ -53,73 +54,80 @@ import rx.functions.Func1;
public class GraphShardVersionMigration implements DataMigration {
- private final EdgeMetadataSerialization v1Serialization;
+ private static final Logger logger = LoggerFactory.getLogger( GraphShardVersionMigration.class );
private final EdgeMetadataSerialization v2Serialization;
private final GraphManagerFactory graphManagerFactory;
+ private final Keyspace keyspace;
@Inject
- public GraphShardVersionMigration( @PreviousImpl final EdgeMetadataSerialization v1Serialization,
- @CurrentImpl final EdgeMetadataSerialization v2Serialization,
- final GraphManagerFactory graphManagerFactory ) {
- this.v1Serialization = v1Serialization;
+ public GraphShardVersionMigration( @CurrentImpl final EdgeMetadataSerialization v2Serialization,
+ final GraphManagerFactory graphManagerFactory, final Keyspace keyspace ) {
this.v2Serialization = v2Serialization;
this.graphManagerFactory = graphManagerFactory;
+ this.keyspace = keyspace;
}
@Override
public void migrate( final ProgressObserver observer ) throws Throwable {
-// TODO, finish this
-// get each applicationid in our system
-// Observable.create( new ApplicationObservable( graphManagerFactory ) ) .doOnNext( new Action1<Id>() {
-// @Override
-// public void call( final Id id ) {
-//
-// //set up our application scope and graph manager
-// final ApplicationScope applicationScope = new ApplicationScopeImpl( id );
-//
-//
-// final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
-//
-//
-// //load all nodes that are targets of our application node. I.E. entities that have been saved
-// final Observable<Edge> entityNodes = Observable.create( new EdgesFromSourceObservable( applicationScope, id, gm ) );
-//
-// //create our application node
-// final Observable<Id> applicationNode = Observable.just( id );
-//
-// //merge both the specified application node and the entity node so they all get used
-// Observable.merge( applicationNode, entityNodes ).doOnNext( new Action1<Id>() {
-// //load all meta types from and to-and re-save them
-//
-//
-// @Override
-// public void call( final Id id ) {
-// //get the edge types from the source, buffer them, then re-save them. This implicity
-// // updates target edges as well
-// gm.loadEdgesFromSource( new SimpleSearchByEdgeType()
-// new SimpleSearchEdgeType( id, null, null )).buffer( 1000 ).doOnNext(
-//
-// new Action1<List<String>>() {
-// @Override
-// public void call( final List<String> strings ) {
-// v2Serialization.writeEdge( applicationScope, )
-// }
-// } )
-// }
-// } );
-// }
-// } );
+ final AtomicLong counter = new AtomicLong();
+
+ AllEntitiesInSystemObservable.getAllEntitiesInSystem( graphManagerFactory ).flatMap(
+ new Func1<AllEntitiesInSystemObservable.EntityData, Observable<List<Edge>>>() {
+
+
+ @Override
+ public Observable<List<Edge>> call( final AllEntitiesInSystemObservable.EntityData entityData ) {
+ logger.info( "Migrating edges from node {} in scope {}", entityData.entityId,
+ entityData.applicationScope );
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager( entityData.applicationScope );
+
+ //get each edge from this node as a source
+ return EdgesFromSourceObservable.edgesFromSource( entityData.applicationScope,
+ entityData.entityId, gm )
+
+ //for each edge, re-index it in v2, in batches of 1000 edges or fewer
+ .buffer( 1000 ).doOnNext( new Action1<List<Edge>>() {
+ @Override
+ public void call( final List<Edge> edges ) {
+
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( final Edge edge : edges ) {
+ logger.info( "Migrating meta for edge {}", edge );
+ final MutationBatch edgeBatch =
+ v2Serialization.writeEdge( entityData.applicationScope, edge );
+ batch.mergeShallow( edgeBatch );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to perform migration", e );
+ }
+
+ //update the observer so the admin can see it
+ final long newCount = counter.addAndGet( edges.size() );
+
+ observer.update( getVersion(), String.format("Finished re-writing %d edges", newCount) );
+
+
+ }
+ } );
+ }
+ } ).toBlocking().lastOrDefault( null );
}
@Override
public int getVersion() {
- return Versions.VERSION_1;
+ return Versions.VERSION_2;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
index e9ee517..b4fe095 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/Versions.java
@@ -31,7 +31,12 @@ import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSeri
public class Versions {
/**
- * Version 1
+ * Version 1 of our mappings
*/
- public static final int VERSION_1 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
+ public static final int VERSION_1 = 1;
+
+ /**
+ * Version 2. Edge meta changes
+ */
+ public static final int VERSION_2 = EdgeMetadataSerializationProxyImpl.MIGRATION_VERSION;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
new file mode 100644
index 0000000..dcd6c89
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application id graph, and emits the application id itself plus the target node
+ * of every edge that has the application id as its source node
+ */
+public class AllEntitiesInSystemObservable {
+
+
+ /**
+ * Return an observable that emits all entities in the system.
+ */
+ public static Observable<EntityData> getAllEntitiesInSystem( final GraphManagerFactory graphManagerFactory ) {
+ //for every application, emit the application node itself and each node that is a target of its edges
+ return ApplicationObservable.getAllApplicationIds( graphManagerFactory )
+
+ .flatMap( new Func1<Id, Observable<EntityData>>() {
+ @Override
+ public Observable<EntityData> call( final Id id ) {
+
+ //set up our application scope and graph manager
+ final ApplicationScope applicationScope = new ApplicationScopeImpl( id );
+
+
+ final GraphManager gm =
+ graphManagerFactory.createEdgeManager( applicationScope );
+
+
+ //load all nodes that are targets of our application node. I.E.
+ // entities that have been saved
+ final Observable<Id> entityNodes =
+ TargetIdObservable.getTargetNodes( applicationScope, id, gm );
+
+ //create our application node
+ final Observable<Id> applicationNode = Observable.just( id );
+
+ //merge both the specified application node and the entity node
+ // so they all get used
+ return Observable.merge( applicationNode, entityNodes )
+ .map( new Func1<Id, EntityData>() {
+ @Override
+ public EntityData call( final Id id ) {
+ return new EntityData( applicationScope, id );
+ }
+ } );
+ }
+ } );
+ }
+
+
+ /**
+ * Get the entity data. Immutable bean for fast access
+ */
+ public static final class EntityData {
+ public final ApplicationScope applicationScope;
+ public final Id entityId;
+
+
+ public EntityData( final ApplicationScope applicationScope, final Id entityId ) {
+ this.applicationScope = applicationScope;
+ this.entityId = entityId;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
index 8ead71c..93ba9f0 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.rx;
+import java.util.Arrays;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +38,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action1;
+import rx.functions.Func1;
import static org.apache.usergrid.corepersistence.NamingUtils.generateApplicationId;
import static org.apache.usergrid.corepersistence.NamingUtils.getApplicationScope;
@@ -44,27 +47,26 @@ import static org.apache.usergrid.corepersistence.NamingUtils.getApplicationScop
/**
* An observable that will emit all application stored in the system.
*/
-public class ApplicationObservable implements Observable.OnSubscribe<Id> {
+public class ApplicationObservable {
- private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
- private final GraphManagerFactory graphManagerFactory;
+ /**
+ * Get all applicationIds as an observable
+ * @param graphManagerFactory
+ * @return
+ */
+ public static Observable<Id> getAllApplicationIds( final GraphManagerFactory graphManagerFactory ) {
- public ApplicationObservable( final GraphManagerFactory graphManagerFactory ) {
- this.graphManagerFactory = graphManagerFactory;
- }
+ //emit our 3 hard coded applications that are used to manage the system first.
+ //this way consumers can perform whatever work they need to on the root system first
- @Override
- public void call( final Subscriber<? super Id> subscriber ) {
+ final Observable<Id> systemIds = Observable.from( Arrays.asList( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ),
+ generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ),
+ generateApplicationId( NamingUtils.SYSTEM_APP_ID ) ) );
- //emit our 3 hard coded applications that are used the manage the system first.
- //this way consumers can perform whatever work they need to on the root system first
- emit( generateApplicationId( NamingUtils.DEFAULT_APPLICATION_ID ), subscriber );
- emit( generateApplicationId( NamingUtils.MANAGEMENT_APPLICATION_ID ), subscriber );
- emit( generateApplicationId( NamingUtils.SYSTEM_APP_ID ), subscriber );
ApplicationScope appScope = getApplicationScope( NamingUtils.SYSTEM_APP_ID );
@@ -75,45 +77,17 @@ public class ApplicationObservable implements Observable.OnSubscribe<Id> {
Id rootAppId = appScope.getApplication();
- Observable<Edge> edges = gm.loadEdgesFromSource(
+ Observable<Id> appIds = gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- null ) );
-
-
- final int count = edges.doOnNext( new Action1<Edge>() {
+ null ) ).map( new Func1<Edge, Id>() {
@Override
- public void call( final Edge edge ) {
- Id applicationId = edge.getTargetNode();
-
-
- logger.debug( "Emitting applicationId of {}", applicationId );
-
- emit( applicationId, subscriber );
+ public Id call( final Edge edge ) {
+ return edge.getTargetNode();
}
- } )
- //if we don't want the count, not sure we need to block. We may just need to subscribe
- .count().toBlocking().last();
+ } );
- logger.debug( "Emitted {} application ids", count );
+ return Observable.merge( systemIds, appIds);
}
- /**
- * Return false if no more items should be emitted, true otherwise
- */
- private boolean emit( final Id appId, final Subscriber<? super Id> subscriber ) {
-
- if ( subscriber.isUnsubscribed() ) {
- return false;
- }
-
- try {
- subscriber.onNext( appId );
- }
- catch ( Throwable t ) {
- subscriber.onError( t );
- }
-
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
index b986d56..88efc7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
@@ -32,42 +32,29 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
-import rx.Subscriber;
import rx.functions.Func1;
/**
* Emits the id of all nodes that are target nodes for the given source node
*/
-public class EdgesFromSourceObservable implements Observable.OnSubscribe<Edge> {
+public class EdgesFromSourceObservable {
private static final Logger logger = LoggerFactory.getLogger( EdgesFromSourceObservable.class );
- private final ApplicationScope applicationScope;
- private final Id sourceNode;
- private final GraphManager gm;
-
-
- public EdgesFromSourceObservable( final ApplicationScope applicationScope, final Id sourceNode,
- final GraphManager gm ) {
- this.applicationScope = applicationScope;
- this.sourceNode = sourceNode;
- this.gm = gm;
- }
-
-
- @Override
- public void call( final Subscriber<? super Edge> subscriber ) {
-
+ /**
+ * Get all edges from the source
+ */
+ public static Observable<Edge> edgesFromSource( final ApplicationScope applicationScope, final Id sourceNode,
+ final GraphManager gm ) {
final Id applicationId = applicationScope.getApplication();
//only search edge types that start with collections
- Observable<String> edgeTypes = gm.getEdgeTypesFromSource(
- new SimpleSearchEdgeType( sourceNode, null, null ) );
+ Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
- edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+ return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
@Override
public Observable<Edge> call( final String edgeType ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
index bd5e12f..91ba741 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
@@ -29,47 +29,39 @@ import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
+import rx.functions.Func1;
/**
* Emits the id of all nodes that are target nodes from the given source node
*/
-public class TargetIdObservable implements Observable.OnSubscribe<Id> {
+public class TargetIdObservable {
private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class );
- private final ApplicationScope applicationScope;
- private final Id sourceNode;
- private final GraphManager gm;
-
-
- public TargetIdObservable( final ApplicationScope applicationScope, final Id sourceNode, final GraphManager gm ) {
- this.applicationScope = applicationScope;
- this.sourceNode = sourceNode;
- this.gm = gm;
- }
-
-
- @Override
- public void call( final Subscriber<? super Id> subscriber ) {
+ /**
+ * Get all nodes that are target nodes from the sourceNode
+ * @param applicationScope
+ * @param sourceNode
+ * @param gm
+ * @return
+ */
+ public static Observable<Id> getTargetNodes(final ApplicationScope applicationScope, final Id sourceNode, final GraphManager gm) {
//only search edge types that start with collections
- Observable.create( new EdgesFromSourceObservable( applicationScope, sourceNode, gm ) )
- .doOnNext( new Action1<Edge>() {
+ return EdgesFromSourceObservable.edgesFromSource( applicationScope, sourceNode, gm ).map( new Func1<Edge, Id>() {
- @Override
- public void call( Edge edge ) {
- logger.info( "Emitting targetId of {}", edge );
+ @Override
+ public Id call( final Edge edge ) {
+ final Id targetNode = edge.getTargetNode();
- final Id targetNode = edge.getTargetNode();
+ logger.info( "Emitting targetId of {}", edge );
- subscriber.onNext( targetNode );
- }
- } ).toBlocking().lastOrDefault( null ); // end foreach on edges
+ return targetNode;
+ }
+ } );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
index a8bd363..6e571d1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManager.java
@@ -579,6 +579,9 @@ public interface EntityManager {
String entityType, String propertyName, Object propertyValue ) throws Exception;
@Deprecated
+ /**
+ * Get an entity by UUID. This will return null if the entity is not found
+ */
public Entity get( UUID id ) throws Exception;
public <A extends Entity> A get( EntityRef entityRef, Class<A> entityClass ) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index 203fa38..b3aeda7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -46,7 +46,7 @@ import com.netflix.astyanax.MutationBatch;
@Singleton
public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerialization {
- public static final int MIGRATION_VERSION = 1;
+ public static final int MIGRATION_VERSION = 2;
private final DataMigrationManager dataMigrationManager;
private final Keyspace keyspace;
[2/4] incubator-usergrid git commit: Finished migrations. Also added
utility functions for streaming data.
Posted by to...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/0f02e823/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7e8e1b2..84f9efe 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -16,18 +16,9 @@
package org.apache.usergrid.corepersistence;
-import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
-import com.google.common.cache.LoadingCache;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
-import com.yammer.metrics.annotation.Metered;
-import static java.lang.String.CASE_INSENSITIVE_ORDER;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
-import static java.util.Arrays.asList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -43,23 +34,10 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.CounterRow;
-import me.prettyprint.hector.api.beans.CounterRows;
-import me.prettyprint.hector.api.beans.CounterSlice;
-import me.prettyprint.hector.api.beans.DynamicComposite;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HCounterColumn;
-import me.prettyprint.hector.api.factory.HFactory;
-import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
-import static me.prettyprint.hector.api.factory.HFactory.createMutator;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.SliceCounterQuery;
-import static org.apache.commons.lang.StringUtils.capitalize;
-import static org.apache.commons.lang.StringUtils.isBlank;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.util.Assert;
import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -77,40 +55,15 @@ import org.apache.usergrid.persistence.IndexBucketLocator;
import org.apache.usergrid.persistence.RelationManager;
import org.apache.usergrid.persistence.Results;
import org.apache.usergrid.persistence.Schema;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
-import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
-import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
-import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
-import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
-import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
import org.apache.usergrid.persistence.SimpleEntityRef;
-import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid;
import org.apache.usergrid.persistence.SimpleRoleRef;
import org.apache.usergrid.persistence.TypedEntity;
import org.apache.usergrid.persistence.cassandra.ApplicationCF;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
-import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
import org.apache.usergrid.persistence.cassandra.CassandraPersistenceUtils;
import org.apache.usergrid.persistence.cassandra.CassandraService;
-import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
import org.apache.usergrid.persistence.cassandra.CounterUtils;
import org.apache.usergrid.persistence.cassandra.GeoIndexManager;
-import static org.apache.usergrid.persistence.cassandra.Serializers.be;
-import static org.apache.usergrid.persistence.cassandra.Serializers.le;
-import static org.apache.usergrid.persistence.cassandra.Serializers.se;
-import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
import org.apache.usergrid.persistence.cassandra.util.TraceParticipant;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -145,20 +98,71 @@ import org.apache.usergrid.persistence.model.field.Field;
import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.apache.usergrid.utils.ClassUtils;
-import static org.apache.usergrid.utils.ClassUtils.cast;
import org.apache.usergrid.utils.CompositeUtils;
+import org.apache.usergrid.utils.StringUtils;
+import org.apache.usergrid.utils.UUIDUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
+import com.google.common.cache.LoadingCache;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
+import com.yammer.metrics.annotation.Metered;
+
+import me.prettyprint.hector.api.Keyspace;
+import me.prettyprint.hector.api.beans.ColumnSlice;
+import me.prettyprint.hector.api.beans.CounterRow;
+import me.prettyprint.hector.api.beans.CounterRows;
+import me.prettyprint.hector.api.beans.CounterSlice;
+import me.prettyprint.hector.api.beans.DynamicComposite;
+import me.prettyprint.hector.api.beans.HColumn;
+import me.prettyprint.hector.api.beans.HCounterColumn;
+import me.prettyprint.hector.api.factory.HFactory;
+import me.prettyprint.hector.api.mutation.Mutator;
+import me.prettyprint.hector.api.query.MultigetSliceCounterQuery;
+import me.prettyprint.hector.api.query.QueryResult;
+import me.prettyprint.hector.api.query.SliceCounterQuery;
+import rx.Observable;
+
+import static java.lang.String.CASE_INSENSITIVE_ORDER;
+import static java.util.Arrays.asList;
+
+import static me.prettyprint.hector.api.factory.HFactory.createCounterSliceQuery;
+import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.commons.lang.StringUtils.capitalize;
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
+import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLENAMES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_ROLETIMES;
+import static org.apache.usergrid.persistence.Schema.DICTIONARY_SETS;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_CREATED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_INACTIVITY;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_MODIFIED;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_NAME;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TIMESTAMP;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_TYPE;
+import static org.apache.usergrid.persistence.Schema.PROPERTY_UUID;
+import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.Schema.TYPE_ENTITY;
+import static org.apache.usergrid.persistence.SimpleEntityRef.getUuid;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.APPLICATION_AGGREGATE_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COMPOSITE_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_COUNTERS;
+import static org.apache.usergrid.persistence.cassandra.ApplicationCF.ENTITY_DICTIONARIES;
+import static org.apache.usergrid.persistence.cassandra.CassandraService.ALL_COUNT;
+import static org.apache.usergrid.persistence.cassandra.Serializers.be;
+import static org.apache.usergrid.persistence.cassandra.Serializers.le;
+import static org.apache.usergrid.persistence.cassandra.Serializers.se;
+import static org.apache.usergrid.persistence.cassandra.Serializers.ue;
+import static org.apache.usergrid.utils.ClassUtils.cast;
import static org.apache.usergrid.utils.ConversionUtils.bytebuffer;
import static org.apache.usergrid.utils.ConversionUtils.getLong;
import static org.apache.usergrid.utils.ConversionUtils.object;
import static org.apache.usergrid.utils.ConversionUtils.string;
import static org.apache.usergrid.utils.InflectionUtils.singularize;
-import org.apache.usergrid.utils.StringUtils;
-import org.apache.usergrid.utils.UUIDUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.util.Assert;
-import rx.Observable;
-
/**
@@ -186,8 +190,6 @@ public class CpEntityManager implements EntityManager {
private boolean skipAggregateCounters;
- private String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
-
/** Short-term cache to keep us from reloading same Entity during single request. */
private LoadingCache<EntityScope, org.apache.usergrid.persistence.model.entity.Entity> entityCache;
@@ -198,8 +200,8 @@ public class CpEntityManager implements EntityManager {
@Override
public void init( EntityManagerFactory emf, UUID applicationId ) {
- Preconditions.checkNotNull(emf, "emf must not be null");
- Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
+ Preconditions.checkNotNull( emf, "emf must not be null" );
+ Preconditions.checkNotNull( applicationId, "applicationId must not be null" );
this.emf = ( CpEntityManagerFactory ) emf;
this.managerCache = this.emf.getManagerCache();
@@ -213,22 +215,23 @@ public class CpEntityManager implements EntityManager {
// set to false for now
this.skipAggregateCounters = false;
- int entityCacheSize = Integer.parseInt(
- cass.getProperties().getProperty("usergrid.entity_cache_size", "100"));
+ int entityCacheSize =
+ Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_size", "100" ) );
- int entityCacheTimeout = Integer.parseInt(
- cass.getProperties().getProperty("usergrid.entity_cache_timeout_ms", "500"));
+ int entityCacheTimeout =
+ Integer.parseInt( cass.getProperties().getProperty( "usergrid.entity_cache_timeout_ms", "500" ) );
- this.entityCache = CacheBuilder.newBuilder()
- .maximumSize( entityCacheSize )
- .expireAfterWrite( entityCacheTimeout, TimeUnit.MILLISECONDS )
- .build( new CacheLoader<EntityScope, org.apache.usergrid.persistence.model.entity.Entity>() {
- public org.apache.usergrid.persistence.model.entity.Entity load( EntityScope es) {
- return managerCache.getEntityCollectionManager(
- es.scope ).load( es.entityId ).toBlocking().lastOrDefault(null);
- }
- }
- );
+ this.entityCache = CacheBuilder.newBuilder().maximumSize( entityCacheSize )
+ .expireAfterWrite( entityCacheTimeout, TimeUnit.MILLISECONDS )
+ .build( new CacheLoader<EntityScope, org.apache.usergrid.persistence.model
+ .entity.Entity>() {
+ public org.apache.usergrid.persistence.model.entity.Entity load(
+ EntityScope es ) {
+ return managerCache.getEntityCollectionManager( es.scope )
+ .load( es.entityId ).toBlocking()
+ .lastOrDefault( null );
+ }
+ } );
}
@@ -239,10 +242,12 @@ public class CpEntityManager implements EntityManager {
}
- /** Needed to support short-term Entity cache. */
+ /** Needed to support short-term Entity cache. */
public static class EntityScope {
CollectionScope scope;
Id entityId;
+
+
public EntityScope( CollectionScope scope, Id entityId ) {
this.scope = scope;
this.entityId = entityId;
@@ -251,21 +256,21 @@ public class CpEntityManager implements EntityManager {
/**
- * Load entity from short-term cache.
- * Package scope so that CpRelationManager can use it too.
- *
+ * Load entity from short-term cache. Package scope so that CpRelationManager can use it too.
+ *
* @param es Carries Entity Id and CollectionScope from which to load Entity.
+ *
* @return Entity or null if not found
*/
org.apache.usergrid.persistence.model.entity.Entity load( EntityScope es ) {
try {
return entityCache.get( es );
-
- } catch ( InvalidCacheLoadException icle ) {
+ }
+ catch ( InvalidCacheLoadException icle ) {
// fine, entity not found
return null;
-
- } catch ( ExecutionException exex ) {
+ }
+ catch ( ExecutionException exex ) {
// uh-oh, more serious problem
throw new RuntimeException( "Error loading entity", exex );
}
@@ -289,8 +294,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public <A extends Entity> A create(
- String entityType, Class<A> entityClass, Map<String, Object> properties ) throws Exception {
+ public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties )
+ throws Exception {
if ( ( entityType != null ) && ( entityType.startsWith( TYPE_ENTITY ) || entityType
.startsWith( "entities" ) ) ) {
@@ -314,8 +319,7 @@ public class CpEntityManager implements EntityManager {
@Override
- public Entity create(
- UUID importId, String entityType, Map<String, Object> properties ) throws Exception {
+ public Entity create( UUID importId, String entityType, Map<String, Object> properties ) throws Exception {
UUID timestampUuid = importId != null ? importId : UUIDUtils.newTimeUUID();
@@ -344,9 +348,8 @@ public class CpEntityManager implements EntityManager {
*/
@Metered( group = "core", name = "EntityManager_create" )
@TraceParticipant
- public <A extends Entity> A create(
- String entityType, Class<A> entityClass, Map<String, Object> properties, UUID importId )
- throws Exception {
+ public <A extends Entity> A create( String entityType, Class<A> entityClass, Map<String, Object> properties,
+ UUID importId ) throws Exception {
UUID timestampUuid = importId != null ? importId : UUIDUtils.newTimeUUID();
@@ -372,57 +375,55 @@ public class CpEntityManager implements EntityManager {
Id id = new SimpleId( entityRef.getUuid(), entityRef.getType() );
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
- CollectionScope collectionScope = new CollectionScopeImpl(
- getApplicationScope().getApplication(), getApplicationScope().getApplication(), collectionName );
+ CollectionScope collectionScope =
+ new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
+ collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
-// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
-// throw new IllegalArgumentException(
-// "Entity Id " + id.getType() + ":"+ id.getUuid() +" uuid not time based");
-// }
+ // if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
+ // throw new IllegalArgumentException(
+ // "Entity Id " + id.getType() + ":"+ id.getUuid() +" uuid not time based");
+ // }
- org.apache.usergrid.persistence.model.entity.Entity cpEntity =
- load( new EntityScope( collectionScope, id ) );
+ org.apache.usergrid.persistence.model.entity.Entity cpEntity = load( new EntityScope( collectionScope, id ) );
if ( cpEntity == null ) {
if ( logger.isDebugEnabled() ) {
- logger.debug( "FAILED to load entity {}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- id.getType(), id.getUuid(),
- collectionScope.getApplication(),
- collectionScope.getOwner(),
- collectionScope.getName()
- } );
+ logger.debug( "FAILED to load entity {}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[] {
+ id.getType(), id.getUuid(), collectionScope.getApplication(),
+ collectionScope.getOwner(), collectionScope.getName()
+ } );
}
return null;
- }
+ }
-// if ( entityRef.getType().equals("group") ) {
-// logger.debug("Reading Group");
-// for ( Field field : cpEntity.getFields() ) {
-// logger.debug(" Reading prop name={} value={}", field.getName(), field.getValue() );
-// }
-// }
+ // if ( entityRef.getType().equals("group") ) {
+ // logger.debug("Reading Group");
+ // for ( Field field : cpEntity.getFields() ) {
+ // logger.debug(" Reading prop name={} value={}", field.getName(), field.getValue() );
+ // }
+ // }
Class clazz = Schema.getDefaultSchema().getEntityClass( entityRef.getType() );
Entity entity = EntityFactory.newEntity( entityRef.getUuid(), entityRef.getType(), clazz );
entity.setProperties( CpEntityMapUtils.toMap( cpEntity ) );
-// if ( entityRef.getType().equals("group") ) {
-// logger.debug("Reading Group " + entity.getProperties());
-// }
+ // if ( entityRef.getType().equals("group") ) {
+ // logger.debug("Reading Group " + entity.getProperties());
+ // }
-// if ( logger.isDebugEnabled() ) {
-// logger.debug( "Loaded entity {}:{} from scope\n app {}\n owner {}\n name {}",
-// new Object[] {
-// id.getType(), id.getUuid(),
-// collectionScope.getApplication(),
-// collectionScope.getOwner(),
-// collectionScope.getName()
-// } );
-// }
+ // if ( logger.isDebugEnabled() ) {
+ // logger.debug( "Loaded entity {}:{} from scope\n app {}\n owner {}\n name {}",
+ // new Object[] {
+ // id.getType(), id.getUuid(),
+ // collectionScope.getApplication(),
+ // collectionScope.getOwner(),
+ // collectionScope.getName()
+ // } );
+ // }
return entity;
}
@@ -459,28 +460,26 @@ public class CpEntityManager implements EntityManager {
Id id = new SimpleId( entityId, type );
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( type );
- CollectionScope collectionScope = new CollectionScopeImpl(
- getApplicationScope().getApplication(), getApplicationScope().getApplication(), collectionName );
+ CollectionScope collectionScope =
+ new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
+ collectionName );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
-// if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
-// throw new IllegalArgumentException(
-// "Entity Id " + id.getType() + ":"+ id.getUuid() +" uuid not time based");
-// }
+ // if ( !UUIDUtils.isTimeBased( id.getUuid() ) ) {
+ // throw new IllegalArgumentException(
+ // "Entity Id " + id.getType() + ":"+ id.getUuid() +" uuid not time based");
+ // }
- org.apache.usergrid.persistence.model.entity.Entity cpEntity =
- load( new EntityScope( collectionScope, id ) );
+ org.apache.usergrid.persistence.model.entity.Entity cpEntity = load( new EntityScope( collectionScope, id ) );
if ( cpEntity == null ) {
if ( logger.isDebugEnabled() ) {
- logger.debug( "FAILED to load entity {}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- id.getType(), id.getUuid(),
- collectionScope.getApplication(),
- collectionScope.getOwner(),
- collectionScope.getName()
- } );
+ logger.debug( "FAILED to load entity {}:{} from scope\n app {}\n owner {}\n name {}",
+ new Object[] {
+ id.getType(), id.getUuid(), collectionScope.getApplication(),
+ collectionScope.getOwner(), collectionScope.getName()
+ } );
}
return null;
}
@@ -493,9 +492,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results get(
- Collection<UUID> entityIds, Class<? extends Entity> entityClass, Level resultsLevel )
- throws Exception {
+ public Results get( Collection<UUID> entityIds, Class<? extends Entity> entityClass, Level resultsLevel )
+ throws Exception {
String type = Schema.getDefaultSchema().getEntityType( entityClass );
@@ -516,7 +514,7 @@ public class CpEntityManager implements EntityManager {
@Override
public Results get( Collection<UUID> entityIds, String entityType, Class<? extends Entity> entityClass,
- Level resultsLevel ) throws Exception {
+ Level resultsLevel ) throws Exception {
throw new UnsupportedOperationException( "Not supported yet." );
}
@@ -525,47 +523,44 @@ public class CpEntityManager implements EntityManager {
public void update( Entity entity ) throws Exception {
// first, update entity index in its own collection scope
- CollectionScope collectionScope = new CollectionScopeImpl(
- getApplicationScope().getApplication(),
- getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
+ CollectionScope collectionScope =
+ new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( entity.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
Id entityId = new SimpleId( entity.getUuid(), entity.getType() );
if ( logger.isDebugEnabled() ) {
- logger.debug( "Updating entity {}:{} from scope\n app {}\n owner {}\n name {}",
- new Object[] {
- entityId.getType(), entityId.getUuid(),
- collectionScope.getApplication(),
- collectionScope.getOwner(),
- collectionScope.getName()
- } );
+ logger.debug( "Updating entity {}:{} from scope\n app {}\n owner {}\n name {}", new Object[] {
+ entityId.getType(), entityId.getUuid(), collectionScope.getApplication(),
+ collectionScope.getOwner(), collectionScope.getName()
+ } );
}
-// if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
-// throw new IllegalArgumentException(
-// "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
-// }
+ // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
+ // throw new IllegalArgumentException(
+ // "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
+ // }
-// org.apache.usergrid.persistence.model.entity.Entity cpEntity =
-// ecm.load( entityId ).toBlockingObservable().last();
+ // org.apache.usergrid.persistence.model.entity.Entity cpEntity =
+ // ecm.load( entityId ).toBlockingObservable().last();
- org.apache.usergrid.persistence.model.entity.Entity cpEntity =
+ org.apache.usergrid.persistence.model.entity.Entity cpEntity =
new org.apache.usergrid.persistence.model.entity.Entity( entityId );
- cpEntity = CpEntityMapUtils.fromMap(
- cpEntity, entity.getProperties(), entity.getType(), true );
+ cpEntity = CpEntityMapUtils.fromMap( cpEntity, entity.getProperties(), entity.getType(), true );
try {
cpEntity = ecm.update( cpEntity ).toBlockingObservable().last();
+
// need to reload entity so bypass entity cache
cpEntity = ecm.load( entityId ).toBlockingObservable().last();
- logger.debug("Wrote {}:{} version {}", new Object[] {
- cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
+ logger.debug( "Wrote {}:{} version {}", new Object[] {
+ cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
+ } );
}
catch ( WriteUniqueVerifyException wuve ) {
handleWriteUniqueVerifyException( entity, wuve );
@@ -579,38 +574,37 @@ public class CpEntityManager implements EntityManager {
}
// update in all containing collections and connection indexes
- CpRelationManager rm = (CpRelationManager)getRelationManager( entity );
+ CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
}
@Override
public void delete( EntityRef entityRef ) throws Exception {
- deleteAsync( entityRef ).toBlocking().lastOrDefault(null);
+ deleteAsync( entityRef ).toBlocking().lastOrDefault( null );
//delete from our UUID index
MapManager mm = getMapManagerForTypes();
- mm.delete(entityRef.getUuid().toString() );
+ mm.delete( entityRef.getUuid().toString() );
}
private Observable deleteAsync( EntityRef entityRef ) throws Exception {
- CollectionScope collectionScope = new CollectionScopeImpl(
- getApplicationScope().getApplication(),
- getApplicationScope().getApplication(),
- CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
+ CollectionScope collectionScope =
+ new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
-// if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
-// throw new IllegalArgumentException(
-// "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
-// }
+ // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
+ // throw new IllegalArgumentException(
+ // "Entity Id " + entityId.getType() + ":"+ entityId.getUuid() +" uuid not time based");
+ // }
- org.apache.usergrid.persistence.model.entity.Entity entity =
- load( new EntityScope( collectionScope, entityId ) );
+ org.apache.usergrid.persistence.model.entity.Entity entity =
+ load( new EntityScope( collectionScope, entityId ) );
if ( entity != null ) {
@@ -619,15 +613,13 @@ public class CpEntityManager implements EntityManager {
RelationManager rm = getRelationManager( entityRef );
Map<String, Map<UUID, Set<String>>> owners = rm.getOwners();
- logger.debug( "Deleting indexes of all {} collections owning the entity",
- owners.keySet().size() );
+ logger.debug( "Deleting indexes of all {} collections owning the entity", owners.keySet().size() );
- final EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
+ final EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
final EntityIndexBatch batch = ei.createBatch();
-
for ( String ownerType : owners.keySet() ) {
Map<UUID, Set<String>> collectionsByUuid = owners.get( ownerType );
@@ -635,8 +627,7 @@ public class CpEntityManager implements EntityManager {
Set<String> collectionNames = collectionsByUuid.get( uuid );
for ( String coll : collectionNames ) {
- IndexScope indexScope = new IndexScopeImpl(
- new SimpleId( uuid, ownerType ),
+ IndexScope indexScope = new IndexScopeImpl( new SimpleId( uuid, ownerType ),
CpNamingUtils.getCollectionScopeNameFromCollectionName( coll ) );
@@ -646,20 +637,18 @@ public class CpEntityManager implements EntityManager {
}
-
// deindex from default index scope
- IndexScope defaultIndexScope = new IndexScopeImpl(
- getApplicationScope().getApplication(),
+ IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
- batch.deindex(defaultIndexScope, entity );
+ batch.deindex( defaultIndexScope, entity );
-// TODO REMOVE INDEX CODE
-// IndexScope allTypesIndexScope = new IndexScopeImpl(
-// getApplicationScope().getApplication(),
-// CpNamingUtils.ALL_TYPES, entityType );
-//
-// batch.deindex( allTypesIndexScope, entity );
+ // TODO REMOVE INDEX CODE
+ // IndexScope allTypesIndexScope = new IndexScopeImpl(
+ // getApplicationScope().getApplication(),
+ // CpNamingUtils.ALL_TYPES, entityType );
+ //
+ // batch.deindex( allTypesIndexScope, entity );
batch.execute();
@@ -683,8 +672,7 @@ public class CpEntityManager implements EntityManager {
public void decrementEntityCollection( String collection_name, long cassandraTimestamp ) {
try {
- incrementAggregateCounters( null, null, null,
- APPLICATION_COLLECTION + collection_name, -ONE_COUNT,
+ incrementAggregateCounters( null, null, null, APPLICATION_COLLECTION + collection_name, -ONE_COUNT,
cassandraTimestamp );
}
catch ( Exception e ) {
@@ -692,29 +680,26 @@ public class CpEntityManager implements EntityManager {
new Object[] { collection_name, e } );
}
try {
- incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, -ONE_COUNT,
- cassandraTimestamp );
+ incrementAggregateCounters( null, null, null, APPLICATION_ENTITIES, -ONE_COUNT, cassandraTimestamp );
}
catch ( Exception e ) {
- logger.error( "Unable to decrement counter application.entities for collection: {} "
- + "with timestamp: {}",
+ logger.error( "Unable to decrement counter application.entities for collection: {} " + "with timestamp: {}",
new Object[] { collection_name, cassandraTimestamp, e } );
}
}
@Override
- public Results searchCollection( EntityRef entityRef, String collectionName, Query query )
- throws Exception {
+ public Results searchCollection( EntityRef entityRef, String collectionName, Query query ) throws Exception {
return getRelationManager( entityRef ).searchCollection( collectionName, query );
}
-//
-// @Override
-// public void setApplicationId( UUID applicationId ) {
-// this.applicationId = applicationId;
-// }
+ //
+ // @Override
+ // public void setApplicationId( UUID applicationId ) {
+ // this.applicationId = applicationId;
+ // }
@Override
@@ -748,8 +733,7 @@ public class CpEntityManager implements EntityManager {
@Override
public void updateApplication( Map<String, Object> properties ) throws Exception {
- this.updateProperties(
- new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), properties );
+ this.updateProperties( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), properties );
this.application = get( applicationId, Application.class );
}
@@ -766,7 +750,6 @@ public class CpEntityManager implements EntityManager {
public Set<String> getApplicationCollections() throws Exception {
return getRelationManager( getApplication() ).getCollections();
-
}
@@ -778,7 +761,7 @@ public class CpEntityManager implements EntityManager {
if ( collections != null ) {
for ( String collectionCode : collections ) {
- String collectionName = collectionCode.split("\\|")[0];
+ String collectionName = collectionCode.split( "\\|" )[0];
if ( !Schema.isAssociatedEntityType( collectionName ) ) {
Long count = counts.get( APPLICATION_COLLECTION + collectionName );
@@ -791,7 +774,7 @@ public class CpEntityManager implements EntityManager {
}
}
}
- /*
+ /*
* if ((counts != null) && !counts.isEmpty()) { metadata.put("counters",
* counts); }
*/
@@ -807,32 +790,29 @@ public class CpEntityManager implements EntityManager {
@Override
public void createApplicationCollection( String entityType ) throws Exception {
- create(entityType,null);
+ create( entityType, null );
}
@Override
public EntityRef getAlias( String aliasType, String alias ) throws Exception {
- return getAlias( new SimpleEntityRef(
- Application.ENTITY_TYPE, applicationId ), aliasType, alias );
+ return getAlias( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), aliasType, alias );
}
@Override
- public EntityRef getAlias( EntityRef ownerRef, String collectionType, String aliasValue )
- throws Exception {
+ public EntityRef getAlias( EntityRef ownerRef, String collectionType, String aliasValue ) throws Exception {
Assert.notNull( ownerRef, "ownerRef is required" );
Assert.notNull( collectionType, "collectionType is required" );
Assert.notNull( aliasValue, "aliasValue is required" );
- logger.debug("getAlias() for collection type {} alias {}", collectionType, aliasValue );
+ logger.debug( "getAlias() for collection type {} alias {}", collectionType, aliasValue );
String collName = Schema.defaultCollectionName( collectionType );
- Map<String, EntityRef> results = getAlias(
- ownerRef, collName, Collections.singletonList( aliasValue ) );
+ Map<String, EntityRef> results = getAlias( ownerRef, collName, Collections.singletonList( aliasValue ) );
if ( results == null || results.size() == 0 ) {
return null;
@@ -842,7 +822,7 @@ public class CpEntityManager implements EntityManager {
// TODO When we get an event system, trigger a repair if this is detected
if ( results.size() > 1 ) {
logger.warn( "More than 1 entity with Owner id '{}' of type '{}' "
- + "and alias '{}' exists. This is a duplicate alias, and needs audited",
+ + "and alias '{}' exists. This is a duplicate alias, and needs audited",
new Object[] { ownerRef, collectionType, aliasValue } );
}
@@ -851,22 +831,19 @@ public class CpEntityManager implements EntityManager {
@Override
- public Map<String, EntityRef> getAlias( String aliasType, List<String> aliases )
- throws Exception {
+ public Map<String, EntityRef> getAlias( String aliasType, List<String> aliases ) throws Exception {
String collName = Schema.defaultCollectionName( aliasType );
- return getAlias(
- new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), collName, aliases );
+ return getAlias( new SimpleEntityRef( Application.ENTITY_TYPE, applicationId ), collName, aliases );
}
@Override
- public Map<String, EntityRef> getAlias(
- EntityRef ownerRef, String collName, List<String> aliases )
+ public Map<String, EntityRef> getAlias( EntityRef ownerRef, String collName, List<String> aliases )
throws Exception {
- logger.debug("getAliases() for collection {} aliases {}", collName, aliases );
+ logger.debug( "getAliases() for collection {} aliases {}", collName, aliases );
Assert.notNull( ownerRef, "ownerRef is required" );
Assert.notNull( collName, "collectionName is required" );
@@ -878,8 +855,7 @@ public class CpEntityManager implements EntityManager {
for ( String alias : aliases ) {
- Iterable<EntityRef> refs =
- getEntityRefsForUniqueProperty( collName, propertyName, alias );
+ Iterable<EntityRef> refs = getEntityRefsForUniqueProperty( collName, propertyName, alias );
for ( EntityRef ref : refs ) {
results.put( alias, ref );
@@ -890,8 +866,8 @@ public class CpEntityManager implements EntityManager {
}
- private Iterable<EntityRef> getEntityRefsForUniqueProperty( String collName, String propName,
- String alias ) throws Exception {
+ private Iterable<EntityRef> getEntityRefsForUniqueProperty( String collName, String propName, String alias )
+ throws Exception {
final Id id = getIdForUniqueEntityField( collName, propName, alias );
@@ -910,6 +886,7 @@ public class CpEntityManager implements EntityManager {
return validate( entityRef, true );
}
+
public EntityRef validate( EntityRef entityRef, boolean verify ) throws Exception {
if ( ( entityRef == null ) || ( entityRef.getUuid() == null ) ) {
@@ -920,20 +897,18 @@ public class CpEntityManager implements EntityManager {
UUID entityId = entityRef.getUuid();
String entityType = entityRef.getType();
try {
- get( entityRef ).getType();
+ get( entityRef ).getType();
}
catch ( Exception e ) {
- logger.error( "Unable to load entity "
- + entityRef.getType() + ":" + entityRef.getUuid(), e );
+ logger.error( "Unable to load entity " + entityRef.getType() + ":" + entityRef.getUuid(), e );
}
if ( entityRef == null ) {
- throw new EntityNotFoundException(
- "Entity " + entityId.toString() + " cannot be verified" );
+ throw new EntityNotFoundException( "Entity " + entityId.toString() + " cannot be verified" );
}
if ( ( entityType != null ) && !entityType.equalsIgnoreCase( entityRef.getType() ) ) {
- throw new UnexpectedEntityTypeException( "Entity " + entityId
- + " is not the expected type, expected " + entityType
- + ", found " + entityRef.getType() );
+ throw new UnexpectedEntityTypeException(
+ "Entity " + entityId + " is not the expected type, expected " + entityType + ", found "
+ + entityRef.getType() );
}
}
return entityRef;
@@ -949,8 +924,7 @@ public class CpEntityManager implements EntityManager {
@Override
- public List<Entity> getPartialEntities(
- Collection<UUID> ids, Collection<String> properties ) throws Exception {
+ public List<Entity> getPartialEntities( Collection<UUID> ids, Collection<String> properties ) throws Exception {
throw new UnsupportedOperationException( "Not supported yet." );
}
@@ -964,16 +938,14 @@ public class CpEntityManager implements EntityManager {
@Override
- public void setProperty(
- EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
+ public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue ) throws Exception {
setProperty( entityRef, propertyName, propertyValue, false );
}
@Override
- public void setProperty(
- EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
+ public void setProperty( EntityRef entityRef, String propertyName, Object propertyValue, boolean override )
throws Exception {
if ( ( propertyValue instanceof String ) && ( ( String ) propertyValue ).equals( "" ) ) {
@@ -982,8 +954,8 @@ public class CpEntityManager implements EntityManager {
Entity entity = get( entityRef );
- propertyValue = Schema.getDefaultSchema().validateEntityPropertyValue(
- entity.getType(), propertyName, propertyValue );
+ propertyValue =
+ Schema.getDefaultSchema().validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
entity.setProperty( propertyName, propertyValue );
entity.setProperty( PROPERTY_MODIFIED, UUIDUtils.getTimestampInMillis( UUIDUtils.newTimeUUID() ) );
@@ -1015,8 +987,8 @@ public class CpEntityManager implements EntityManager {
boolean entitySchemaHasProperty = defaultSchema.hasProperty( entity.getType(), propertyName );
- propertyValue = Schema.getDefaultSchema().validateEntityPropertyValue(
- entity.getType(), propertyName, propertyValue );
+ propertyValue = Schema.getDefaultSchema()
+ .validateEntityPropertyValue( entity.getType(), propertyName, propertyValue );
if ( entitySchemaHasProperty ) {
@@ -1024,8 +996,7 @@ public class CpEntityManager implements EntityManager {
continue;
}
- if ( ( propertyValue == null )
- && defaultSchema.isRequiredProperty( entity.getType(), propertyName ) ) {
+ if ( ( propertyValue == null ) && defaultSchema.isRequiredProperty( entity.getType(), propertyName ) ) {
continue;
}
}
@@ -1042,66 +1013,63 @@ public class CpEntityManager implements EntityManager {
String collectionName = CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() );
- CollectionScope collectionScope = new CollectionScopeImpl(
- getApplicationScope().getApplication(),
- getApplicationScope().getApplication(),
- collectionName );
+ CollectionScope collectionScope =
+ new CollectionScopeImpl( getApplicationScope().getApplication(), getApplicationScope().getApplication(),
+ collectionName );
- IndexScope defaultIndexScope = new IndexScopeImpl(
- getApplicationScope().getApplication(),
+ IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
CpNamingUtils.getCollectionScopeNameFromEntityType( entityRef.getType() ) );
EntityCollectionManager ecm = managerCache.getEntityCollectionManager( collectionScope );
- EntityIndex ei = managerCache.getEntityIndex(getApplicationScope());
+ EntityIndex ei = managerCache.getEntityIndex( getApplicationScope() );
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
-// if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
-// throw new IllegalArgumentException(
-// "Entity Id " + entityId.getType() + ":"+entityId.getUuid() +" uuid not time based");
-// }
+ // if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
+ // throw new IllegalArgumentException(
+ // "Entity Id " + entityId.getType() + ":"+entityId.getUuid() +" uuid not time based");
+ // }
org.apache.usergrid.persistence.model.entity.Entity cpEntity =
- load( new EntityScope( collectionScope, entityId ) );
+ load( new EntityScope( collectionScope, entityId ) );
cpEntity.removeField( propertyName );
- logger.debug("About to Write {}:{} version {}", new Object[] {
- cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
+ logger.debug( "About to Write {}:{} version {}", new Object[] {
+ cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
+ } );
cpEntity = ecm.write( cpEntity ).toBlockingObservable().last();
- logger.debug("Wrote {}:{} version {}", new Object[] {
- cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion() });
+ logger.debug( "Wrote {}:{} version {}", new Object[] {
+ cpEntity.getId().getType(), cpEntity.getId().getUuid(), cpEntity.getVersion()
+ } );
- ei.createBatch().index(defaultIndexScope, cpEntity ).execute();
+ ei.createBatch().index( defaultIndexScope, cpEntity ).execute();
// update in all containing collections and connection indexes
- CpRelationManager rm = (CpRelationManager)getRelationManager( entityRef );
+ CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
}
@Override
- public Set<Object> getDictionaryAsSet( EntityRef entityRef, String dictionaryName )
- throws Exception {
+ public Set<Object> getDictionaryAsSet( EntityRef entityRef, String dictionaryName ) throws Exception {
return new LinkedHashSet<>( getDictionaryAsMap( entityRef, dictionaryName ).keySet() );
}
@Override
- public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue )
- throws Exception {
+ public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementValue ) throws Exception {
addToDictionary( entityRef, dictionaryName, elementValue, null );
}
@Override
- public void addToDictionary(
- EntityRef entityRef, String dictionaryName, Object elementName, Object elementValue )
+ public void addToDictionary( EntityRef entityRef, String dictionaryName, Object elementName, Object elementValue )
throws Exception {
if ( elementName == null ) {
@@ -1111,11 +1079,9 @@ public class CpEntityManager implements EntityManager {
EntityRef entity = get( entityRef );
UUID timestampUuid = UUIDUtils.newTimeUUID();
- Mutator<ByteBuffer> batch = createMutator(
- cass.getApplicationKeyspace( applicationId ), be );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
- batch = batchUpdateDictionary(
- batch, entity, dictionaryName, elementName, elementValue, false, timestampUuid );
+ batch = batchUpdateDictionary( batch, entity, dictionaryName, elementName, elementValue, false, timestampUuid );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
@@ -1132,12 +1098,10 @@ public class CpEntityManager implements EntityManager {
EntityRef entity = get( entityRef );
UUID timestampUuid = UUIDUtils.newTimeUUID();
- Mutator<ByteBuffer> batch = createMutator(
- cass.getApplicationKeyspace( applicationId ), be );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
for ( Object elementValue : elementValues ) {
- batch = batchUpdateDictionary(
- batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
+ batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue, null, false, timestampUuid );
}
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -1145,8 +1109,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public void addMapToDictionary(
- EntityRef entityRef, String dictionaryName, Map<?, ?> elementValues ) throws Exception {
+ public void addMapToDictionary( EntityRef entityRef, String dictionaryName, Map<?, ?> elementValues )
+ throws Exception {
if ( ( elementValues == null ) || elementValues.isEmpty() || entityRef == null ) {
return;
@@ -1155,8 +1119,7 @@ public class CpEntityManager implements EntityManager {
EntityRef entity = get( entityRef );
UUID timestampUuid = UUIDUtils.newTimeUUID();
- Mutator<ByteBuffer> batch =
- createMutator( cass.getApplicationKeyspace( applicationId ), be );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
for ( Map.Entry<?, ?> elementValue : elementValues.entrySet() ) {
batch = batchUpdateDictionary( batch, entity, dictionaryName, elementValue.getKey(),
@@ -1168,8 +1131,7 @@ public class CpEntityManager implements EntityManager {
@Override
- public Map<Object, Object> getDictionaryAsMap(
- EntityRef entity, String dictionaryName ) throws Exception {
+ public Map<Object, Object> getDictionaryAsMap( EntityRef entity, String dictionaryName ) throws Exception {
entity = validate( entity );
@@ -1177,8 +1139,7 @@ public class CpEntityManager implements EntityManager {
ApplicationCF dictionaryCf = null;
- boolean entityHasDictionary =
- Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
+ boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
@@ -1187,10 +1148,8 @@ public class CpEntityManager implements EntityManager {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
- Class<?> setType = Schema.getDefaultSchema().getDictionaryKeyType(
- entity.getType(), dictionaryName );
- Class<?> setCoType = Schema.getDefaultSchema().getDictionaryValueType(
- entity.getType(), dictionaryName );
+ Class<?> setType = Schema.getDefaultSchema().getDictionaryKeyType( entity.getType(), dictionaryName );
+ Class<?> setCoType = Schema.getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( setCoType );
List<HColumn<ByteBuffer, ByteBuffer>> results =
@@ -1209,8 +1168,7 @@ public class CpEntityManager implements EntityManager {
value = object( setCoType, result.getValue() );
}
else if ( result.getValue().remaining() > 0 ) {
- value = Schema.deserializePropertyValueFromJsonBinary(
- result.getValue().slice(), setCoType );
+ value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), setCoType );
}
if ( name != null ) {
dictionary.put( name, value );
@@ -1222,31 +1180,30 @@ public class CpEntityManager implements EntityManager {
@Override
- public Object getDictionaryElementValue(
- EntityRef entity, String dictionaryName, String elementName ) throws Exception {
+ public Object getDictionaryElementValue( EntityRef entity, String dictionaryName, String elementName )
+ throws Exception {
- if ( entity == null) {
- throw new RuntimeException("Entity is null");
+ if ( entity == null ) {
+ throw new RuntimeException( "Entity is null" );
}
- if ( dictionaryName == null) {
- throw new RuntimeException("dictionaryName is null");
+ if ( dictionaryName == null ) {
+ throw new RuntimeException( "dictionaryName is null" );
}
- if ( elementName == null) {
- throw new RuntimeException("elementName is null");
+ if ( elementName == null ) {
+ throw new RuntimeException( "elementName is null" );
}
- if ( Schema.getDefaultSchema() == null) {
- throw new RuntimeException("Schema.getDefaultSchema() is null");
+ if ( Schema.getDefaultSchema() == null ) {
+ throw new RuntimeException( "Schema.getDefaultSchema() is null" );
}
Object value = null;
ApplicationCF dictionaryCf = null;
- boolean entityHasDictionary =
- Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
+ boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
@@ -1255,23 +1212,22 @@ public class CpEntityManager implements EntityManager {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
- Class<?> dictionaryCoType = Schema.getDefaultSchema().getDictionaryValueType(
- entity.getType(), dictionaryName );
+ Class<?> dictionaryCoType =
+ Schema.getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
- HColumn<ByteBuffer, ByteBuffer> result = cass.getColumn(
- cass.getApplicationKeyspace( applicationId ), dictionaryCf,
- CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
- entityHasDictionary ? bytebuffer( elementName )
- : DynamicComposite.toByteBuffer( elementName ), be, be );
+ HColumn<ByteBuffer, ByteBuffer> result =
+ cass.getColumn( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
+ CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
+ entityHasDictionary ? bytebuffer( elementName ) : DynamicComposite.toByteBuffer( elementName ),
+ be, be );
if ( result != null ) {
if ( entityHasDictionary && coTypeIsBasic ) {
value = object( dictionaryCoType, result.getValue() );
}
else if ( result.getValue().remaining() > 0 ) {
- value = Schema.deserializePropertyValueFromJsonBinary(
- result.getValue().slice(), dictionaryCoType );
+ value = Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(), dictionaryCoType );
}
}
else {
@@ -1283,15 +1239,14 @@ public class CpEntityManager implements EntityManager {
@Metered( group = "core", name = "EntityManager_getDictionaryElementValues" )
- public Map<String, Object> getDictionaryElementValues(
- EntityRef entity, String dictionaryName, String... elementNames ) throws Exception {
+ public Map<String, Object> getDictionaryElementValues( EntityRef entity, String dictionaryName,
+ String... elementNames ) throws Exception {
Map<String, Object> values = null;
ApplicationCF dictionaryCf = null;
- boolean entityHasDictionary =
- Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
+ boolean entityHasDictionary = Schema.getDefaultSchema().hasDictionary( entity.getType(), dictionaryName );
if ( entityHasDictionary ) {
dictionaryCf = ENTITY_DICTIONARIES;
@@ -1300,8 +1255,8 @@ public class CpEntityManager implements EntityManager {
dictionaryCf = ENTITY_COMPOSITE_DICTIONARIES;
}
- Class<?> dictionaryCoType = Schema.getDefaultSchema().getDictionaryValueType(
- entity.getType(), dictionaryName );
+ Class<?> dictionaryCoType =
+ Schema.getDefaultSchema().getDictionaryValueType( entity.getType(), dictionaryName );
boolean coTypeIsBasic = ClassUtils.isBasicType( dictionaryCoType );
ByteBuffer[] columnNames = new ByteBuffer[elementNames.length];
@@ -1312,8 +1267,7 @@ public class CpEntityManager implements EntityManager {
ColumnSlice<ByteBuffer, ByteBuffer> results =
cass.getColumns( cass.getApplicationKeyspace( applicationId ), dictionaryCf,
- CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ),
- columnNames, be, be );
+ CassandraPersistenceUtils.key( entity.getUuid(), dictionaryName ), columnNames, be, be );
if ( results != null ) {
values = new HashMap<String, Object>();
for ( HColumn<ByteBuffer, ByteBuffer> result : results.getColumns() ) {
@@ -1323,8 +1277,8 @@ public class CpEntityManager implements EntityManager {
values.put( name, object( dictionaryCoType, result.getValue() ) );
}
else if ( result.getValue().remaining() > 0 ) {
- values.put( name, Schema.deserializePropertyValueFromJsonBinary(
- result.getValue().slice(), dictionaryCoType ) );
+ values.put( name, Schema.deserializePropertyValueFromJsonBinary( result.getValue().slice(),
+ dictionaryCoType ) );
}
}
}
@@ -1337,8 +1291,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public void removeFromDictionary(
- EntityRef entityRef, String dictionaryName, Object elementName ) throws Exception {
+ public void removeFromDictionary( EntityRef entityRef, String dictionaryName, Object elementName )
+ throws Exception {
if ( elementName == null ) {
return;
}
@@ -1346,11 +1300,9 @@ public class CpEntityManager implements EntityManager {
EntityRef entity = get( entityRef );
UUID timestampUuid = UUIDUtils.newTimeUUID();
- Mutator<ByteBuffer> batch =
- createMutator( cass.getApplicationKeyspace( applicationId ), be );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
- batch = batchUpdateDictionary(
- batch, entity, dictionaryName, elementName, true, timestampUuid );
+ batch = batchUpdateDictionary( batch, entity, dictionaryName, elementName, true, timestampUuid );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
@@ -1370,16 +1322,14 @@ public class CpEntityManager implements EntityManager {
@Override
- public boolean isCollectionMember(
- EntityRef owner, String collectionName, EntityRef entity ) throws Exception {
+ public boolean isCollectionMember( EntityRef owner, String collectionName, EntityRef entity ) throws Exception {
return getRelationManager( owner ).isCollectionMember( collectionName, entity );
}
@Override
- public boolean isConnectionMember(
- EntityRef owner, String connectionName, EntityRef entity ) throws Exception {
+ public boolean isConnectionMember( EntityRef owner, String connectionName, EntityRef entity ) throws Exception {
return getRelationManager( owner ).isConnectionMember( connectionName, entity );
}
@@ -1393,9 +1343,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results getCollection(
- EntityRef entityRef, String collectionName, UUID startResult, int count,
- Level resultsLevel, boolean reversed ) throws Exception {
+ public Results getCollection( EntityRef entityRef, String collectionName, UUID startResult, int count,
+ Level resultsLevel, boolean reversed ) throws Exception {
return getRelationManager( entityRef )
.getCollection( collectionName, startResult, count, resultsLevel, reversed );
@@ -1403,30 +1352,30 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results getCollection(
- UUID entityId, String collectionName, Query query, Level resultsLevel ) throws Exception {
+ public Results getCollection( UUID entityId, String collectionName, Query query, Level resultsLevel )
+ throws Exception {
throw new UnsupportedOperationException( "Cannot get entity by UUID alone" );
}
@Override
- public Entity addToCollection(
- EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
+ public Entity addToCollection( EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
return getRelationManager( entityRef ).addToCollection( collectionName, itemRef );
}
@Override
- public Entity addToCollections(
- List<EntityRef> ownerEntities, String collectionName, EntityRef itemRef ) throws Exception {
+ public Entity addToCollections( List<EntityRef> ownerEntities, String collectionName, EntityRef itemRef )
+ throws Exception {
// don't fetch entity if we've already got one
final Entity entity;
if ( itemRef instanceof Entity ) {
- entity = (Entity)itemRef;
- } else {
+ entity = ( Entity ) itemRef;
+ }
+ else {
entity = get( itemRef );
}
@@ -1439,37 +1388,32 @@ public class CpEntityManager implements EntityManager {
@Override
- public Entity createItemInCollection(
- EntityRef entityRef, String collectionName, String itemType,
- Map<String, Object> props ) throws Exception {
+ public Entity createItemInCollection( EntityRef entityRef, String collectionName, String itemType,
+ Map<String, Object> props ) throws Exception {
- return getRelationManager( entityRef )
- .createItemInCollection( collectionName, itemType, props );
+ return getRelationManager( entityRef ).createItemInCollection( collectionName, itemType, props );
}
@Override
- public void removeFromCollection(
- EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
+ public void removeFromCollection( EntityRef entityRef, String collectionName, EntityRef itemRef ) throws Exception {
- getRelationManager( entityRef ).removeFromCollection(collectionName, itemRef);
+ getRelationManager( entityRef ).removeFromCollection( collectionName, itemRef );
}
@Override
- public Set<String> getCollectionIndexes(
- EntityRef entity, String collectionName ) throws Exception {
+ public Set<String> getCollectionIndexes( EntityRef entity, String collectionName ) throws Exception {
return getRelationManager( entity ).getCollectionIndexes( collectionName );
}
@Override
- public void copyRelationships( EntityRef srcEntityRef, String srcRelationName,
- EntityRef dstEntityRef, String dstRelationName ) throws Exception {
+ public void copyRelationships( EntityRef srcEntityRef, String srcRelationName, EntityRef dstEntityRef,
+ String dstRelationName ) throws Exception {
- getRelationManager( srcEntityRef )
- .copyRelationships( srcRelationName, dstEntityRef, dstRelationName );
+ getRelationManager( srcEntityRef ).copyRelationships( srcRelationName, dstEntityRef, dstRelationName );
}
@@ -1483,53 +1427,50 @@ public class CpEntityManager implements EntityManager {
@Override
public ConnectionRef createConnection( EntityRef connectingEntity, String connectionType,
- EntityRef connectedEntityRef ) throws Exception {
+ EntityRef connectedEntityRef ) throws Exception {
- return getRelationManager( connectingEntity )
- .createConnection( connectionType, connectedEntityRef );
+ return getRelationManager( connectingEntity ).createConnection( connectionType, connectedEntityRef );
}
@Override
- public ConnectionRef createConnection( EntityRef connectingEntity, String pairedConnectionType,
- EntityRef pairedEntity, String connectionType, EntityRef connectedEntityRef )
+ public ConnectionRef createConnection( EntityRef connectingEntity, String pairedConnectionType,
+ EntityRef pairedEntity, String connectionType, EntityRef connectedEntityRef )
throws Exception {
- return getRelationManager( connectingEntity ).createConnection(
- pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
+ return getRelationManager( connectingEntity )
+ .createConnection( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
}
@Override
- public ConnectionRef createConnection(
- EntityRef connectingEntity, ConnectedEntityRef... connections ) throws Exception {
+ public ConnectionRef createConnection( EntityRef connectingEntity, ConnectedEntityRef... connections )
+ throws Exception {
return getRelationManager( connectingEntity ).connectionRef( connections );
}
@Override
- public ConnectionRef connectionRef( EntityRef connectingEntity, String connectionType,
- EntityRef connectedEntityRef ) throws Exception {
+ public ConnectionRef connectionRef( EntityRef connectingEntity, String connectionType,
+ EntityRef connectedEntityRef ) throws Exception {
- return new ConnectionRefImpl( connectingEntity.getType(), connectingEntity.getUuid(),
- connectionType, connectedEntityRef.getType(), connectedEntityRef.getUuid() );
+ return new ConnectionRefImpl( connectingEntity.getType(), connectingEntity.getUuid(), connectionType,
+ connectedEntityRef.getType(), connectedEntityRef.getUuid() );
}
@Override
- public ConnectionRef connectionRef( EntityRef connectingEntity, String pairedConnectionType,
- EntityRef pairedEntity, String connectionType, EntityRef connectedEntityRef )
- throws Exception {
+ public ConnectionRef connectionRef( EntityRef connectingEntity, String pairedConnectionType, EntityRef pairedEntity,
+ String connectionType, EntityRef connectedEntityRef ) throws Exception {
- return getRelationManager( connectingEntity ).connectionRef(
- pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
+ return getRelationManager( connectingEntity )
+ .connectionRef( pairedConnectionType, pairedEntity, connectionType, connectedEntityRef );
}
@Override
- public ConnectionRef connectionRef(
- EntityRef connectingEntity, ConnectedEntityRef... connections ) {
+ public ConnectionRef connectionRef( EntityRef connectingEntity, ConnectedEntityRef... connections ) {
return getRelationManager( connectingEntity ).connectionRef( connections );
}
@@ -1539,7 +1480,7 @@ public class CpEntityManager implements EntityManager {
public void deleteConnection( ConnectionRef connectionRef ) throws Exception {
EntityRef sourceEntity = connectionRef.getConnectedEntity();
-
+
getRelationManager( sourceEntity ).deleteConnection( connectionRef );
}
@@ -1552,8 +1493,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results getConnectedEntities( EntityRef entityRef, String connectionType,
- String connectedEntityType, Level resultsLevel ) throws Exception {
+ public Results getConnectedEntities( EntityRef entityRef, String connectionType, String connectedEntityType,
+ Level resultsLevel ) throws Exception {
return getRelationManager( entityRef )
.getConnectedEntities( connectionType, connectedEntityType, resultsLevel );
@@ -1561,8 +1502,8 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results getConnectingEntities( EntityRef entityRef, String connectionType,
- String connectedEntityType, Level resultsLevel ) throws Exception {
+ public Results getConnectingEntities( EntityRef entityRef, String connectionType, String connectedEntityType,
+ Level resultsLevel ) throws Exception {
return getRelationManager( entityRef )
.getConnectingEntities( connectionType, connectedEntityType, resultsLevel );
@@ -1570,25 +1511,22 @@ public class CpEntityManager implements EntityManager {
@Override
- public Results getConnectingEntities( EntityRef entityRef, String connectionType,
- String entityType, Level level, int count ) throws Exception {
+ public Results getConnectingEntities( EntityRef entityRef, String connectionType, String entityType, Level level,
+ int count ) throws Exception {
- return getRelationManager( entityRef )
- .getConnectingEntities( connectionType, entityType, level, count );
+ return getRelationManager( entityRef ).getConnectingEntities( connectionType, entityType, level, count );
}
@Override
- public Results searchConnectedEntities(
- EntityRef connectingEntity, Query query ) throws Exception {
+ public Results searchConnectedEntities( EntityRef connectingEntity, Query query ) throws Exception {
return getRelationManager( connectingEntity ).searchConnectedEntities( query );
}
@Override
- public Set<String> getConnectionIndexes(
- EntityRef entity, String connectionType ) throws Exception {
+ public Set<String> getConnectionIndexes( EntityRef entity, String connectionType ) throws Exception {
return getRelationManager( entity ).getConnectionIndexes( connectionType );
}
@@ -1606,7 +1544,7 @@ public class CpEntityManager implements EntityManager {
createRole( "admin", "Administrator", 0 );
}
catch ( DuplicateUniquePropertyExistsException dupe ) {
- logger.warn("Role admin already exists ");
+ logger.warn( "Role admin already exists " );
}
catch ( Exception e ) {
logger.error( "Could not create admin role, may already exist", e );
@@ -1616,7 +1554,7 @@ public class CpEntityManager implements EntityManager {
createRole( "default", "Default", 0 );
}
catch ( DuplicateUniquePropertyExistsException dupe ) {
- logger.warn("Role default already exists ");
+ logger.warn( "Role default already exists " );
}
catch ( Exception e ) {
logger.error( "Could not create default role, may already exist", e );
@@ -1626,7 +1564,7 @@ public class CpEntityManager implements EntityManager {
createRole( "guest", "Guest", 0 );
}
catch ( DuplicateUniquePropertyExistsException dupe ) {
- logger.warn("Role guest already exists ");
+ logger.warn( "Role guest already exists " );
}
catch ( Exception e ) {
logger.error( "Could not create guest role, may already exist", e );
@@ -1636,18 +1574,17 @@ public class CpEntityManager implements EntityManager {
grantRolePermissions( "default", Arrays.asList( "get,put,post,delete:/**" ) );
}
catch ( DuplicateUniquePropertyExistsException dupe ) {
- logger.warn("Role default already has permission");
+ logger.warn( "Role default already has permission" );
}
catch ( Exception e ) {
logger.error( "Could not populate default role", e );
}
try {
- grantRolePermissions( "guest",
- Arrays.asList( "post:/users", "post:/devices", "put:/devices/*" ) );
+ grantRolePermissions( "guest", Arrays.asList( "post:/users", "post:/devices", "put:/devices/*" ) );
}
catch ( DuplicateUniquePropertyExistsException dupe ) {
- logger.warn("Role guest already has permission");
+ logger.warn( "Role guest already has permission" );
}
catch ( Exception e ) {
logger.error( "Could not populate guest role", e );
@@ -1656,25 +1593,24 @@ public class CpEntityManager implements EntityManager {
@Override
- public Entity createRole( String roleName, String roleTitle, long inactivity) throws Exception {
+ public Entity createRole( String roleName, String roleTitle, long inactivity ) throws Exception {
if ( roleName == null || roleName.isEmpty() ) {
- throw new RequiredPropertyNotFoundException( "role",roleTitle );
+ throw new RequiredPropertyNotFoundException( "role", roleTitle );
}
String propertyName = roleName;
UUID ownerId = applicationId;
- String batchRoleName = StringUtils.stringOrSubstringAfterLast( roleName.toLowerCase(), ':');
+ String batchRoleName = StringUtils.stringOrSubstringAfterLast( roleName.toLowerCase(), ':' );
return batchCreateRole( batchRoleName, roleTitle, inactivity, propertyName, ownerId, null );
}
- private Entity batchCreateRole( String roleName, String roleTitle, long inactivity,
- String propertyName, UUID ownerId, Map<String, Object> additionalProperties )
- throws Exception {
+ private Entity batchCreateRole( String roleName, String roleTitle, long inactivity, String propertyName,
+ UUID ownerId, Map<String, Object> additionalProperties ) throws Exception {
UUID timestampUuid = UUIDUtils.newTimeUUID();
- long timestamp = UUIDUtils.getUUIDLong(timestampUuid);
+ long timestamp = UUIDUtils.getUUIDLong( timestampUuid );
Map<String, Object> properties = new TreeMap<>( CASE_INSENSITIVE_ORDER );
properties.put( PROPERTY_TYPE, Role.ENTITY_TYPE );
@@ -1692,15 +1628,14 @@ public class CpEntityManager implements EntityManager {
batchCreate( null, Role.ENTITY_TYPE, null, properties, id, timestampUuid );
Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
- CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLENAMES ),
- roleName, roleTitle, timestamp );
- CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLETIMES ),
- roleName, inactivity, timestamp );
- CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
- CassandraPersistenceUtils.key( ownerId, DICTIONARY_SETS ),
- Schema.DICTIONARY_ROLENAMES, null, timestamp );
+ CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
+ CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLENAMES ), roleName, roleTitle, timestamp );
+ CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
+ CassandraPersistenceUtils.key( ownerId, Schema.DICTIONARY_ROLETIMES ), roleName, inactivity,
+ timestamp );
+ CassandraPersistenceUtils.addInsertToMutator( batch, ENTITY_DICTIONARIES,
+ CassandraPersistenceUtils.key( ownerId, DICTIONARY_SETS ), Schema.DICTIONARY_ROLENAMES, null,
+ timestamp );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
@@ -1713,44 +1648,42 @@ public class CpEntityManager implements EntityManager {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
- Mutator<ByteBuffer> batch = createMutator(
- cass.getApplicationKeyspace( applicationId ), be );
- CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
- getRolePermissionsKey( roleName ), permission, ByteBuffer.allocate( 0 ), timestamp );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
+ CassandraPersistenceUtils
+ .addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ),
+ permission, ByteBuffer.allocate( 0 ), timestamp );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
- public void grantRolePermissions( String roleName, Collection<String> permissions )
- throws Exception {
+ public void grantRolePermissions( String roleName, Collection<String> permissions ) throws Exception {
roleName = roleName.toLowerCase();
long timestamp = cass.createTimestamp();
- Mutator<ByteBuffer> batch = createMutator(
- cass.getApplicationKeyspace( applicationId ), be );
+ Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
for ( String permission : permissions ) {
permission = permission.toLowerCase();
- CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
- getRolePermissionsKey( roleName ), permission,
- ByteBuffer.allocate( 0 ), timestamp );
+ CassandraPersistenceUtils
+ .addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ),
+ permission, ByteBuffer.allocate( 0 ), timestamp );
}
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
private Object getRolePermissionsKey( String roleName ) {
- return CassandraPersistenceUtils.key( SimpleRoleRef.getIdForRoleName( roleName ),
- DICTIONARY_PERMISSIONS );
+ return CassandraPersistenceUtils.key( SimpleRoleRef.getIdForRoleName( roleName ), DICTIONARY_PERMISSIONS );
}
private Object getRolePermissionsKey( UUID groupId, String roleName ) {
try {
- return CassandraPersistenceUtils.key( getGroupRoleRef( groupId, roleName ).getUuid(),
- DICTIONARY_PERMISSIONS );
- } catch ( Exception e ) {
- logger.error("Error creating role key for uuid {} and role {}", groupId, roleName );
+ return CassandraPersistenceUtils
+ .key( getGroupRoleRef( groupId, roleName ).getUuid(), DICTIONARY_PERMISSIONS );
+ }
+ catch ( Exception e ) {
+ logger.error( "Error creating role key for uuid {} and role {}", groupId, roleName );
return null;
}
}
@@ -1762,9 +1695,9 @@ public class CpEntityManager implements EntityManager {
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
- CassandraPersistenceUtils.addDeleteToMutator(
- batch, ApplicationCF.ENTITY_DICTIONARIES,
- getRolePermissionsKey( roleName ), permission, timestamp );
+ CassandraPersistenceUtils
+ .addDeleteToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ),
+ permission, timestamp );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
@@ -1772,8 +1705,8 @@ public class CpEntityManager implements EntityManager {
@Override
public Set<String> getRolePermissions( String roleName ) throws Exception {
roleName = roleName.toLowerCase();
- return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ),
- ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( roleName ) );
+ return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
+ getRolePermissionsKey( roleName ) );
}
@@ -1783,8 +1716,9 @@ public class CpEntityManager implements EntityManager {
Set<String> permissions = getRolePermissions( roleName );
Iterator<String> itrPermissions = permissions.iterator();
- while ( itrPermissions.hasNext())
+ while ( itrPermissions.hasNext() ) {
revokeRolePermission( roleName, itrPermissions.next() );
+ }
removeFromDictionary( getApplicationRef(), DICTIONARY_ROLENAMES, roleName );
removeFromDictionary( getApplicationRef(), DICTIONARY_ROLETIMES, roleName );
@@ -1797,49 +1731,43 @@ public class CpEntityManager implements EntityManager {
@Override
public Map<String, String> getGroupRoles( UUID groupId ) throws Exception {
- return cast( getDictionaryAsMap( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ),
- DICTIONARY_ROLENAMES ) );
+ return cast( getDictionaryAsMap( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ), DICTIONARY_ROLENAMES ) );
}
@Override
- public Entity createGroupRole( UUID groupId, String roleName, long inactivity )
- throws Exception {
+ public Entity createGroupRole( UUID groupId, String roleName, long inactivity ) throws Exception {
String batchRoleName = StringUtils.stringOrSubstringAfterLast( roleName.toLowerCase(), ':' );
String roleTitle = batchRoleName;
String propertyName = groupId + ":" + batchRoleName;
Map<String, Object> properties = new TreeMap<String, Object>( CASE_INSENSITIVE_ORDER );
properties.put( "group", groupId );
- Entity entity = batchCreateRole( roleName, roleTitle, inactivity, propertyName, groupId,
- properties );
+ Entity entity = batchCreateRole( roleName, roleTitle, inactivity, propertyName, groupId, properties );
getRelationManager( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ) )
.addToCollection( COLLECTION_ROLES, entity );
- logger.info("Created role {} with id {} in group {}",
- new String[] { roleName, entity.getUuid().toString(), groupId.toString() } );
+ logger.info( "Created role {} with id {} in group {}",
+ new String[] { roleName, entity.getUuid().toString(), groupId.toString() } );
return entity;
}
@Override
- public void grantGroupRolePermission( UUID groupId, String roleName, String permission )
- throws Exception {
+ public void grantGroupRolePermission( UUID groupId, String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
Mutator<ByteBuffer> batch = createMutator( cass.getApplicationKeyspace( applicationId ), be );
- CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
- getRolePermissionsKey( groupId, roleName ),
- permission, ByteBuffer.allocate( 0 ), timestamp );
+ CassandraPersistenceUtils.addInsertToMutator( batch, ApplicationCF.ENTITY_DICTIONARIES,
+ getRolePermissionsKey( groupId, roleName ), permission, ByteBuffer.allocate( 0 ), timestamp );
CassandraPersistenceUtils.batchExecute( batch, CassandraService.RETRY_COUNT );
}
@Override
- public void revokeGroupRolePermission( UUID groupId, String roleName, String permission )
- throws Exception {
+ public void revokeGroupRolePermission( UUID groupId, String roleName, String permission ) throws Exception {
roleName = roleName.toLowerCase();
permission = permission.toLowerCase();
long timestamp = cass.createTimestamp();
@@ -1853,18 +1781,16 @@ public class CpEntityManager implements EntityManager {
@Override
public Set<String> getGroupRolePermissions( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
- return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ),
- ApplicationCF.ENTITY_DICTIONARIES, getRolePermissionsKey( groupId, roleName ) );
+ return cass.getAllColumnNames( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
+ getRolePermissionsKey( groupId, roleName ) );
}
@Override
public void deleteGroupRole( UUID groupId, String roleName ) throws Exception {
roleName = roleName.toLowerCase();
- removeFromDictionary( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ),
- DICTIONARY_ROLENAMES, roleName );
- cass.deleteRow( cass.getApplicationKeyspace( applicationId ),
- ApplicationCF.ENTITY_DICTIONARIES,
+ removeFromDictionary( new SimpleEntityRef( Group.ENTITY_TYPE, groupId ), DICTIONARY_ROLENAMES, roleName );
+ cass.deleteRow( cass.getApplicationKeyspace( applicationId ), ApplicationCF.ENTITY_DICTIONARIES,
SimpleRoleRef.getIdForGroupIdAndRoleName( groupId, roleName ) );
}
@@ -18
<TRUNCATED>
[4/4] incubator-usergrid git commit: Merge branch 'two-dot-o' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o
Posted by to...@apache.org.
Merge branch 'two-dot-o' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into two-dot-o
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/43b0ba64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/43b0ba64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/43b0ba64
Branch: refs/heads/two-dot-o
Commit: 43b0ba64a05b9b70be0c90664afe5ddb1565e7b6
Parents: 0f02e82 a143ddf
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 12 17:05:11 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 12 17:05:11 2014 -0700
----------------------------------------------------------------------
.../rest/management/ExportResourceIT.java | 333 +++++++++++++------
1 file changed, 231 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-usergrid git commit: Finished migrations. Also added
utility functions for streaming data.
Posted by to...@apache.org.
Finished migrations. Also added utility functions for streaming data.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/0f02e823
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/0f02e823
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/0f02e823
Branch: refs/heads/two-dot-o
Commit: 0f02e823ad6022fb8f9e0b2105ed79f80239e6dc
Parents: f448704
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Nov 12 17:04:48 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Nov 12 17:04:48 2014 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 1301 ++++++++----------
.../usergrid/corepersistence/GuiceModule.java | 2 +
.../usergrid/corepersistence/NamingUtils.java | 5 +
.../migration/EntityTypeMappingMigration.java | 104 ++
.../migration/GraphShardVersionMigration.java | 150 +-
.../corepersistence/migration/Versions.java | 9 +-
.../rx/AllEntitiesInSystemObservable.java | 98 ++
.../rx/ApplicationObservable.java | 68 +-
.../rx/EdgesFromSourceObservable.java | 29 +-
.../corepersistence/rx/TargetIdObservable.java | 44 +-
.../usergrid/persistence/EntityManager.java | 3 +
.../EdgeMetadataSerializationProxyImpl.java | 2 +-
12 files changed, 929 insertions(+), 886 deletions(-)
----------------------------------------------------------------------