You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/02/10 19:05:19 UTC
[1/4] incubator-usergrid git commit: moving dependencies to core
persistence
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-365 dd5e6ea90 -> c38693716
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
new file mode 100644
index 0000000..2205f72
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import rx.Observable;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+public interface AllEntitiesInSystemObservable {
+ /**
+ * Return an observable that emits all entities in the system.
+ *
+ * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
+ * you may be more than 1 ApplicationEntityGroup with the same application and different ids
+ */
+ public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize);
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
new file mode 100644
index 0000000..4b70462
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ApplicationObservable.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.rx;
+
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface ApplicationObservable {
+ /**
+ * Get all applicationIds as an observable
+ */
+ Observable<Id> getAllApplicationIds();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
deleted file mode 100644
index a623d00..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.scope;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import java.util.List;
-
-
-/**
- * Get the entity data. Immutable bean for fast access
- */
-public final class ApplicationEntityGroup {
- public final ApplicationScope applicationScope;
- public final List<Id> entityIds;
-
-
- public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
- this.applicationScope = applicationScope;
- this.entityIds = entityIds;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index aa1a4a8..4c38c13 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.persistence.graph;
+import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -50,7 +51,7 @@ import rx.Observable;
* @author tnine
* @see Edge
*/
-public interface GraphManager {
+public interface GraphManager extends CPManager {
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index c199312..eb49711 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -37,4 +37,6 @@ public interface GraphManagerFactory
* @param collectionScope The context to use when creating the graph manager
*/
public GraphManager createEdgeManager( ApplicationScope collectionScope );
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 001087f..f50c7cc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.graph.guice;
import org.apache.usergrid.persistence.core.guice.V1Impl;
import org.apache.usergrid.persistence.core.guice.V2Impl;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
+import org.apache.usergrid.persistence.graph.serialization.*;
import org.apache.usergrid.persistence.graph.serialization.impl.*;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -43,9 +43,6 @@ import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListenerImpl;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
@@ -87,16 +84,19 @@ public class GraphModule extends AbstractModule {
bind( TimeService.class ).to( TimeServiceImpl.class );
- // create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( GraphManager.class, GraphManagerImpl.class )
- .build( GraphManagerFactory.class ) );
+ bind( GraphManagerFactory.class ).to(GraphManagerFactoryImpl.class);
+ bind(GraphManager.class).to(GraphManagerImpl.class );
+
+ bind(EdgesFromSourceObservable.class).to(EdgesFromSourceObservableImpl.class);
+
+ bind(EdgesToTargetObservable.class).to(EdgesToTargetObservableImpl.class);
/**
* bindings for shard allocations
*/
- bind( NodeShardAllocation.class ).to( NodeShardAllocationImpl.class );
+ bind(NodeShardAllocation.class).to( NodeShardAllocationImpl.class );
bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
new file mode 100644
index 0000000..35ad41c
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesFromSourceObservable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface EdgesFromSourceObservable {
+ Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
new file mode 100644
index 0000000..cfec5bd
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesToTargetObservable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Classy class class.
+ */
+public interface EdgesToTargetObservable {
+ Observable<Edge> getEdgesToTarget(final GraphManager gm, final Id targetNode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
new file mode 100644
index 0000000..1725f2d
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesFromSourceObservableImpl.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Emits the edges that are edges from the specified source node
+ */
+public class EdgesFromSourceObservableImpl implements EdgesFromSourceObservable {
+
+ private static final Logger logger = LoggerFactory.getLogger(EdgesFromSourceObservableImpl.class);
+ public EdgesFromSourceObservableImpl(){
+
+ }
+
+ /**
+ * Get all edges from the source
+ */
+ @Override
+ public Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
+ Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
+
+ return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final String edgeType ) {
+
+ logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
+
+ return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
+ }
+ } );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
new file mode 100644
index 0000000..d84b286
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesToTargetObservableImpl.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Classy class class.
+ */
+public class EdgesToTargetObservableImpl implements EdgesToTargetObservable {
+
+ public EdgesToTargetObservableImpl(){
+
+ }
+
+ private final Logger logger = LoggerFactory.getLogger(EdgesToTargetObservableImpl.class);
+
+
+ /**
+ * Get all edges from the source
+ */
+ @Override
+ public Observable<Edge> getEdgesToTarget(final GraphManager gm, final Id targetNode) {
+ Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
+
+ return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
+ @Override
+ public Observable<Edge> call( final String edgeType ) {
+
+ logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
+
+ return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
+ SearchByEdgeType.Order.DESCENDING, null ) );
+ }
+ } );
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
new file mode 100644
index 0000000..7c04cee
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/GraphManagerFactoryImpl.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.impl.GraphManagerImpl;
+import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
+import org.apache.usergrid.persistence.graph.impl.stage.NodeDeleteListener;
+import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class GraphManagerFactoryImpl implements GraphManagerFactory {
+
+ private final EdgeMetadataSerialization edgeMetadataSerialization;
+ private final EdgeSerialization edgeSerialization;
+ private final NodeSerialization nodeSerialization;
+ private final GraphFig graphFig;
+ private final EdgeDeleteListener edgeDeleteListener;
+ private final NodeDeleteListener nodeDeleteListener;
+
+ private LoadingCache<ApplicationScope, GraphManager> gmCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, GraphManager>() {
+ public GraphManager load(
+ ApplicationScope scope ) {
+ return new GraphManagerImpl(edgeMetadataSerialization,edgeSerialization,nodeSerialization,graphFig,edgeDeleteListener,nodeDeleteListener,scope);
+ }
+ } );
+
+ public GraphManagerFactoryImpl(final EdgeMetadataSerialization edgeMetadataSerialization,
+ final EdgeSerialization edgeSerialization,
+ final NodeSerialization nodeSerialization,
+ final GraphFig graphFig,
+ final EdgeDeleteListener edgeDeleteListener,
+ final NodeDeleteListener nodeDeleteListener
+ ){
+ this.edgeMetadataSerialization = edgeMetadataSerialization;
+ this.edgeSerialization = edgeSerialization;
+ this.nodeSerialization = nodeSerialization;
+ this.graphFig = graphFig;
+ this.edgeDeleteListener = edgeDeleteListener;
+ this.nodeDeleteListener = nodeDeleteListener;
+ }
+
+ @Override
+ public GraphManager createEdgeManager(ApplicationScope collectionScope) {
+ try {
+ return gmCache.get(collectionScope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ gmCache.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
index e6d8125..07a2bb9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManager.java
@@ -18,6 +18,8 @@
package org.apache.usergrid.persistence.map;
+import org.apache.usergrid.persistence.core.CPManager;
+
import java.util.UUID;
@@ -25,7 +27,7 @@ import java.util.UUID;
/**
* Generator of a map manager instance
*/
-public interface MapManager {
+public interface MapManager extends CPManager {
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
index a60cdfc..81531c9 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/MapManagerFactory.java
@@ -27,4 +27,6 @@ public interface MapManagerFactory {
* Get the map manager
*/
public MapManager createMapManager( final MapScope scope );
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
index d167151..22d37c7 100644
--- a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/guice/MapModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.map.guice;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.impl.MapManagerFactoryImpl;
import org.apache.usergrid.persistence.map.impl.MapManagerImpl;
import org.apache.usergrid.persistence.map.impl.MapSerialization;
import org.apache.usergrid.persistence.map.impl.MapSerializationImpl;
@@ -41,13 +42,8 @@ public class MapModule extends AbstractModule {
@Override
protected void configure() {
-
- // create a guice factory for getting our collection manager
- install( new FactoryModuleBuilder().implement( MapManager.class, MapManagerImpl.class )
- .build( MapManagerFactory.class ) );
-
-
- bind( MapSerialization.class).to( MapSerializationImpl.class );
+ bind(MapManagerFactory.class).to(MapManagerFactoryImpl.class);
+ bind(MapSerialization.class).to( MapSerializationImpl.class );
Multibinder<Migration> migrationBinding = Multibinder.newSetBinder( binder(), Migration.class );
migrationBinding.addBinding().to( Key.get( MapSerialization.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
new file mode 100644
index 0000000..f5b9527
--- /dev/null
+++ b/stack/corepersistence/map/src/main/java/org/apache/usergrid/persistence/map/impl/MapManagerFactoryImpl.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.map.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.netflix.astyanax.Execution;
+import org.apache.usergrid.persistence.map.MapManager;
+import org.apache.usergrid.persistence.map.MapManagerFactory;
+import org.apache.usergrid.persistence.map.MapScope;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class MapManagerFactoryImpl implements MapManagerFactory {
+ private final MapSerialization mapSerialization;
+ private LoadingCache<MapScope, MapManager> mmCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<MapScope, MapManager>() {
+ public MapManager load( MapScope scope ) {
+ return new MapManagerImpl(scope,mapSerialization);
+ }
+ } );
+
+ public MapManagerFactoryImpl(final MapSerialization mapSerialization){
+
+ this.mapSerialization = mapSerialization;
+ }
+
+ @Override
+ public MapManager createMapManager(MapScope scope) {
+ try{
+ return mmCache.get(scope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ mmCache.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index e221f0e..4222253 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index;
import java.util.UUID;
+import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.CandidateResults;
@@ -32,10 +33,10 @@ import java.util.Map;
/**
* Provides indexing of Entities within a scope.
*/
-public interface EntityIndex {
+public interface EntityIndex extends CPManager {
/**
- * This should ONLY ever be called once on application create.
+ * This should ONLY ever be called once on application create.
* Otherwise we're introducing slowness into our system
*/
public void initializeIndex();
@@ -95,12 +96,12 @@ public interface EntityIndex {
* Check health of cluster.
*/
public Health getClusterHealth();
-
+
/**
* Check health of this specific index.
*/
public Health getIndexHealth();
-
+
public void deleteIndex();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
index 78a5137..10752d1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
@@ -25,6 +25,8 @@ import com.google.inject.assistedinject.Assisted;
public interface EntityIndexFactory {
- public EntityIndex createEntityIndex(
+ public EntityIndex createEntityIndex(
@Assisted ApplicationScope appScope);
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index edc938b..45813e1 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -24,6 +24,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.impl.EntityIndexFactoryImpl;
import org.apache.usergrid.persistence.index.impl.EsEntityIndexImpl;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -36,9 +37,8 @@ public class IndexModule extends AbstractModule {
// install our configuration
install (new GuicyFigModule( IndexFig.class ));
- install( new FactoryModuleBuilder()
- .implement( EntityIndex.class, EsEntityIndexImpl.class )
- .build( EntityIndexFactory.class ) );
+ bind(EntityIndexFactory.class).to(EntityIndexFactoryImpl.class);
+ bind(EntityIndex.class).to(EsEntityIndexImpl.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
new file mode 100644
index 0000000..e45c6c5
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EntityIndexFactoryImpl.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.IndexFig;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class EntityIndexFactoryImpl implements EntityIndexFactory{
+
+ private final IndexFig config;
+ private final EsProvider provider;
+ private final EsIndexCache indexCache;
+
+ private LoadingCache<ApplicationScope, EntityIndex> eiCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() {
+ public EntityIndex load( ApplicationScope scope ) {
+ return new EsEntityIndexImpl(scope,config,provider,indexCache);
+ }
+ } );
+ public EntityIndexFactoryImpl(final IndexFig config, final EsProvider provider, final EsIndexCache indexCache){
+ this.config = config;
+ this.provider = provider;
+ this.indexCache = indexCache;
+ }
+
+ @Override
+ public EntityIndex createEntityIndex(final ApplicationScope appScope) {
+ try{
+ return eiCache.get(appScope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public void invalidate() {
+ eiCache.invalidateAll();
+ }
+}
[3/4] incubator-usergrid git commit: moving dependencies to core
persistence
Posted by sf...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index be45ab5..8184f09 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -21,15 +21,19 @@ package org.apache.usergrid.persistence.core.guice;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
/**
* A simple migration that sets the version to max. This way our integration tests always test the latest code
*/
public class MaxMigrationVersion implements DataMigration {
+
@Override
- public void migrate( final ProgressObserver observer ) throws Throwable {
+ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer ) throws Throwable {
//no op, just needs to run to be set
+ return Observable.empty();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index 3ffcd89..0a80cb7 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -25,11 +25,14 @@ package org.apache.usergrid.persistence.core.migration.data;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.usergrid.persistence.core.migration.schema.MigrationException;
+import rx.Observable;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -46,16 +49,20 @@ import static org.mockito.Mockito.when;
*/
public class DataMigrationManagerImplTest {
+ AllEntitiesInSystemObservable allEntitiesInSystemObservable = new AllEntitiesInSystemObservable() {
+ @Override
+ public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(int bufferSize) {
+ return null;
+ }
+ };
@Test
public void noMigrations() throws MigrationException {
final MigrationInfoSerialization serialization = mock( MigrationInfoSerialization.class );
-
Set<DataMigration> emptyMigration = new HashSet<>();
-
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, emptyMigration, allEntitiesInSystemObservable );
migrationManager.migrate();
@@ -82,13 +89,13 @@ public class DataMigrationManagerImplTest {
migrations.add( v2 );
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
migrationManager.migrate();
- verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
- verify( v2 ).migrate( any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
+ verify( v2 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 2 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -115,8 +122,8 @@ public class DataMigrationManagerImplTest {
when( v1.getVersion() ).thenReturn( 1 );
//throw an exception
- doThrow( new RuntimeException( "Something bad happened" ) ).when( v1 ).migrate(
- any( DataMigration.ProgressObserver.class ) );
+ doThrow( new RuntimeException( "Something bad happened" ) ).when( v1 ).migrate(any(ApplicationEntityGroup.class),
+ any(DataMigration.ProgressObserver.class) );
final DataMigration v2 = mock( DataMigration.class );
when( v2.getVersion() ).thenReturn( 2 );
@@ -127,15 +134,15 @@ public class DataMigrationManagerImplTest {
migrations.add( v2 );
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
migrationManager.migrate();
- verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
//verify we don't run migration
- verify( v2, never() ).migrate( any( DataMigration.ProgressObserver.class ) );
+ verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
@@ -177,7 +184,7 @@ public class DataMigrationManagerImplTest {
progressObserver.failed( returnedCode, reason );
return null;
}
- } ).when( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+ } ).when( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
@@ -190,15 +197,15 @@ public class DataMigrationManagerImplTest {
migrations.add( v2 );
- DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations );
+ DataMigrationManagerImpl migrationManager = new DataMigrationManagerImpl( serialization, migrations,allEntitiesInSystemObservable );
migrationManager.migrate();
- verify( v1 ).migrate( any( DataMigration.ProgressObserver.class ) );
+ verify( v1 ).migrate(any(ApplicationEntityGroup.class), any( DataMigration.ProgressObserver.class ) );
//verify we don't run migration
- verify( v2, never() ).migrate( any( DataMigration.ProgressObserver.class ) );
+ verify( v2, never() ).migrate( any(ApplicationEntityGroup.class),any( DataMigration.ProgressObserver.class ) );
//verify we set the running status
verify( serialization, times( 1 ) ).setStatusCode( DataMigrationManagerImpl.StatusCode.RUNNING.status );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f50c7cc..4786a42 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.guice;
import org.apache.usergrid.persistence.core.guice.V1Impl;
import org.apache.usergrid.persistence.core.guice.V2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.serialization.*;
import org.apache.usergrid.persistence.graph.serialization.impl.*;
import org.safehaus.guicyfig.GuicyFigModule;
@@ -90,6 +91,8 @@ public class GraphModule extends AbstractModule {
bind(EdgesFromSourceObservable.class).to(EdgesFromSourceObservableImpl.class);
+ bind(TargetIdObservable.class).to(TargetIdObservableImpl.class);
+
bind(EdgesToTargetObservable.class).to(EdgesToTargetObservableImpl.class);
/**
@@ -112,6 +115,9 @@ public class GraphModule extends AbstractModule {
bind( EdgeMetaRepair.class ).to( EdgeMetaRepairImpl.class );
bind( EdgeDeleteRepair.class ).to( EdgeDeleteRepairImpl.class );
+ Multibinder<DataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), DataMigration.class );
+ dataMigrationMultibinder.addBinding().to( EdgeMetadataSerializationProxyImpl.class );
/**
* Add our listeners
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
index 6333f87..ed46e43 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgeMigrationStrategy.java
@@ -27,7 +27,7 @@ import java.util.List;
/**
* Classy class class.
*/
-public interface EdgeMigrationStrategy extends MigrationStrategy<EdgeMetadataSerialization,Edge> {
+public interface EdgeMigrationStrategy extends MigrationStrategy<EdgeMetadataSerialization> {
public static final int MIGRATION_VERSION = 2;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
new file mode 100644
index 0000000..31164df
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/TargetIdObservable.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization;
+
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.model.entity.Id;
+import rx.Observable;
+
+/**
+ * Emits the id of all nodes that are target nodes from the given source node
+ */
+public interface TargetIdObservable {
+ /**
+ * Get all nodes that are target nodes from the sourceNode
+ * @param gm
+ * @param sourceNode
+ *
+ * @return
+ */
+ Observable<Id> getTargetNodes(GraphManager gm, Id sourceNode);
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
index 60b6a5f..6059569 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationProxyImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
@@ -61,6 +62,8 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
private final DataMigrationManager dataMigrationManager;
private final Keyspace keyspace;
+ private final GraphManagerFactory graphManagerFactory;
+ private final EdgesFromSourceObservable edgesFromSourceObservable;
private final EdgeMetadataSerialization previous;
private final EdgeMetadataSerialization current;
@@ -70,10 +73,14 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
*/
@Inject
public EdgeMetadataSerializationProxyImpl(final DataMigrationManager dataMigrationManager, final Keyspace keyspace,
+ final GraphManagerFactory graphManagerFactory,
+ final EdgesFromSourceObservable edgesFromSourceObservable,
@V1Impl final EdgeMetadataSerialization previous,
@V2Impl final EdgeMetadataSerialization current) {
this.dataMigrationManager = dataMigrationManager;
this.keyspace = keyspace;
+ this.graphManagerFactory = graphManagerFactory;
+ this.edgesFromSourceObservable = edgesFromSourceObservable;
this.previous = previous;
this.current = current;
}
@@ -291,14 +298,16 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
}
@Override
- public int getMigrationVersion() {
+ public int getVersion() {
return EdgeMigrationStrategy.MIGRATION_VERSION;
}
@Override
- public Observable<Long> executeMigration(final Observable<Edge> edgesFromSource ,final ApplicationEntityGroup applicationEntityGroup,
- final DataMigration.ProgressObserver observer,
- Func1<Id, ? extends ApplicationScope> getScopeFromEntityId) {
+ public Observable<Long> migrate(final ApplicationEntityGroup applicationEntityGroup,
+ final DataMigration.ProgressObserver observer) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
+ final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
+
final AtomicLong counter = new AtomicLong();
rx.Observable o =
Observable
@@ -344,7 +353,7 @@ public class EdgeMetadataSerializationProxyImpl implements EdgeMetadataSerializa
final long newCount =
counter.addAndGet(edges.size());
- observer.update(getMigrationVersion(), String.format(
+ observer.update(getVersion(), String.format(
"Currently running. Rewritten %d edge types",
newCount));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
new file mode 100644
index 0000000..bf0a03e
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -0,0 +1,70 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl;
+
+
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import rx.Observable;
+import rx.functions.Func1;
+
+/**
+ * Emits the id of all nodes that are target nodes from the given source node
+ */
+public class TargetIdObservableImpl implements TargetIdObservable {
+
+ private static final Logger logger = LoggerFactory.getLogger(TargetIdObservableImpl.class);
+ private final EdgesFromSourceObservable edgesFromSourceObservable;
+
+ public TargetIdObservableImpl(final EdgesFromSourceObservable edgesFromSourceObservable){
+ this.edgesFromSourceObservable = edgesFromSourceObservable;
+ }
+
+ /**
+ * Get all nodes that are target nodes from the sourceNode
+ * @param gm
+ * @param sourceNode
+ *
+ * @return
+ */
+ @Override
+ public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
+
+ //only search edge types that start with collections
+ return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
+
+
+ @Override
+ public Id call( final Edge edge ) {
+ final Id targetNode = edge.getTargetNode();
+
+ logger.debug( "Emitting targetId of {}", edge );
+
+
+ return targetNode;
+ }
+ } );
+ }
+}
[2/4] incubator-usergrid git commit: moving dependencies to core
persistence
Posted by sf...@apache.org.
moving dependencies to core persistence
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7daca75e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7daca75e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7daca75e
Branch: refs/heads/USERGRID-365
Commit: 7daca75ef9954b6da4d0a549755cf2afe85834dc
Parents: dd5e6ea
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Feb 9 15:59:30 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Feb 9 15:59:30 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 21 +--
.../corepersistence/CpManagerCache.java | 71 ++--------
.../usergrid/corepersistence/GuiceModule.java | 13 +-
.../usergrid/corepersistence/ManagerCache.java | 12 +-
.../migration/EntityDataMigration.java | 28 ++--
.../migration/EntityTypeMappingMigration.java | 14 +-
.../migration/GraphShardVersionMigration.java | 50 +++----
.../rx/AllEntitiesInSystemObservable.java | 90 ------------
.../rx/ApplicationObservable.java | 128 -----------------
.../rx/EdgesFromSourceObservable.java | 63 ---------
.../corepersistence/rx/TargetIdObservable.java | 9 +-
.../impl/AllEntitiesInSystemObservableImpl.java | 95 +++++++++++++
.../rx/impl/ApplicationObservableImpl.java | 135 ++++++++++++++++++
.../migration/EntityDataMigrationIT.java | 8 +-
.../migration/EntityTypeMappingMigrationIT.java | 4 +-
.../migration/GraphShardVersionMigrationIT.java | 6 +-
.../rx/AllEntitiesInSystemObservableIT.java | 3 +-
.../rx/ApplicationObservableTestIT.java | 5 +-
.../collection/EntityCollectionManager.java | 8 +-
.../EntityCollectionManagerFactory.java | 22 +--
.../collection/guice/CollectionModule.java | 8 +-
.../EntityCollectionManagerFactoryImpl.java | 137 +++++++++++++++++++
.../usergrid/persistence/core/CPManager.java | 28 ++++
.../core/entity/ApplicationEntityGroup.java | 40 ++++++
.../migration/schema/MigrationStrategy.java | 2 +-
.../core/rx/AllEntitiesInSystemObservable.java | 40 ++++++
.../core/rx/ApplicationObservable.java | 33 +++++
.../core/scope/ApplicationEntityGroup.java | 39 ------
.../persistence/graph/GraphManager.java | 3 +-
.../persistence/graph/GraphManagerFactory.java | 2 +
.../persistence/graph/guice/GraphModule.java | 16 +--
.../EdgesFromSourceObservable.java | 32 +++++
.../serialization/EdgesToTargetObservable.java | 32 +++++
.../impl/EdgesFromSourceObservableImpl.java | 62 +++++++++
.../impl/EdgesToTargetObservableImpl.java | 65 +++++++++
.../impl/GraphManagerFactoryImpl.java | 88 ++++++++++++
.../usergrid/persistence/map/MapManager.java | 4 +-
.../persistence/map/MapManagerFactory.java | 2 +
.../persistence/map/guice/MapModule.java | 10 +-
.../map/impl/MapManagerFactoryImpl.java | 62 +++++++++
.../usergrid/persistence/index/EntityIndex.java | 9 +-
.../persistence/index/EntityIndexFactory.java | 4 +-
.../persistence/index/guice/IndexModule.java | 6 +-
.../index/impl/EntityIndexFactoryImpl.java | 67 +++++++++
44 files changed, 1077 insertions(+), 499 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 93eb15e..5a4fcc1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.AbstractEntity;
import org.apache.usergrid.persistence.Entity;
@@ -61,14 +62,10 @@ import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.exceptions.ApplicationAlreadyExistsException;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.query.Query;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.utils.UUIDUtils;
@@ -110,8 +107,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
});
-
private ManagerCache managerCache;
+
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+
private DataMigrationManager dataMigrationManager;
CassandraService cass;
@@ -161,6 +160,12 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
return managerCache;
}
+ private AllEntitiesInSystemObservable getAllEntitiesObservable(){
+ if(allEntitiesInSystemObservable==null)
+ allEntitiesInSystemObservable = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+ return allEntitiesInSystemObservable;
+ }
+
@Override
public String getImplementationDescription() throws Exception {
@@ -426,8 +431,8 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
if ( e == null ) {
logger.warn("Applicaion {} in index but not found in collections", targetId );
continue;
- }
-
+ }
+
appMap.put(
(String)e.getField( PROPERTY_NAME ).getValue(),
(UUID)e.getField( "applicationUuid" ).getValue());
@@ -576,7 +581,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
public long performEntityCount() {
//TODO, this really needs to be a task that writes this data somewhere since this will get
//progressively slower as the system expands
- return AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).longCount().toBlocking().last();
+ return getAllEntitiesObservable().getAllEntitiesInSystem(1000).longCount().toBlocking().last();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
index e29e5c2..8292170 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpManagerCache.java
@@ -18,6 +18,7 @@ package org.apache.usergrid.corepersistence;
import java.util.concurrent.ExecutionException;
+import com.amazonaws.services.cloudfront.model.InvalidArgumentException;
import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -45,35 +46,8 @@ public class CpManagerCache implements ManagerCache {
// TODO: consider making these cache sizes and timeouts configurable
- private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
- CacheBuilder.newBuilder().maximumSize( 1000 )
- .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
- public EntityCollectionManager load( CollectionScope scope ) {
- return ecmf.createCollectionManager( scope );
- }
- } );
-
- private LoadingCache<ApplicationScope, EntityIndex> eiCache =
- CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, EntityIndex>() {
- public EntityIndex load( ApplicationScope scope ) {
- return eif.createEntityIndex( scope );
- }
- } );
-
- private LoadingCache<ApplicationScope, GraphManager> gmCache =
- CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<ApplicationScope, GraphManager>() {
- public GraphManager load(
- ApplicationScope scope ) {
- return gmf.createEdgeManager( scope );
- }
- } );
-
- private LoadingCache<MapScope, MapManager> mmCache =
- CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<MapScope, MapManager>() {
- public MapManager load( MapScope scope ) {
- return mmf.createMapManager( scope );
- }
- } );
+
+
@Inject
@@ -89,53 +63,34 @@ public class CpManagerCache implements ManagerCache {
@Override
public EntityCollectionManager getEntityCollectionManager( CollectionScope scope ) {
- try {
- return ecmCache.get( scope );
- }
- catch ( ExecutionException ex ) {
- throw new RuntimeException( "Error getting manager", ex );
- }
+ return ecmf.createCollectionManager(scope);
}
@Override
public EntityIndex getEntityIndex( ApplicationScope appScope ) {
- try {
- return eiCache.get( appScope );
- }
- catch ( ExecutionException ex ) {
- throw new RuntimeException( "Error getting manager", ex );
- }
+
+ return eif.createEntityIndex( appScope );
+
}
@Override
public GraphManager getGraphManager( ApplicationScope appScope ) {
- try {
- return gmCache.get( appScope );
- }
- catch ( ExecutionException ex ) {
- throw new RuntimeException( "Error getting manager", ex );
- }
+ return gmf.createEdgeManager(appScope);
}
@Override
public MapManager getMapManager( MapScope mapScope ) {
- try {
- return mmCache.get( mapScope );
- }
- catch ( ExecutionException ex ) {
- throw new RuntimeException( "Error getting manager", ex );
- }
+ return mmf.createMapManager(mapScope);
}
-
@Override
public void invalidate() {
- ecmCache.invalidateAll();
- eiCache.invalidateAll();
- gmCache.invalidateAll();
- mmCache.invalidateAll();
+ ecmf.invalidate();
+ eif.invalidate();
+ gmf.invalidate();
+ mmf.invalidate();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index ed6cba2..0143a89 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -24,6 +24,10 @@ import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
+import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.ApplicationObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
@@ -69,18 +73,21 @@ public class GuiceModule extends AbstractModule {
install(new QueueModule());
bind(ManagerCache.class).to( CpManagerCache.class );
+ bind(AllEntitiesInSystemObservable.class).to( AllEntitiesInSystemObservableImpl.class );
+ bind(ApplicationObservable.class).to( ApplicationObservableImpl.class );
- Multibinder<DataMigration> dataMigrationMultibinder =
+
+ Multibinder<DataMigration> dataMigrationMultibinder =
Multibinder.newSetBinder( binder(), DataMigration.class );
dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
- Multibinder<EntityDeleted> entityBinder =
+ Multibinder<EntityDeleted> entityBinder =
Multibinder.newSetBinder(binder(), EntityDeleted.class);
entityBinder.addBinding().to(EntityDeletedHandler.class);
- Multibinder<EntityVersionDeleted> versionBinder =
+ Multibinder<EntityVersionDeleted> versionBinder =
Multibinder.newSetBinder(binder(), EntityVersionDeleted.class);
versionBinder.addBinding().to(EntityVersionDeletedHandler.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
index c1b7b95..921fde5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/ManagerCache.java
@@ -33,6 +33,7 @@ import org.apache.usergrid.persistence.map.MapScope;
* The cache of the manager
*/
public interface ManagerCache {
+
/**
* Get the entity collection manager for the specified scope
* @param scope
@@ -42,27 +43,30 @@ public interface ManagerCache {
/**
* Get the entity index for the specified app scope
+ *
* @param appScope
* @return
*/
- EntityIndex getEntityIndex( ApplicationScope appScope );
+ EntityIndex getEntityIndex(ApplicationScope appScope);
/**
* Get the graph manager for the graph scope
+ *
* @param appScope
* @return
*/
- GraphManager getGraphManager( ApplicationScope appScope );
+ GraphManager getGraphManager(ApplicationScope appScope);
/**
* Get the map manager for the map scope
+ *
* @param mapScope
* @return
*/
- MapManager getMapManager( MapScope mapScope );
+ MapManager getMapManager(MapScope mapScope);
/**
- * Invalidate all cache entries
+ * invalidate the cache
*/
void invalidate();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
index 22bc472..861c343 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
@@ -20,21 +20,14 @@
package org.apache.usergrid.corepersistence.migration;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import com.google.inject.Inject;
-import org.apache.usergrid.persistence.model.entity.Id;
import rx.functions.Action1;
-import rx.functions.Func1;
/**
@@ -43,24 +36,29 @@ import rx.functions.Func1;
public class EntityDataMigration implements DataMigration {
- private final ManagerCache managerCache;
private final MvccEntityMigrationStrategy migrationStrategy;
+ private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Inject
public EntityDataMigration( final MvccEntityMigrationStrategy migrationStrategy,
- final ManagerCache managerCache ) {
+ final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
this.migrationStrategy = migrationStrategy;
- this.managerCache = managerCache;
+ this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
}
@Override
public void migrate( final ProgressObserver observer ) throws Throwable {
- AllEntitiesInSystemObservable
- .getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable
+ .getAllEntitiesInSystem(1000)
.doOnNext(new Action1<ApplicationEntityGroup>() {
@Override
public void call( final ApplicationEntityGroup applicationEntityGroup ) {
- migrationStrategy.executeMigration( null,applicationEntityGroup, observer, CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope));
+ migrationStrategy.executeMigration(
+ null,
+ applicationEntityGroup,
+ observer,
+ CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
+ );
}
})
.toBlocking().last();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index a7655ad..08001d1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -23,14 +23,13 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -46,12 +45,13 @@ import rx.functions.Action1;
public class EntityTypeMappingMigration implements DataMigration {
private final ManagerCache managerCache;
-
+ private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Inject
- public EntityTypeMappingMigration( final ManagerCache managerCache) {
+ public EntityTypeMappingMigration( final ManagerCache managerCache, final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
this.managerCache = managerCache;
+ this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
}
@@ -60,7 +60,7 @@ public class EntityTypeMappingMigration implements DataMigration {
final AtomicLong atomicLong = new AtomicLong();
- AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000 )
+ allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
index 0a1fbd2..41b390b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
@@ -22,32 +22,20 @@
package org.apache.usergrid.corepersistence.migration;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.EdgesFromSourceObservable;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.functions.Action1;
import rx.functions.Func1;
@@ -57,34 +45,46 @@ import rx.functions.Func1;
public class GraphShardVersionMigration implements DataMigration {
- private final ManagerCache managerCache;
+ private final GraphManagerFactory graphManagerFactory;
+ private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
+ private final EdgesFromSourceObservable edgesFromSourceObservable;
private final EdgeMigrationStrategy migrationStrategy;
@Inject
public GraphShardVersionMigration( final EdgeMigrationStrategy migrationStrategy,
- final ManagerCache managerCache ) {
+ final GraphManagerFactory graphManagerFactory,
+ final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
+ final EdgesFromSourceObservable edgesFromSourceObservable
+ ) {
this.migrationStrategy = migrationStrategy;
- this.managerCache = managerCache;
+ this.graphManagerFactory = graphManagerFactory;
+ this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
+ this.edgesFromSourceObservable = edgesFromSourceObservable;
}
@Override
public void migrate( final ProgressObserver observer ) throws Throwable {
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
.flatMap(
new Func1<ApplicationEntityGroup, Observable<Long>>() {
@Override
public Observable<Long> call(
final ApplicationEntityGroup applicationEntityGroup) {
- final GraphManager gm = managerCache.getGraphManager(applicationEntityGroup.applicationScope);
- Observable<Edge> edgesFromSource = EdgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
+ Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
//emit a stream of all ids from this group
- return migrationStrategy.executeMigration(edgesFromSource,applicationEntityGroup, observer, CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope));
+ return migrationStrategy.executeMigration(
+ edgesFromSource,
+ applicationEntityGroup,
+ observer,
+ CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
+ );
}
- } )
- .toBlocking().lastOrDefault( null );
+ })
+ .toBlocking().lastOrDefault(null);
;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
deleted file mode 100644
index d3add56..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservable.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * An observable that will emit every entity Id stored in our entire system across all apps.
- * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
- * source node
- */
-public class AllEntitiesInSystemObservable {
-
-
- /**
- * Return an observable that emits all entities in the system.
- * @param managerCache the managerCache to use
- * @param bufferSize The amount of entityIds to buffer into each ApplicationEntityGroup. Note that if we exceed the buffer size
- * you may be more than 1 ApplicationEntityGroup with the same application and different ids
- */
- public static Observable<ApplicationEntityGroup> getAllEntitiesInSystem( final ManagerCache managerCache, final int bufferSize) {
- //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
- return ApplicationObservable.getAllApplicationIds( managerCache )
-
- .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
- @Override
- public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
-
- //set up our application scope and graph manager
- final ApplicationScope applicationScope = new ApplicationScopeImpl(
- applicationId );
-
-
- final GraphManager gm =
- managerCache.getGraphManager( applicationScope );
-
-
- //load all nodes that are targets of our application node. I.E.
- // entities that have been saved
- final Observable<Id> entityNodes =
- TargetIdObservable.getTargetNodes(gm, applicationId );
-
- //create our application node to emit since it's an entity as well
- final Observable<Id> applicationNode = Observable.just( applicationId );
-
- //merge both the specified application node and the entity node
- // so they all get used
- return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
- .map( new Func1<List<Id>, ApplicationEntityGroup>() {
- @Override
- public ApplicationEntityGroup call( final List<Id> id ) {
- return new ApplicationEntityGroup( applicationScope, id );
- }
- } );
- }
- } );
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
deleted file mode 100644
index 6019bca..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/ApplicationObservable.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import java.util.Arrays;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.CollectionScope;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
-
-/**
- * An observable that will emit all application stored in the system.
- */
-public class ApplicationObservable {
-
- private static final Logger logger = LoggerFactory.getLogger( ApplicationObservable.class );
-
- /**
- * Get all applicationIds as an observable
- */
- public static Observable<Id> getAllApplicationIds( final ManagerCache managerCache ) {
-
- //emit our 3 hard coded applications that are used the manage the system first.
- //this way consumers can perform whatever work they need to on the root system first
-
-
- final Observable<Id> systemIds = Observable.from( Arrays
- .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
- generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
-
-
- final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
-
- final CollectionScope appInfoCollectionScope =
- new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
-
- final EntityCollectionManager collectionManager =
- managerCache.getEntityCollectionManager( appInfoCollectionScope );
-
-
- final GraphManager gm = managerCache.getGraphManager( appScope );
-
-
- String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
-
- Id rootAppId = appScope.getApplication();
-
-
- //we have app infos. For each of these app infos, we have to load the application itself
- Observable<Id> appIds = gm.loadEdgesFromSource(
- new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
- @Override
- public Observable<Id> call( final Edge edge ) {
- //get the app info and load it
- final Id appInfo = edge.getTargetNode();
-
- return collectionManager.load( appInfo )
- //filter out null entities
- .filter( new Func1<Entity, Boolean>() {
- @Override
- public Boolean call( final Entity entity ) {
- if ( entity == null ) {
- logger.warn( "Encountered a null application info for id {}", appInfo );
- return false;
- }
-
- return true;
- }
- } )
- //get the id from the entity
- .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
-
-
- @Override
- public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
-
- final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
-
- return CpNamingUtils.generateApplicationId( uuid );
- }
- } );
- }
- } );
-
- return Observable.merge( systemIds, appIds );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
deleted file mode 100644
index d3e2ee5..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the edges that are edges from the specified source node
- */
-public class EdgesFromSourceObservable {
-
- private static final Logger logger = LoggerFactory.getLogger( EdgesFromSourceObservable.class );
-
-
- /**
- * Get all edges from the source
- */
- public static Observable<Edge> edgesFromSource( final GraphManager gm, final Id sourceNode){
- Observable<String> edgeTypes = gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
-
- return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
-
- return gm.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, null ) );
- }
- } );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
index c4b6526..96b638b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.rx;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,7 +38,11 @@ import rx.functions.Func1;
public class TargetIdObservable {
private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class );
+ private final EdgesFromSourceObservable edgesFromSourceObservable;
+ public TargetIdObservable(final EdgesFromSourceObservable edgesFromSourceObservable){
+ this.edgesFromSourceObservable = edgesFromSourceObservable;
+ }
/**
* Get all nodes that are target nodes from the sourceNode
@@ -46,10 +51,10 @@ public class TargetIdObservable {
*
* @return
*/
- public static Observable<Id> getTargetNodes( final GraphManager gm, final Id sourceNode) {
+ public Observable<Id> getTargetNodes( final GraphManager gm, final Id sourceNode) {
//only search edge types that start with collections
- return EdgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
+ return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
new file mode 100644
index 0000000..4218107
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.List;
+
+import org.apache.usergrid.corepersistence.rx.TargetIdObservable;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+
+/**
+ * An observable that will emit every entity Id stored in our entire system across all apps.
+ * Note that this only walks each application applicationId graph, and emits edges from the applicationId and it's edges as the s
+ * source node
+ */
+public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObservable {
+
+ private final ApplicationObservable applicationObservable;
+ private final GraphManagerFactory graphManagerFactory;
+ private final TargetIdObservable targetIdObservable;
+
+ public AllEntitiesInSystemObservableImpl(ApplicationObservable applicationObservable, GraphManagerFactory graphManagerFactory, TargetIdObservable targetIdObservable){
+
+ this.applicationObservable = applicationObservable;
+ this.graphManagerFactory = graphManagerFactory;
+ this.targetIdObservable = targetIdObservable;
+ }
+
+
+ public Observable<ApplicationEntityGroup> getAllEntitiesInSystem(final int bufferSize) {
+ //traverse all nodes in the graph, load all source edges from them, then re-save the meta data
+ return applicationObservable.getAllApplicationIds( )
+
+ .flatMap( new Func1<Id, Observable<ApplicationEntityGroup>>() {
+ @Override
+ public Observable<ApplicationEntityGroup> call( final Id applicationId ) {
+
+ //set up our application scope and graph manager
+ final ApplicationScope applicationScope = new ApplicationScopeImpl(
+ applicationId );
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+
+ //load all nodes that are targets of our application node. I.E.
+ // entities that have been saved
+ final Observable<Id> entityNodes =
+ targetIdObservable.getTargetNodes(gm, applicationId);
+
+ //create our application node to emit since it's an entity as well
+ final Observable<Id> applicationNode = Observable.just( applicationId );
+
+ //merge both the specified application node and the entity node
+ // so they all get used
+ return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
+ .map( new Func1<List<Id>, ApplicationEntityGroup>() {
+ @Override
+ public ApplicationEntityGroup call( final List<Id> id ) {
+ return new ApplicationEntityGroup( applicationScope, id );
+ }
+ } );
+ }
+ } );
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
new file mode 100644
index 0000000..10a043d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/ApplicationObservableImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.rx.impl;
+
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
+import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.collection.CollectionScope;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import rx.Observable;
+import rx.functions.Func1;
+
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateApplicationId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
+
+
+/**
+ * An observable that will emit all application stored in the system.
+ */
+public class ApplicationObservableImpl implements ApplicationObservable {
+
+ private static final Logger logger = LoggerFactory.getLogger( ApplicationObservableImpl.class );
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final GraphManagerFactory graphManagerFactory;
+
+ public ApplicationObservableImpl(EntityCollectionManagerFactory entityCollectionManagerFactory, GraphManagerFactory graphManagerFactory){
+
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.graphManagerFactory = graphManagerFactory;
+ }
+
+
+ @Override
+ public Observable<Id> getAllApplicationIds() {
+
+ //emit our 3 hard coded applications that are used the manage the system first.
+ //this way consumers can perform whatever work they need to on the root system first
+ final Observable<Id> systemIds = Observable.from( Arrays
+ .asList( generateApplicationId( CpNamingUtils.DEFAULT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.MANAGEMENT_APPLICATION_ID ),
+ generateApplicationId( CpNamingUtils.SYSTEM_APP_ID ) ) );
+
+
+ final ApplicationScope appScope = getApplicationScope( CpNamingUtils.SYSTEM_APP_ID );
+
+ final CollectionScope appInfoCollectionScope =
+ new CollectionScopeImpl( appScope.getApplication(), appScope.getApplication(),
+ CpNamingUtils.getCollectionScopeNameFromCollectionName( CpNamingUtils.APPINFOS ) );
+
+ final EntityCollectionManager collectionManager =
+ entityCollectionManagerFactory.createCollectionManager( appInfoCollectionScope );
+
+
+ final GraphManager gm = graphManagerFactory.createEdgeManager(appScope);
+
+
+ String edgeType = CpNamingUtils.getEdgeTypeFromCollectionName( CpNamingUtils.APPINFOS );
+
+ Id rootAppId = appScope.getApplication();
+
+
+ //we have app infos. For each of these app infos, we have to load the application itself
+ Observable<Id> appIds = gm.loadEdgesFromSource(
+ new SimpleSearchByEdgeType( rootAppId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+ null ) ).flatMap( new Func1<Edge, Observable<Id>>() {
+ @Override
+ public Observable<Id> call( final Edge edge ) {
+ //get the app info and load it
+ final Id appInfo = edge.getTargetNode();
+
+ return collectionManager.load( appInfo )
+ //filter out null entities
+ .filter( new Func1<Entity, Boolean>() {
+ @Override
+ public Boolean call( final Entity entity ) {
+ if ( entity == null ) {
+ logger.warn( "Encountered a null application info for id {}", appInfo );
+ return false;
+ }
+
+ return true;
+ }
+ } )
+ //get the id from the entity
+ .map( new Func1<org.apache.usergrid.persistence.model.entity.Entity, Id>() {
+
+
+ @Override
+ public Id call( final org.apache.usergrid.persistence.model.entity.Entity entity ) {
+
+ final UUID uuid = ( UUID ) entity.getField( "applicationUuid" ).getValue();
+
+ return CpNamingUtils.generateApplicationId( uuid );
+ }
+ } );
+ }
+ } );
+
+ return Observable.merge( systemIds, appIds );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index 50b14ec..f58696e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -34,7 +34,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
@@ -132,7 +132,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types. Assumes we're
//using a test system, and it's not a huge amount of data, otherwise we'll overflow.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -184,7 +184,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//now visit all entities in the system again, load them from v2, and ensure they're the same
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -220,7 +220,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//now visit all entities in the system again, and load them from the EM,
// ensure we see everything we did in the v1 traversal
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 )
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index ee88cd4..ee95d58 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.Entity;
import org.apache.usergrid.persistence.EntityManager;
@@ -126,7 +126,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
entityTypeMappingMigration.migrate( progressObserver );
- AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000)
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 6f87623..45f41aa 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -32,7 +32,7 @@ import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.EntityWriteHelper;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.persistence.EntityManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManagerImpl;
@@ -115,7 +115,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000)
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -172,7 +172,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
- AllEntitiesInSystemObservable.getAllEntitiesInSystem(managerCache, 1000)
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index fcb9813..b246e04 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.rx;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Test;
import org.slf4j.Logger;
@@ -93,7 +94,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- AllEntitiesInSystemObservable.getAllEntitiesInSystem( managerCache, 1000 ).doOnNext( new Action1<ApplicationEntityGroup>() {
+ AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call( final ApplicationEntityGroup entity ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index f8f3c50..daeca87 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,15 +24,14 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.corepersistence.CpSetup;
import org.apache.usergrid.corepersistence.ManagerCache;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.entities.Application;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -66,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
//clean up our wiring
ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
- Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds( managerCache );
+ Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds(managerCache);
appObservable.doOnNext( new Action1<Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
index 4de18fe..90cade0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManager.java
@@ -20,6 +20,8 @@ package org.apache.usergrid.persistence.collection;
import java.util.Collection;
+
+import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,7 +32,7 @@ import rx.Observable;
/**
* The operations for performing changes on an entity
*/
-public interface EntityCollectionManager {
+public interface EntityCollectionManager extends CPManager {
/**
* Write the entity in the entity collection. This is an entire entity, it's contents will
@@ -68,12 +70,12 @@ public interface EntityCollectionManager {
/**
* Takes the change and reloads an entity with all changes applied in this entity applied.
- * The resulting entity from calling load will be the previous version of this entity plus
+ * The resulting entity from calling load will be the previous version of this entity plus
* the entity in this object applied to it.
*/
public Observable<Entity> update ( Entity entity );
- /**
+ /**
* Returns health of entity data store.
*/
public Health getHealth();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
index ef579f8..9140913 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/EntityCollectionManagerFactory.java
@@ -20,40 +20,42 @@ package org.apache.usergrid.persistence.collection;
/**
- * A basic factory that creates a collection manager with the given context.
+ * A basic factory that creates a collection manager with the given context.
* Each instance of this factory should exist for a Single ApplicationScope
*/
public interface EntityCollectionManagerFactory {
/**
- * Create a new EntityCollectionManager for the given context.
- * The EntityCollectionManager can safely be used on the current thread
+ * Create a new EntityCollectionManager for the given context.
+ * The EntityCollectionManager can safely be used on the current thread
* and will shard responses. The returned instance should not be shared
* among threads it will not be guaranteed to be thread safe.
*
- * @param collectionScope The collectionScope collectionScope to use
+ * @param collectionScope The collectionScope collectionScope to use
* when creating the collectionScope manager
*
* @return The collectionScope manager to perform operations within the provided context
*/
- public EntityCollectionManager
+ public EntityCollectionManager
createCollectionManager( CollectionScope collectionScope );
/**
- * Create a new EntityCollectionManagerSync for the given context.
- * The EntityCollectionManager can safely be used on the current thread
+ * Create a new EntityCollectionManagerSync for the given context.
+ * The EntityCollectionManager can safely be used on the current thread
* and will shard responses. The returned instance should not be shared
- * among threads it will not be guaranteed to be thread safe.
+ * among threads it will not be guaranteed to be thread safe.
* This implementation will be synchronous. Try to use the org.apache.usergrid.persistence.core.consistency
* implementation if possible
*
- * @param collectionScope The collectionScope collectionScope to use when
+ * @param collectionScope The collectionScope collectionScope to use when
* creating the collectionScope manager
*
* @return The collectionScope manager to perform operations within the provided context
*/
- public EntityCollectionManagerSync
+ public EntityCollectionManagerSync
createCollectionManagerSync( CollectionScope collectionScope );
+
+ void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 1c3e258..c5dd961 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -19,6 +19,7 @@ package org.apache.usergrid.persistence.collection.guice;
+import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerFactoryImpl;
import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
@@ -81,10 +82,7 @@ public class CollectionModule extends AbstractModule {
Multibinder.newSetBinder( binder(), EntityDeleted.class );
// create a guice factor for getting our collection manager
- install( new FactoryModuleBuilder()
- .implement( EntityCollectionManager.class, EntityCollectionManagerImpl.class )
- .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
- .build( EntityCollectionManagerFactory.class ) );
+ bind(EntityCollectionManagerFactory.class).to(EntityCollectionManagerFactoryImpl.class);
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
@@ -116,7 +114,7 @@ public class CollectionModule extends AbstractModule {
@Provides
@CollectionTaskExecutor
public TaskExecutor collectionTaskExecutor(final SerializationFig serializationFig){
- return new NamedTaskExecutorImpl( "collectiontasks",
+ return new NamedTaskExecutorImpl( "collectiontasks",
serializationFig.getTaskPoolThreadSize(), serializationFig.getTaskPoolQueueSize() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
new file mode 100644
index 0000000..790be19
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.collection.impl;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.inject.assistedinject.Assisted;
+import com.netflix.astyanax.Keyspace;
+import org.apache.usergrid.persistence.collection.*;
+import org.apache.usergrid.persistence.collection.guice.CollectionTaskExecutor;
+import org.apache.usergrid.persistence.collection.guice.Write;
+import org.apache.usergrid.persistence.collection.guice.WriteUpdate;
+import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkCommit;
+import org.apache.usergrid.persistence.collection.mvcc.stage.delete.MarkStart;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.*;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
+import org.apache.usergrid.persistence.core.task.TaskExecutor;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Classy class class.
+ */
+public class EntityCollectionManagerFactoryImpl implements EntityCollectionManagerFactory {
+
+
+ private final WriteStart writeStart;
+ private final WriteStart writeUpdate;
+ private final WriteUniqueVerify writeVerifyUnique;
+ private final WriteOptimisticVerify writeOptimisticVerify;
+ private final WriteCommit writeCommit;
+ private final RollbackAction rollback;
+ private final MarkStart markStart;
+ private final MarkCommit markCommit;
+ private final MvccEntitySerializationStrategy entitySerializationStrategy;
+ private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+ private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
+ private final Keyspace keyspace;
+ private final SerializationFig config;
+ private final EntityVersionCleanupFactory entityVersionCleanupFactory;
+ private final EntityVersionCreatedFactory entityVersionCreatedFactory;
+ private final EntityDeletedFactory entityDeletedFactory;
+ private final TaskExecutor taskExecutor;
+ private final CollectionScope collectionScope;
+ private LoadingCache<CollectionScope, EntityCollectionManager> ecmCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 )
+ .build( new CacheLoader<CollectionScope, EntityCollectionManager>() {
+ public EntityCollectionManager load( CollectionScope scope ) {
+ return new EntityCollectionManagerImpl(
+ writeStart,
+ writeUpdate,
+ writeVerifyUnique,
+ writeOptimisticVerify,writeCommit,rollback,markStart,markCommit,entitySerializationStrategy,uniqueValueSerializationStrategy,mvccLogEntrySerializationStrategy,keyspace,config,entityVersionCleanupFactory,entityVersionCreatedFactory,entityDeletedFactory,taskExecutor,collectionScope);
+ }
+ } );
+
+
+
+ public EntityCollectionManagerFactoryImpl( @Write final WriteStart writeStart,
+ @WriteUpdate final WriteStart writeUpdate,
+ final WriteUniqueVerify writeVerifyUnique,
+ final WriteOptimisticVerify writeOptimisticVerify,
+ final WriteCommit writeCommit,
+ final RollbackAction rollback,
+ final MarkStart markStart,
+ final MarkCommit markCommit,
+ @ProxyImpl final MvccEntitySerializationStrategy entitySerializationStrategy,
+ final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+ final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy,
+ final Keyspace keyspace,
+ final SerializationFig config,
+ final EntityVersionCleanupFactory entityVersionCleanupFactory,
+ final EntityVersionCreatedFactory entityVersionCreatedFactory,
+ final EntityDeletedFactory entityDeletedFactory,
+ @CollectionTaskExecutor final TaskExecutor taskExecutor,
+ @Assisted final CollectionScope collectionScope){
+
+ this.writeStart = writeStart;
+ this.writeUpdate = writeUpdate;
+ this.writeVerifyUnique = writeVerifyUnique;
+ this.writeOptimisticVerify = writeOptimisticVerify;
+ this.writeCommit = writeCommit;
+ this.rollback = rollback;
+ this.markStart = markStart;
+ this.markCommit = markCommit;
+ this.entitySerializationStrategy = entitySerializationStrategy;
+ this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+ this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
+ this.keyspace = keyspace;
+ this.config = config;
+ this.entityVersionCleanupFactory = entityVersionCleanupFactory;
+ this.entityVersionCreatedFactory = entityVersionCreatedFactory;
+ this.entityDeletedFactory = entityDeletedFactory;
+ this.taskExecutor = taskExecutor;
+ this.collectionScope = collectionScope;
+ }
+ @Override
+ public EntityCollectionManager createCollectionManager(CollectionScope collectionScope) {
+ try{
+ return ecmCache.get(collectionScope);
+ }catch (ExecutionException ee){
+ throw new RuntimeException(ee);
+ }
+ }
+
+ @Override
+ public EntityCollectionManagerSync createCollectionManagerSync(CollectionScope collectionScope) {
+ return new EntityCollectionManagerSyncImpl(this,collectionScope);
+ }
+
+ @Override
+ public void invalidate() {
+ ecmCache.invalidateAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
new file mode 100644
index 0000000..347080f
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/CPManager.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+
+
+package org.apache.usergrid.persistence.core;
+/**
+ * Base Manager Class
+ */
+
+public interface CPManager {
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
new file mode 100644
index 0000000..fd3bba6
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.entity;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+/**
+ * Get the entity data. Immutable bean for fast access
+ */
+public final class ApplicationEntityGroup {
+ public final ApplicationScope applicationScope;
+ public final List<Id> entityIds;
+
+
+ public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
+ this.applicationScope = applicationScope;
+ this.entityIds = entityIds;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7daca75e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
index cf30016..cb2f2eb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
@@ -19,8 +19,8 @@
*/
package org.apache.usergrid.persistence.core.migration.schema;
+import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
[4/4] incubator-usergrid git commit: moving dependencies to core
persistence
Posted by sf...@apache.org.
moving dependencies to core persistence
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c3869371
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c3869371
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c3869371
Branch: refs/heads/USERGRID-365
Commit: c3869371611cedb8682a9f74a3811685da9d3206
Parents: 7daca75
Author: Shawn Feldman <sf...@apache.org>
Authored: Tue Feb 10 10:50:24 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Tue Feb 10 10:50:24 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 3 +-
.../usergrid/corepersistence/GuiceModule.java | 12 +--
.../migration/EntityDataMigration.java | 71 ---------------
.../migration/EntityTypeMappingMigration.java | 53 ++++++-----
.../migration/GraphShardVersionMigration.java | 96 --------------------
.../rx/EdgesToTargetObservable.java | 63 -------------
.../corepersistence/rx/TargetIdObservable.java | 71 ---------------
.../impl/AllEntitiesInSystemObservableImpl.java | 27 ++++--
.../corepersistence/util/CpNamingUtils.java | 14 ---
.../migration/EntityDataMigrationIT.java | 32 +++++--
.../migration/EntityTypeMappingMigrationIT.java | 79 +++++++++-------
.../migration/GraphShardVersionMigrationIT.java | 30 ++++--
.../rx/AllEntitiesInSystemObservableIT.java | 10 +-
.../rx/ApplicationObservableTestIT.java | 6 +-
.../rx/EdgesFromSourceObservableIT.java | 4 +-
.../rx/EdgesToTargetObservableIT.java | 6 +-
.../rx/TargetIdObservableTestIT.java | 7 +-
.../mvcc/MvccEntityMigrationStrategy.java | 3 +-
.../MvccEntitySerializationStrategyProxy.java | 13 +--
...cEntitySerializationStrategyProxyV1Impl.java | 2 +-
...cEntitySerializationStrategyProxyV2Impl.java | 2 +-
.../serialization/impl/SerializationModule.java | 5 +
.../core/entity/ApplicationEntityGroup.java | 40 --------
.../core/migration/data/DataMigration.java | 5 +-
.../data/DataMigrationManagerImpl.java | 93 +++++++++++--------
.../migration/schema/MigrationStrategy.java | 9 +-
.../core/rx/AllEntitiesInSystemObservable.java | 2 +-
.../core/scope/ApplicationEntityGroup.java | 40 ++++++++
.../core/guice/MaxMigrationVersion.java | 6 +-
.../data/DataMigrationManagerImplTest.java | 37 +++++---
.../persistence/graph/guice/GraphModule.java | 6 ++
.../serialization/EdgeMigrationStrategy.java | 2 +-
.../graph/serialization/TargetIdObservable.java | 38 ++++++++
.../EdgeMetadataSerializationProxyImpl.java | 19 +++-
.../impl/TargetIdObservableImpl.java | 70 ++++++++++++++
35 files changed, 436 insertions(+), 540 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 5a4fcc1..0ca98e6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -35,8 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.AbstractEntity;
import org.apache.usergrid.persistence.Entity;
@@ -55,6 +53,7 @@ import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.impl.CollectionScopeImpl;
import org.apache.usergrid.persistence.core.migration.data.DataMigrationManager;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.util.Health;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
index 0143a89..b58a246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/GuiceModule.java
@@ -18,14 +18,10 @@ package org.apache.usergrid.corepersistence;
import com.google.inject.AbstractModule;
import com.google.inject.multibindings.Multibinder;
-import org.apache.usergrid.corepersistence.migration.EntityDataMigration;
import org.apache.usergrid.corepersistence.migration.EntityTypeMappingMigration;
-import org.apache.usergrid.corepersistence.migration.GraphShardVersionMigration;
import org.apache.usergrid.corepersistence.events.EntityDeletedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionCreatedHandler;
import org.apache.usergrid.corepersistence.events.EntityVersionDeletedHandler;
-import org.apache.usergrid.corepersistence.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.corepersistence.rx.ApplicationObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservableImpl;
import org.apache.usergrid.persistence.EntityManagerFactory;
@@ -33,9 +29,15 @@ import org.apache.usergrid.persistence.collection.event.EntityDeleted;
import org.apache.usergrid.persistence.collection.event.EntityVersionCreated;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxy;
import org.apache.usergrid.persistence.core.guice.CommonModule;
+import org.apache.usergrid.persistence.core.guice.ProxyImpl;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.graph.guice.GraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
import org.apache.usergrid.persistence.index.guice.IndexModule;
import org.apache.usergrid.persistence.map.guice.MapModule;
import org.apache.usergrid.persistence.queue.guice.QueueModule;
@@ -80,8 +82,6 @@ public class GuiceModule extends AbstractModule {
Multibinder<DataMigration> dataMigrationMultibinder =
Multibinder.newSetBinder( binder(), DataMigration.class );
dataMigrationMultibinder.addBinding().to( EntityTypeMappingMigration.class );
- dataMigrationMultibinder.addBinding().to( GraphShardVersionMigration.class );
- dataMigrationMultibinder.addBinding().to( EntityDataMigration.class );
Multibinder<EntityDeleted> entityBinder =
Multibinder.newSetBinder(binder(), EntityDeleted.class);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
deleted file mode 100644
index 861c343..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityDataMigration.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-
-import com.google.inject.Inject;
-import rx.functions.Action1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class EntityDataMigration implements DataMigration {
-
-
- private final MvccEntityMigrationStrategy migrationStrategy;
- private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
-
- @Inject
- public EntityDataMigration( final MvccEntityMigrationStrategy migrationStrategy,
- final AllEntitiesInSystemObservable allEntitiesInSystemObservable) {
- this.migrationStrategy = migrationStrategy;
- this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
- }
-
- @Override
- public void migrate( final ProgressObserver observer ) throws Throwable {
- allEntitiesInSystemObservable
- .getAllEntitiesInSystem(1000)
- .doOnNext(new Action1<ApplicationEntityGroup>() {
- @Override
- public void call( final ApplicationEntityGroup applicationEntityGroup ) {
- migrationStrategy.executeMigration(
- null,
- applicationEntityGroup,
- observer,
- CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
- );
- }
- })
- .toBlocking().last();
- }
-
- @Override
- public int getVersion() {
- return migrationStrategy.getMigrationVersion();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
index 08001d1..8352cff 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigration.java
@@ -25,9 +25,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.map.MapManager;
@@ -36,7 +35,11 @@ import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
+import rx.Observable;
+import rx.Scheduler;
import rx.functions.Action1;
+import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -56,34 +59,30 @@ public class EntityTypeMappingMigration implements DataMigration {
@Override
- public void migrate( final ProgressObserver observer ) throws Throwable {
+ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final ProgressObserver observer) throws Throwable {
final AtomicLong atomicLong = new AtomicLong();
- allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
- .doOnNext( new Action1<ApplicationEntityGroup>() {
-
-
- @Override
- public void call( final ApplicationEntityGroup applicationEntityGroup ) {
-
- final MapScope ms = CpNamingUtils.getEntityTypeMapScope( applicationEntityGroup.applicationScope.getApplication() );
-
-
- final MapManager mapManager = managerCache.getMapManager( ms );
-
- for(Id entityId: applicationEntityGroup.entityIds) {
- final UUID entityUuid = entityId.getUuid();
- final String entityType = entityId.getType();
-
- mapManager.putString( entityUuid.toString(), entityType );
-
- if ( atomicLong.incrementAndGet() % 100 == 0 ) {
- updateStatus( atomicLong, observer );
- }
- }
- }
- } ).toBlocking().lastOrDefault( null );
+ final MapScope ms = CpNamingUtils.getEntityTypeMapScope(applicationEntityGroup.applicationScope.getApplication());
+
+ final MapManager mapManager = managerCache.getMapManager(ms);
+ return Observable.from(applicationEntityGroup.entityIds).subscribeOn(Schedulers.io())
+ .map(new Func1<Id, Long>() {
+ @Override
+ public Long call(Id id) {
+ for (Id entityId : applicationEntityGroup.entityIds) {
+ final UUID entityUuid = entityId.getUuid();
+ final String entityType = entityId.getType();
+
+ mapManager.putString(entityUuid.toString(), entityType);
+
+ if (atomicLong.incrementAndGet() % 100 == 0) {
+ updateStatus(atomicLong, observer);
+ }
+ }
+ return atomicLong.get();
+ }
+ });
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
deleted file mode 100644
index 41b390b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigration.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- *
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
- *
- */
-
-package org.apache.usergrid.corepersistence.migration;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.serialization.EdgeMigrationStrategy;
-import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
-
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Migration for migrating graph edges to the new Shards
- */
-public class GraphShardVersionMigration implements DataMigration {
-
-
- private final GraphManagerFactory graphManagerFactory;
- private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
- private final EdgesFromSourceObservable edgesFromSourceObservable;
- private final EdgeMigrationStrategy migrationStrategy;
-
-
- @Inject
- public GraphShardVersionMigration( final EdgeMigrationStrategy migrationStrategy,
- final GraphManagerFactory graphManagerFactory,
- final AllEntitiesInSystemObservable allEntitiesInSystemObservable,
- final EdgesFromSourceObservable edgesFromSourceObservable
- ) {
- this.migrationStrategy = migrationStrategy;
- this.graphManagerFactory = graphManagerFactory;
- this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
- this.edgesFromSourceObservable = edgesFromSourceObservable;
- }
-
-
- @Override
- public void migrate( final ProgressObserver observer ) throws Throwable {
-
- allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
- .flatMap(
- new Func1<ApplicationEntityGroup, Observable<Long>>() {
- @Override
- public Observable<Long> call(
- final ApplicationEntityGroup applicationEntityGroup) {
- final GraphManager gm = graphManagerFactory.createEdgeManager(applicationEntityGroup.applicationScope);
- Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationEntityGroup.applicationScope.getApplication());
- //emit a stream of all ids from this group
- return migrationStrategy.executeMigration(
- edgesFromSource,
- applicationEntityGroup,
- observer,
- CpNamingUtils.getCollectionScopeByEntityIdFunc1(applicationEntityGroup.applicationScope)
- );
- }
- })
- .toBlocking().lastOrDefault(null);
- ;
- }
-
-
- @Override
- public int getVersion() {
- return migrationStrategy.getMigrationVersion();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
deleted file mode 100644
index c5dc54d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservable.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all edges where the given node is the target node
- */
-public class EdgesToTargetObservable {
-
- private static final Logger logger = LoggerFactory.getLogger( EdgesToTargetObservable.class );
-
-
- /**
- * Get all edges from the source
- */
- public static Observable<Edge> getEdgesToTarget(final GraphManager gm, final Id targetNode) {
- Observable<String> edgeTypes = gm.getEdgeTypesToTarget( new SimpleSearchEdgeType( targetNode, null, null ) );
-
- return edgeTypes.flatMap( new Func1<String, Observable<Edge>>() {
- @Override
- public Observable<Edge> call( final String edgeType ) {
-
- logger.debug( "Loading edges of edgeType {} to {}", edgeType, targetNode);
-
- return gm.loadEdgesToTarget( new SimpleSearchByEdgeType( targetNode, edgeType, Long.MAX_VALUE,
- SearchByEdgeType.Order.DESCENDING, null ) );
- }
- } );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
deleted file mode 100644
index 96b638b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/TargetIdObservable.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.rx;
-
-
-import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Emits the id of all nodes that are target nodes from the given source node
- */
-public class TargetIdObservable {
-
- private static final Logger logger = LoggerFactory.getLogger( TargetIdObservable.class );
- private final EdgesFromSourceObservable edgesFromSourceObservable;
-
- public TargetIdObservable(final EdgesFromSourceObservable edgesFromSourceObservable){
- this.edgesFromSourceObservable = edgesFromSourceObservable;
- }
-
- /**
- * Get all nodes that are target nodes from the sourceNode
- * @param gm
- * @param sourceNode
- *
- * @return
- */
- public Observable<Id> getTargetNodes( final GraphManager gm, final Id sourceNode) {
-
- //only search edge types that start with collections
- return edgesFromSourceObservable.edgesFromSource(gm, sourceNode ).map( new Func1<Edge, Id>() {
-
-
- @Override
- public Id call( final Edge edge ) {
- final Id targetNode = edge.getTargetNode();
-
- logger.debug( "Emitting targetId of {}", edge );
-
-
- return targetNode;
- }
- } );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
index 4218107..4608bd2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntitiesInSystemObservableImpl.java
@@ -22,7 +22,7 @@ package org.apache.usergrid.corepersistence.rx.impl;
import java.util.List;
-import org.apache.usergrid.corepersistence.rx.TargetIdObservable;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
@@ -30,6 +30,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
@@ -74,18 +75,28 @@ public class AllEntitiesInSystemObservableImpl implements AllEntitiesInSystemObs
final Observable<Id> entityNodes =
targetIdObservable.getTargetNodes(gm, applicationId);
+
+ //get scope here
+
+
+ //emit Scope + ID
+
//create our application node to emit since it's an entity as well
final Observable<Id> applicationNode = Observable.just( applicationId );
//merge both the specified application node and the entity node
// so they all get used
- return Observable.merge( applicationNode, entityNodes ).buffer(bufferSize)
- .map( new Func1<List<Id>, ApplicationEntityGroup>() {
- @Override
- public ApplicationEntityGroup call( final List<Id> id ) {
- return new ApplicationEntityGroup( applicationScope, id );
- }
- } );
+ return Observable
+ .merge(applicationNode, entityNodes)
+ .buffer(bufferSize)
+ .map(new Func1<List<Id>, ApplicationEntityGroup>() {
+ @Override
+ public ApplicationEntityGroup call(final List<Id> ids) {
+ //CpNamingUtils.getCollectionScopeNameFromEntityType(applicationId,);
+
+ return new ApplicationEntityGroup(applicationScope, ids);
+ }
+ });
}
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 36c3a82..154519b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -72,20 +72,6 @@ public class CpNamingUtils {
*/
public static String TYPES_BY_UUID_MAP = "zzz_typesbyuuid_zzz";
- /**
- * Returns a function to retreive collection scope
- * @param applicationScope
- * @return
- */
- public static Func1<Id,? extends ApplicationScope> getCollectionScopeByEntityIdFunc1(final ApplicationScope applicationScope){
- Func1<Id,ApplicationScope> func = new Func1<Id, ApplicationScope>() {
- @Override
- public ApplicationScope call(Id id) {
- return CpNamingUtils.getCollectionScopeNameFromEntityType(applicationScope.getApplication(), id.getType());
- }
- };
- return func;
- }
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
index f58696e..2509771 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityDataMigrationIT.java
@@ -25,6 +25,9 @@ import java.util.Iterator;
import java.util.Set;
import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyProxyV2Impl;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Before;
import org.junit.Rule;
@@ -66,8 +69,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
private Injector injector;
- private EntityDataMigration entityDataMigration;
- private ManagerCache managerCache;
+ private DataMigration entityDataMigration;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
private MvccEntitySerializationStrategy v1Strategy;
@@ -81,17 +83,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
*/
@Rule
public MigrationTestRule migrationTestRule =
- new MigrationTestRule( app, CpSetup.getInjector() ,EntityDataMigration.class );
+ new MigrationTestRule( app, CpSetup.getInjector() ,MvccEntitySerializationStrategyProxyV2Impl.class );
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
public void setup() {
emf = setup.getEmf();
injector = CpSetup.getInjector();
- entityDataMigration = injector.getInstance( EntityDataMigration.class );
- managerCache = injector.getInstance( ManagerCache.class );
+ entityDataMigration = injector.getInstance( MvccEntitySerializationStrategyProxyV2Impl.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
MvccEntityMigrationStrategy strategy = injector.getInstance(Key.get(MvccEntityMigrationStrategy.class));
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
v1Strategy = strategy.getMigration().from();
v2Strategy = strategy.getMigration().to();
}
@@ -132,7 +135,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types. Assumes we're
//using a test system, and it's not a huge amount of data, otherwise we'll overflow.
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -168,7 +171,18 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
assertTrue( "Saved new entities", savedEntities.size() > 0 );
//perform the migration
- entityDataMigration.migrate( progressObserver );
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(ApplicationEntityGroup applicationEntityGroup) {
+ try {
+ entityDataMigration.migrate(applicationEntityGroup, progressObserver).toBlocking().last();
+ }catch (Throwable e){
+ throw new RuntimeException(e);
+ }
+ }
+ }).toBlocking().last();
+
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
@@ -184,7 +198,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//now visit all entities in the system again, load them from v2, and ensure they're the same
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -220,7 +234,7 @@ public class EntityDataMigrationIT extends AbstractCoreIT {
//now visit all entities in the system again, and load them from the EM,
// ensure we see everything we did in the v1 traversal
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
index ee95d58..3da0b85 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/EntityTypeMappingMigrationIT.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.junit.Before;
import org.junit.Rule;
@@ -72,7 +73,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
@Rule
public MigrationTestRule migrationTestRule = new MigrationTestRule(
app, CpSetup.getInjector() ,EntityTypeMappingMigration.class );
-
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
@@ -83,6 +84,7 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
keyspace = injector.getInstance( Keyspace.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
}
@@ -123,54 +125,61 @@ public class EntityTypeMappingMigrationIT extends AbstractCoreIT {
final TestProgressObserver progressObserver = new TestProgressObserver();
- entityTypeMappingMigration.migrate( progressObserver );
-
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(final ApplicationEntityGroup entity) {
+ try {
+ entityTypeMappingMigration.migrate(entity, progressObserver).toBlocking().last();
+ }catch (Throwable e ){
+ throw new RuntimeException(e);
+ }
+ }
+ });
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
- .doOnNext( new Action1<ApplicationEntityGroup>() {
+ allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+ .doOnNext(new Action1<ApplicationEntityGroup>() {
@Override
public void call(
- final ApplicationEntityGroup entity ) {
+ final ApplicationEntityGroup entity) {
//ensure that each one has a type
final EntityManager em = emf.getEntityManager(
- entity.applicationScope.getApplication().getUuid() );
+ entity.applicationScope.getApplication().getUuid());
- for ( final Id id : entity.entityIds ) {
+ for (final Id id : entity.entityIds) {
try {
- final Entity returned = em.get( id.getUuid() );
+ final Entity returned = em.get(id.getUuid());
//we seem to occasionally get phantom edges. If this is the
// case we'll store the type _> uuid mapping, but we won't have
// anything to load
- if ( returned != null ) {
- assertEquals( id.getUuid(), returned.getUuid() );
- assertEquals( id.getType(), returned.getType() );
- }
- else {
- final String type = managerCache.getMapManager( CpNamingUtils
- .getEntityTypeMapScope(
- entity.applicationScope.getApplication() ) )
- .getString( id.getUuid()
- .toString() );
-
- assertEquals( id.getType(), type );
+ if (returned != null) {
+ assertEquals(id.getUuid(), returned.getUuid());
+ assertEquals(id.getType(), returned.getType());
+ } else {
+ final String type = managerCache.getMapManager(CpNamingUtils
+ .getEntityTypeMapScope(
+ entity.applicationScope.getApplication()))
+ .getString(id.getUuid()
+ .toString());
+
+ assertEquals(id.getType(), type);
}
- }
- catch ( Exception e ) {
- throw new RuntimeException( "Unable to get entity " + id
- + " by UUID, migration failed", e );
- }
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to get entity " + id
+ + " by UUID, migration failed", e);
+ }
- allEntities.remove( id );
- }
- }
- } ).toBlocking().lastOrDefault( null );
+ allEntities.remove(id);
+ }
+ }
+ }).toBlocking().lastOrDefault(null);
- assertEquals( "Every element should have been encountered", 0, allEntities.size() );
- assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
- assertTrue( "Progress observer should have update messages", progressObserver.getUpdates().size() > 0 );
- }
-}
+ assertEquals("Every element should have been encountered", 0, allEntities.size());
+ assertFalse("Progress observer should not have failed", progressObserver.getFailed());
+ assertTrue("Progress observer should have update messages", progressObserver.getUpdates().size() > 0);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
index 45f41aa..2d6d0d9 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/migration/GraphShardVersionMigrationIT.java
@@ -23,7 +23,10 @@ package org.apache.usergrid.corepersistence.migration;
import java.util.HashSet;
import java.util.Set;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationProxyImpl;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -56,7 +59,7 @@ import org.junit.Ignore;
public class GraphShardVersionMigrationIT extends AbstractCoreIT {
private Injector injector;
- private GraphShardVersionMigration graphShardVersionMigration;
+ private DataMigration graphShardVersionMigration;
private ManagerCache managerCache;
private DataMigrationManager dataMigrationManager;
private MigrationInfoSerialization migrationInfoSerialization;
@@ -66,18 +69,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
* Rule to do the resets we need
*/
@Rule
- public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,GraphShardVersionMigration.class );
-
+ public MigrationTestRule migrationTestRule = new MigrationTestRule( app, CpSetup.getInjector() ,EdgeMetadataSerializationProxyImpl.class );
+ private AllEntitiesInSystemObservable allEntitiesInSystemObservable;
@Before
public void setup() {
injector = CpSetup.getInjector();
- graphShardVersionMigration = injector.getInstance( GraphShardVersionMigration.class );
+ graphShardVersionMigration = injector.getInstance( EdgeMetadataSerializationProxyImpl.class );
managerCache = injector.getInstance( ManagerCache.class );
dataMigrationManager = injector.getInstance( DataMigrationManager.class );
migrationInfoSerialization = injector.getInstance( MigrationInfoSerialization.class );
-
+ allEntitiesInSystemObservable = injector.getInstance(AllEntitiesInSystemObservable.class);
}
@@ -115,7 +118,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//read everything in previous version format and put it into our types.
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
@@ -155,7 +158,18 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//perform the migration
- graphShardVersionMigration.migrate( progressObserver );
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
+ .doOnNext( new Action1<ApplicationEntityGroup>() {
+ @Override
+ public void call(
+ final ApplicationEntityGroup entity) {
+ try {
+ graphShardVersionMigration.migrate(entity, progressObserver).toBlocking().last();
+ }catch (Throwable e){
+ throw new RuntimeException(e);
+ }
+ }
+ }).toBlocking().last();
assertEquals( "Newly saved entities encounterd", 0, allEntities.size() );
assertFalse( "Progress observer should not have failed", progressObserver.getFailed() );
@@ -172,7 +186,7 @@ public class GraphShardVersionMigrationIT extends AbstractCoreIT {
//now visit all nodes in the system and remove their types from the multi maps, it should be empty at the end
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000)
+ allEntitiesInSystemObservable.getAllEntitiesInSystem( 1000)
.doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call(
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
index b246e04..30c9ac3 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/AllEntitiesInSystemObservableIT.java
@@ -24,7 +24,10 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.usergrid.corepersistence.rx.impl.AllEntitiesInSystemObservableImpl;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +60,9 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ AllEntitiesInSystemObservable allEntitiesInSystemObservableImpl = CpSetup.getInjector().getInstance(AllEntitiesInSystemObservable.class);
+ TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
final EntityManager em = app.getEntityManager();
final String type1 = "type1thing";
@@ -94,7 +100,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- AllEntitiesInSystemObservableImpl.getAllEntitiesInSystem(managerCache, 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
+ allEntitiesInSystemObservableImpl.getAllEntitiesInSystem( 1000).doOnNext( new Action1<ApplicationEntityGroup>() {
@Override
public void call( final ApplicationEntityGroup entity ) {
@@ -127,7 +133,7 @@ public class AllEntitiesInSystemObservableIT extends AbstractCoreIT {
//test connections
- TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
index daeca87..7c902ea 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/ApplicationObservableTestIT.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
-import org.apache.usergrid.corepersistence.rx.impl.ApplicationObservable;
+import org.apache.usergrid.persistence.core.rx.ApplicationObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
import rx.functions.Action1;
-import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
@@ -51,6 +50,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
final Application createdApplication = app.getEntityManager().getApplication();
+ ApplicationObservable applicationObservable = CpSetup.getInjector().getInstance(ApplicationObservable.class);
//now our get all apps we expect. There may be more, but we don't care about those.
final Set<UUID> applicationIds = new HashSet<UUID>() {{
@@ -65,7 +65,7 @@ public class ApplicationObservableTestIT extends AbstractCoreIT {
//clean up our wiring
ManagerCache managerCache = CpSetup.getInjector().getInstance( ManagerCache.class );
- Observable<Id> appObservable = ApplicationObservable.getAllApplicationIds(managerCache);
+ Observable<Id> appObservable = applicationObservable.getAllApplicationIds();
appObservable.doOnNext( new Action1<Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 2aa7fbc..2564fe5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.EdgesToTargetObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -60,6 +61,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ EdgesToTargetObservable edgesToTargetObservable = CpSetup.getInjector().getInstance(EdgesToTargetObservable.class);
final EntityManager em = app.getEntityManager();
final Application createdApplication = em.getApplication();
@@ -96,7 +98,7 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- EdgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
+ edgesToTargetObservable.getEdgesToTarget( gm, target ).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index ef0d953..5d25f62 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.EdgesFromSourceObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ EdgesFromSourceObservable edgesFromSourceObservable= CpSetup.getInjector().getInstance(EdgesFromSourceObservable.class);
final EntityManager em = app.getEntityManager();
final String type1 = "type1things";
@@ -92,7 +94,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- EdgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
+ edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
@@ -124,7 +126,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
//test connections
- EdgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
+ edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
@Override
public void call( final Edge edge ) {
final String edgeType = edge.getType();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
index cde8866..e5b0319 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/TargetIdObservableTestIT.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
+import org.apache.usergrid.persistence.graph.serialization.TargetIdObservable;
import org.junit.Test;
import org.apache.usergrid.AbstractCoreIT;
@@ -59,6 +60,8 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
@Test
public void testEntities() throws Exception {
+ TargetIdObservable targetIdObservable = CpSetup.getInjector().getInstance(TargetIdObservable.class);
+
final EntityManager em = app.getEntityManager();
@@ -93,7 +96,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- TargetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, applicationId ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
@@ -116,7 +119,7 @@ public class TargetIdObservableTestIT extends AbstractCoreIT {
//test connections
- TargetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
+ targetIdObservable.getTargetNodes( gm, source ).doOnNext( new Action1<Id>() {
@Override
public void call( final Id target ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
index 08082e7..0a3cd3a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/MvccEntityMigrationStrategy.java
@@ -19,13 +19,12 @@
*/
package org.apache.usergrid.persistence.collection.mvcc;
-import org.apache.usergrid.persistence.collection.MvccEntity;
import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategy;
import org.apache.usergrid.persistence.core.migration.schema.MigrationStrategy;
/**
* Classy class class.
*/
-public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy,MvccEntity> {
+public interface MvccEntityMigrationStrategy extends MigrationStrategy<MvccEntitySerializationStrategy> {
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
index 940fc3f..bb192cd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxy.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
* migration data goes to both sources and is read from the old source. After the ugprade completes,
* it will be available from the new source
*/
-public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
+public abstract class MvccEntitySerializationStrategyProxy implements MvccEntitySerializationStrategy, MvccEntityMigrationStrategy {
private final DataMigrationManager dataMigrationManager;
@@ -146,7 +146,7 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
* Return true if we're on an old version
*/
private boolean isOldVersion() {
- return dataMigrationManager.getCurrentVersion() < getMigrationVersion();
+ return dataMigrationManager.getCurrentVersion() < getVersion();
}
@@ -156,7 +156,7 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
}
@Override
- public Observable<Long> executeMigration(final Observable<MvccEntity> entityObservable,final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer, final Func1<Id,? extends ApplicationScope> getCollectionScopeFromEntityId) {
+ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer) {
final AtomicLong atomicLong = new AtomicLong();
final MutationBatch totalBatch = keyspace.prepareMutationBatch();
@@ -172,11 +172,12 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
@Override
public Id call(Id entityId) {
- ApplicationScope applicationScope = getCollectionScopeFromEntityId.call(entityId);
+ ApplicationScope applicationScope = applicationEntityGroup.applicationScope;
if (!(applicationScope instanceof CollectionScope)) {
throw new IllegalArgumentException("getCollectionScopeFromEntityId must return a collection scope");
}
+
CollectionScope currentScope = (CollectionScope) applicationScope;
MigrationStrategy.MigrationRelationship<MvccEntitySerializationStrategy> migration = getMigration();
//for each element in the history in the previous version,
@@ -214,10 +215,10 @@ public abstract class MvccEntitySerializationStrategyProxy implements MvccEntit
try {
batch.execute();
- po.update( getMigrationVersion(), "Finished copying " + count + " entities to the new format" );
+ po.update( getVersion(), "Finished copying " + count + " entities to the new format" );
}
catch ( ConnectionException e ) {
- po.failed( getMigrationVersion(), "Failed to execute mutation in cassandra" );
+ po.failed( getVersion(), "Failed to execute mutation in cassandra" );
throw new DataMigrationException( "Unable to migrate batches ", e );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
index 6bb6e11..7be7f70 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV1Impl.java
@@ -59,7 +59,7 @@ public class MvccEntitySerializationStrategyProxyV1Impl extends MvccEntitySerial
}
@Override
- public int getMigrationVersion() {
+ public int getVersion() {
return V2Impl.MIGRATION_VERSION;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
index 4ec2d2a..6b24ac7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyProxyV2Impl.java
@@ -26,7 +26,7 @@ public class MvccEntitySerializationStrategyProxyV2Impl extends MvccEntitySerial
}
@Override
- public int getMigrationVersion() {
+ public int getVersion() {
return V3Impl.MIGRATION_VERSION;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
index ab0b489..cf55eca 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/SerializationModule.java
@@ -22,6 +22,7 @@ import org.apache.usergrid.persistence.collection.mvcc.MvccEntityMigrationStrate
import org.apache.usergrid.persistence.collection.mvcc.MvccLogEntrySerializationStrategy;
import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.core.guice.*;
+import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
import com.google.inject.AbstractModule;
@@ -53,6 +54,10 @@ public class SerializationModule extends AbstractModule {
bind(MvccEntitySerializationStrategy.class ).annotatedWith( ProxyImpl.class )
.to(MvccEntitySerializationStrategyProxyV2Impl.class);
+ Multibinder<DataMigration> dataMigrationMultibinder =
+ Multibinder.newSetBinder( binder(), DataMigration.class );
+ dataMigrationMultibinder.addBinding().to( MvccEntitySerializationStrategyProxyV2Impl.class );
+
bind( MvccEntityMigrationStrategy.class ).to(MvccEntitySerializationStrategyProxyV2Impl.class);
bind( MvccLogEntrySerializationStrategy.class ).to( MvccLogEntrySerializationStrategyImpl.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
deleted file mode 100644
index fd3bba6..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/entity/ApplicationEntityGroup.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.entity;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import java.util.List;
-
-/**
- * Get the entity data. Immutable bean for fast access
- */
-public final class ApplicationEntityGroup {
- public final ApplicationScope applicationScope;
- public final List<Id> entityIds;
-
-
- public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
- this.applicationScope = applicationScope;
- this.entityIds = entityIds;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
index 775df5d..f945375 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigration.java
@@ -19,6 +19,9 @@
package org.apache.usergrid.persistence.core.migration.data;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import rx.Observable;
+
/**
* An interface for updating data. Has 2 basic functions. First it will perform the migration and update the status
* object.
@@ -45,7 +48,7 @@ public interface DataMigration {
* @param observer
* @throws Throwable
*/
- public void migrate(final ProgressObserver observer) throws Throwable;
+ public Observable migrate(final ApplicationEntityGroup applicationEntityGroup,final ProgressObserver observer) throws Throwable;
/**
* Get the version of this migration. It must be unique.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
index a9719b7..8ad3295 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImpl.java
@@ -27,6 +27,8 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,8 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import rx.Observable;
+import rx.functions.Func1;
@Singleton
@@ -48,13 +52,14 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
private final TreeMap<Integer, DataMigration> migrationTreeMap = new TreeMap<>();
private final MigrationInfoSerialization migrationInfoSerialization;
+ private final AllEntitiesInSystemObservable allEntitiesInSystemObservable;
/**
* Cache to cache versions temporarily
*/
private final LoadingCache<String, Integer> versionCache = CacheBuilder.newBuilder()
//cache the local value for 1 minute
- .expireAfterWrite( 1, TimeUnit.MINUTES ).build( new CacheLoader<String, Integer>() {
+ .expireAfterWrite(1, TimeUnit.MINUTES).build( new CacheLoader<String, Integer>() {
@Override
public Integer load( final String key ) throws Exception {
return migrationInfoSerialization.getVersion();
@@ -64,14 +69,17 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
@Inject
public DataMigrationManagerImpl( final MigrationInfoSerialization migrationInfoSerialization,
- final Set<DataMigration> migrations ) {
+ final Set<DataMigration> migrations,
+ final AllEntitiesInSystemObservable allEntitiesInSystemObservable
+ ) {
- Preconditions.checkNotNull( migrationInfoSerialization,
+ Preconditions.checkNotNull( migrationInfoSerialization,
"migrationInfoSerialization must not be null" );
Preconditions.checkNotNull( migrations, "migrations must not be null" );
+ Preconditions.checkNotNull( allEntitiesInSystemObservable, "allentitiesobservable must not be null" );
this.migrationInfoSerialization = migrationInfoSerialization;
-
+ this.allEntitiesInSystemObservable = allEntitiesInSystemObservable;
for ( DataMigration migration : migrations ) {
@@ -89,8 +97,8 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
final Class<? extends DataMigration> currentClass = migration.getClass();
- throw new DataMigrationException( String.format(
- "Data migrations must be unique. Both classes %s and %s have version %d",
+ throw new DataMigrationException( String.format(
+ "Data migrations must be unique. Both classes %s and %s have version %d",
existingClass, currentClass, version ) );
}
@@ -114,49 +122,56 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
migrationTreeMap.lastKey() );
//we have our migrations to run, execute them
- final NavigableMap<Integer, DataMigration> migrationsToRun =
+ final NavigableMap<Integer, DataMigration> migrationsToRun =
migrationTreeMap.tailMap( currentVersion, false );
- CassandraProgressObserver observer = new CassandraProgressObserver();
+ final CassandraProgressObserver observer = new CassandraProgressObserver();
+ allEntitiesInSystemObservable.getAllEntitiesInSystem(1000)
+ .map(
+ new Func1<ApplicationEntityGroup, Long>() {
+ @Override
+ public Long call(
+ final ApplicationEntityGroup applicationEntityGroup) {
+ for (DataMigration migration : migrationsToRun.values()) {
- for ( DataMigration migration : migrationsToRun.values() ) {
+ migrationInfoSerialization.setStatusCode(StatusCode.RUNNING.status);
- migrationInfoSerialization.setStatusCode( StatusCode.RUNNING.status );
+ final int migrationVersion = migration.getVersion();
- final int migrationVersion = migration.getVersion();
+ LOG.info("Running migration version {}", migrationVersion);
- LOG.info( "Running migration version {}", migrationVersion );
+ observer.update(migrationVersion, "Starting migration");
- observer.update( migrationVersion, "Starting migration" );
+ //perform this migration, if it fails, short circuit
+ try {
+ migration.migrate(applicationEntityGroup,observer);
+ } catch (Throwable throwable) {
+ observer.failed(migrationVersion, "Exception thrown during migration", throwable);
- //perform this migration, if it fails, short circuit
- try {
- migration.migrate( observer );
- }
- catch ( Throwable throwable ) {
- observer.failed( migrationVersion, "Exception thrown during migration", throwable );
+ LOG.error("Unable to migrate to version {}.", migrationVersion, throwable);
- LOG.error( "Unable to migrate to version {}.", migrationVersion, throwable );
+ return 0L;
+ }
- return;
- }
+ //we had an unhandled exception or the migration failed, short circuit
+ if (observer.failed) {
+ return 0L;
+ }
- //we had an unhandled exception or the migration failed, short circuit
- if ( observer.failed ) {
- return;
- }
+ //set the version
+ migrationInfoSerialization.setVersion(migrationVersion);
- //set the version
- migrationInfoSerialization.setVersion( migrationVersion );
-
- versionCache.invalidateAll();
-
- //update the observer for progress so other nodes can see it
- observer.update( migrationVersion, "Completed successfully" );
- }
+ versionCache.invalidateAll();
+ //update the observer for progress so other nodes can see it
+ observer.update(migrationVersion, "Completed successfully");
+ }
+ return 0L;
+ }
+ }).toBlocking().lastOrDefault(null);
+ ;
migrationInfoSerialization.setStatusCode( StatusCode.COMPLETE.status );
}
@@ -188,7 +203,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
public void resetToVersion( final int version ) {
final int highestAllowed = migrationTreeMap.lastKey();
- Preconditions.checkArgument( version <= highestAllowed,
+ Preconditions.checkArgument( version <= highestAllowed,
"You cannot set a version higher than the max of " + highestAllowed);
Preconditions.checkArgument( version >= 0, "You must specify a version of 0 or greater" );
@@ -225,7 +240,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
@Override
public void failed( final int migrationVersion, final String reason ) {
- final String storedMessage = String.format(
+ final String storedMessage = String.format(
"Failed to migrate, reason is appended. Error '%s'", reason );
@@ -245,13 +260,13 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
throwable.printStackTrace( new PrintWriter( stackTrace ) );
- final String storedMessage = String.format(
+ final String storedMessage = String.format(
"Failed to migrate, reason is appended. Error '%s' %s", reason, stackTrace.toString() );
update( migrationVersion, storedMessage );
- LOG.error( "Unable to migrate version {} due to reason {}.",
+ LOG.error( "Unable to migrate version {} due to reason {}.",
migrationVersion, reason, throwable );
failed = true;
@@ -262,7 +277,7 @@ public class DataMigrationManagerImpl implements DataMigrationManager {
@Override
public void update( final int migrationVersion, final String message ) {
- final String formattedOutput = String.format(
+ final String formattedOutput = String.format(
"Migration version %d. %s", migrationVersion, message );
//Print this to the info log
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
index cb2f2eb..758f54e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
@@ -19,7 +19,7 @@
*/
package org.apache.usergrid.persistence.core.migration.schema;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -30,18 +30,13 @@ import rx.functions.Func1;
/**
* Interface to encapsulate directional migrations
*/
-public interface MigrationStrategy<T,E> {
+public interface MigrationStrategy<T> extends DataMigration {
/**
* Returns the migration pattern to use
* @return
*/
public MigrationRelationship<T> getMigration();
- public int getMigrationVersion();
-
- Observable<Long> executeMigration(final Observable<E> sourceObservable, final ApplicationEntityGroup applicationEntityGroup, final DataMigration.ProgressObserver observer, final Func1<Id,? extends ApplicationScope> getScopeFromEntityId);
-
-
public class MigrationRelationship<T> {
private final T from;
private final T to;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index 2205f72..1ec017e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -19,7 +19,7 @@
*/
package org.apache.usergrid.persistence.core.rx;
-import org.apache.usergrid.persistence.core.entity.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import rx.Observable;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c3869371/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
new file mode 100644
index 0000000..a767f4a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.core.scope;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import java.util.List;
+
+/**
+ * Get the entity data. Immutable bean for fast access
+ */
+public final class ApplicationEntityGroup {
+ public final ApplicationScope applicationScope;
+ public final List<Id> entityIds;
+
+ public ApplicationEntityGroup(final ApplicationScope applicationScope, final List<Id> entityIds) {
+ this.applicationScope = applicationScope;
+ this.entityIds = entityIds;
+ }
+
+}
+