You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/27 02:57:15 UTC

[2/3] incubator-usergrid git commit: WIP, still needs refactored and cleaned up.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
new file mode 100644
index 0000000..36c721a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+public interface ProgressObserver{
+            /**
+             * Mark the migration as failed
+             * @param migrationVersion The migration version running during the failure
+             * @param reason The reason to save
+             */
+            public void failed(final int migrationVersion, final String reason);
+
+            /**
+             * Mark the migration as failed with a stack trace
+             * @param migrationVersion The migration version running during the failure
+             * @param reason The error description to save
+             * @param throwable The error that happened
+             */
+            public void failed(final int migrationVersion, final String reason, final Throwable throwable);
+
+
+            /**
+             * Update the status of the migration with the message
+             *
+             * @param message The message to save for the status
+             */
+            public void update(final int migrationVersion, final String message);
+        }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
new file mode 100644
index 0000000..14d488b
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+/**
+ * Marker interface for implementations that may migrate their internal format
+ */
+public interface VersionedData {
+
+    /**
+     * Get the version of the implementation
+     * @return
+     */
+    public int getImplementationVersion();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
new file mode 100644
index 0000000..a5123cf
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
@@ -0,0 +1,113 @@
+/*
+ *
+ *  *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *  *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.inject.Inject;
+
+
+public class VersionedSet<T extends VersionedData> {
+
+
+    /**
+     * Cache so that after our initial lookup, it O(1) since this will be used heavily
+     *
+     */
+    private Map<Integer, MigrationRelationship<T>> cacheVersion = new HashMap<>();
+
+    private List<T> orderedVersions = new ArrayList<>();
+
+
+
+
+    public VersionedSet(final Collection<T> versions){
+
+        orderedVersions.addAll(versions  );
+        Collections.sort( orderedVersions, new VersionedDataComparator() );
+
+    }
+
+
+    /**
+     * Get the migration relationship based on our current version
+     * @param currentVersion
+     * @return
+     */
+    public MigrationRelationship<T> getCurrentReadVersion(final int currentVersion){
+
+        final MigrationRelationship<T> relationship = cacheVersion.get( currentVersion );
+
+        if(relationship != null){
+            return relationship;
+        }
+
+        //not there, find it.  Not the most efficient, but it happens once per version, which rarely changes, so not a big deal
+
+
+        for(T current: orderedVersions){
+            //not our instance
+            if(current.getImplementationVersion() > currentVersion){
+                continue;
+            }
+
+
+            //we always go from our first match to our highest version.  Any versions between can be skipped
+            final MigrationRelationship<T> migrationRelationship = new MigrationRelationship<>( current, orderedVersions.get( 0 )  );
+
+            cacheVersion.put( currentVersion, migrationRelationship );
+
+            return migrationRelationship;
+
+        }
+
+        //if we get here, something is wrong
+        throw new IllegalArgumentException( "Could not find a migration version for version " + currentVersion + " min found was " + orderedVersions.get( orderedVersions.size()-1 ) );
+
+
+    }
+
+
+    /**
+     * Orders from high to low
+     */
+    private static final class VersionedDataComparator implements Comparator<VersionedData>
+    {
+
+        @Override
+        public int compare( final VersionedData o1, final VersionedData o2 ) {
+            return Integer.compare( o1.getImplementationVersion(), o2.getImplementationVersion())*-1;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
deleted file mode 100644
index e1714d0..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.migration.schema;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Interface to encapsulate directional migrations
- */
-public interface MigrationStrategy<T>  {
-    /**
-     * Returns the migration pattern to use
-     * @return
-     */
-    public MigrationRelationship<T> getMigration();
-
-    public int getVersion();
-
-    public class MigrationRelationship<T>  {
-        private final T from;
-        private final T to;
-
-        public MigrationRelationship(T from,T to){
-            this.from = from;
-            this.to = to;
-        }
-        public T from(){
-            return from;
-        }
-
-        public T to(){
-            return to;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index e029f92..e7a7aca 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.core.rx;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
+
 import rx.Observable;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
deleted file mode 100644
index d0d81c7..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.scope;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import java.util.List;
-
-/**
- * Get the entity data.  Immutable bean for fast access
- */
-public final class ApplicationEntityGroup<T extends ApplicationScope> {
-    public final T applicationScope;
-    public final List<EntityIdScope<T>> entityIds;
-
-    public ApplicationEntityGroup(final T applicationScope, final List<EntityIdScope<T>> entityIds) {
-        this.applicationScope = applicationScope;
-        this.entityIds = entityIds;
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
deleted file mode 100644
index 73b47fd..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.scope;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-/**
- * Tuple containing collectionscope and entityid
- */
-public class EntityIdScope<T extends ApplicationScope>{
-    private final Id id;
-    private final T collectionScope;
-
-    public EntityIdScope(Id id, T collectionScope){
-        this.id = id;
-        this.collectionScope = collectionScope;
-    }
-
-
-    public Id getId() {
-        return id;
-    }
-
-    public T getCollectionScope() {
-        return collectionScope;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index d3b9b49..58cf5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.core.guice;
 
 
 import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import rx.Observable;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
index 98b057b..3aeb629 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
@@ -24,9 +24,8 @@ import java.io.IOException;
 
 import com.google.inject.AbstractModule;
 import com.netflix.config.ConfigurationManager;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+
 import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
 
 
 public abstract class TestModule extends AbstractModule {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index e067f60..20972f6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -33,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.EntityIdScope;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 import org.junit.Test;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -42,7 +41,6 @@ import rx.Observable;
 import rx.Subscriber;
 
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.*;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
index 7d4fb2e..785e341 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
@@ -23,9 +23,8 @@ import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -45,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
  * Encapsulates data mi
  */
 
-public class EdgeDataMigrationImpl implements ApplicationDataMigration {
+public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
 
     private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
 
@@ -68,67 +67,69 @@ public class EdgeDataMigrationImpl implements ApplicationDataMigration {
     }
 
 
+
+
+
     @Override
-    public Observable migrate(final Observable<ApplicationScope> scopes,
-                              final DataMigration.ProgressObserver observer) {
+    public void migrate( final MigrationDataProvider<ApplicationScope> migrationDataProvider,
+                         final ProgressObserver observer ) {
         final AtomicLong counter = new AtomicLong();
 
-        return scopes.flatMap(new Func1<ApplicationScope, Observable<?>>() {
-            @Override
-            public Observable call(final ApplicationScope applicationScope) {
-                final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
-                final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
-                logger.info("Migrating edges scope {}", applicationScope);
-
-                //get each edge from this node as a source
-                return edgesFromSource
-
-                    //for each edge, re-index it in v2  every 1000 edges or less
-                    .buffer(1000)
-                    .doOnNext(new Action1<List<Edge>>() {
-                        @Override
-                        public void call(List<Edge> edges) {
-                            final MutationBatch batch =
-                                keyspace.prepareMutationBatch();
-
-                            for (Edge edge : edges) {
-                                logger.info("Migrating meta for edge {}", edge);
-                                final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
-                                    .writeEdge(applicationScope,
-                                        edge);
-                                batch.mergeShallow(edgeBatch);
-                            }
-
-                            try {
-                                batch.execute();
-                            } catch (ConnectionException e) {
-                                throw new RuntimeException(
-                                    "Unable to perform migration", e);
-                            }
-
-                            //update the observer so the admin can see it
-                            final long newCount =
-                                counter.addAndGet(edges.size());
-
-                            observer.update(getVersion(), String.format(
-                                "Currently running.  Rewritten %d edge types",
-                                newCount));
-                        }
-                    });
-            }
-        });
+               migrationDataProvider.getData().flatMap(new Func1<ApplicationScope, Observable<?>>() {
+                  @Override
+                  public Observable call(final ApplicationScope applicationScope) {
+                      final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+                      final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
+                      logger.info("Migrating edges scope {}", applicationScope);
+
+                      //get each edge from this node as a source
+                      return edgesFromSource
+
+                          //for each edge, re-index it in v2  every 1000 edges or less
+                          .buffer( 1000 )
+                          //do the writes of 1k in parallel
+                          .parallel( new Func1<Observable<List<Edge>>, Observable>() {
+                                  @Override
+                                  public Observable call( final Observable<List<Edge>> listObservable ) {
+                                      return listObservable.doOnNext( new Action1<List<Edge>>() {
+                                                @Override
+                                                public void call( List<Edge> edges ) {
+                                                    final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                                    for ( Edge edge : edges ) {
+                                                        logger.info( "Migrating meta for edge {}", edge );
+                                                        final MutationBatch edgeBatch =
+                                                                edgeMigrationStrategy.getMigration().to()
+                                                                                     .writeEdge( applicationScope, edge );
+                                                        batch.mergeShallow( edgeBatch );
+                                                    }
+
+                                                    try {
+                                                        batch.execute();
+                                                    }
+                                                    catch ( ConnectionException e ) {
+                                                        throw new RuntimeException( "Unable to perform migration", e );
+                                                    }
+
+                                                    //update the observer so the admin can see it
+                                                    final long newCount = counter.addAndGet( edges.size() );
+
+                                                    observer.update( getVersion(),
+                                                            String.format( "Currently running.  Rewritten %d edge types",
+                                                                    newCount ) );
+                                                }
+                                            } );
+                                  }
+                              } );
+                  }
+              });
 
     }
 
 
-
     @Override
     public int getVersion() {
         return edgeMigrationStrategy.getVersion();
     }
 
-    @Override
-    public MigrationType getType() {
-        return MigrationType.Applications;
-    }
 }