You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/02/27 02:57:15 UTC
[2/3] incubator-usergrid git commit: WIP,
still needs refactored and cleaned up.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
new file mode 100644
index 0000000..36c721a
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/ProgressObserver.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+/** Callback through which a running data migration reports status updates and failures */
+public interface ProgressObserver{
+ /**
+ * Mark the migration as failed
+ * @param migrationVersion The migration version running during the failure
+ * @param reason The reason to save
+ */
+ public void failed(final int migrationVersion, final String reason);
+
+ /**
+ * Mark the migration as failed with a stack trace
+ * @param migrationVersion The migration version running during the failure
+ * @param reason The error description to save
+ * @param throwable The error that happened
+ */
+ public void failed(final int migrationVersion, final String reason, final Throwable throwable);
+
+
+ /**
+ * Update the status of the migration with the message
+ * @param migrationVersion The migration version the status message applies to
+ * @param message The message to save for the status
+ */
+ public void update(final int migrationVersion, final String message);
+ }
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
new file mode 100644
index 0000000..14d488b
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedData.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+/**
+ * Interface for implementations that may migrate their internal format; each one reports its own version
+ */
+public interface VersionedData {
+
+ /**
+ * Get the version of the implementation
+ * @return The version number of this implementation
+ */
+ public int getImplementationVersion();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
new file mode 100644
index 0000000..a5123cf
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/newimpls/VersionedSet.java
@@ -0,0 +1,113 @@
+/*
+ *
+ * *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ * *
+ *
+ */
+
+package org.apache.usergrid.persistence.core.migration.data.newimpls;
+
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import com.google.inject.Inject;
+
+
+/**
+ * Holds the known implementations of a versioned type, sorted from highest to lowest implementation
+ * version, and resolves which implementations to migrate between for a given stored version.
+ *
+ * NOTE(review): the cache is an unsynchronized HashMap -- confirm instances are not shared across threads.
+ */
+public class VersionedSet<T extends VersionedData> {
+
+    /** Cache so that after our initial lookup, it is O(1) since this will be used heavily */
+    private final Map<Integer, MigrationRelationship<T>> cacheVersion = new HashMap<>();
+
+    /** Every implementation, ordered from high to low version */
+    private final List<T> orderedVersions = new ArrayList<>();
+
+    /**
+     * Create the set from the supplied implementations
+     * @param versions The implementations to select from, in any order
+     */
+    public VersionedSet(final Collection<T> versions){
+        orderedVersions.addAll( versions );
+        Collections.sort( orderedVersions, new VersionedDataComparator() );
+    }
+
+    /**
+     * Get the migration relationship based on our current version. It pairs the highest implementation
+     * whose version is not above currentVersion with the highest implementation overall, and is cached
+     * per currentVersion.
+     *
+     * @param currentVersion The version to resolve
+     * @return The relationship for currentVersion, never null
+     * @throws IllegalArgumentException If every implementation has a version above currentVersion
+     */
+    public MigrationRelationship<T> getCurrentReadVersion(final int currentVersion){
+        final MigrationRelationship<T> cached = cacheVersion.get( currentVersion );
+
+        if(cached != null){
+            return cached;
+        }
+
+        //not there, find it. Not the most efficient, but it happens once per version, which rarely changes
+        for(final T current: orderedVersions){
+            if(current.getImplementationVersion() <= currentVersion){
+                //we always go from our first match to our highest version. Any versions between can be skipped
+                final MigrationRelationship<T> migrationRelationship =
+                    new MigrationRelationship<>( current, orderedVersions.get( 0 ) );
+
+                cacheVersion.put( currentVersion, migrationRelationship );
+
+                return migrationRelationship;
+            }
+        }
+
+        //nothing matched. The list is ordered high to low so the minimum is last. Report its version number
+        //rather than the object, and don't index into an empty list
+        final String minFound = orderedVersions.isEmpty() ? "none"
+            : String.valueOf( orderedVersions.get( orderedVersions.size() - 1 ).getImplementationVersion() );
+
+        throw new IllegalArgumentException(
+            "Could not find a migration version for version " + currentVersion + " min found was " + minFound );
+    }
+
+    /**
+     * Orders from high to low
+     */
+    private static final class VersionedDataComparator implements Comparator<VersionedData>
+    {
+        @Override
+        public int compare( final VersionedData o1, final VersionedData o2 ) {
+            //operands are swapped so higher versions sort first
+            return Integer.compare( o2.getImplementationVersion(), o1.getImplementationVersion() );
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
deleted file mode 100644
index e1714d0..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationStrategy.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.migration.schema;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Interface to encapsulate directional migrations
- */
-public interface MigrationStrategy<T> {
- /**
- * Returns the migration pattern to use
- * @return
- */
- public MigrationRelationship<T> getMigration();
-
- public int getVersion();
-
- public class MigrationRelationship<T> {
- private final T from;
- private final T to;
-
- public MigrationRelationship(T from,T to){
- this.from = from;
- this.to = to;
- }
- public T from(){
- return from;
- }
-
- public T to(){
- return to;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
index e029f92..e7a7aca 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/AllEntitiesInSystemObservable.java
@@ -21,7 +21,7 @@ package org.apache.usergrid.persistence.core.rx;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
+
import rx.Observable;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
deleted file mode 100644
index d0d81c7..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/ApplicationEntityGroup.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.scope;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import java.util.List;
-
-/**
- * Get the entity data. Immutable bean for fast access
- */
-public final class ApplicationEntityGroup<T extends ApplicationScope> {
- public final T applicationScope;
- public final List<EntityIdScope<T>> entityIds;
-
- public ApplicationEntityGroup(final T applicationScope, final List<EntityIdScope<T>> entityIds) {
- this.applicationScope = applicationScope;
- this.entityIds = entityIds;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
deleted file mode 100644
index 73b47fd..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/scope/EntityIdScope.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.core.scope;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-/**
- * Tuple containing collectionscope and entityid
- */
-public class EntityIdScope<T extends ApplicationScope>{
- private final Id id;
- private final T collectionScope;
-
- public EntityIdScope(Id id, T collectionScope){
- this.id = id;
- this.collectionScope = collectionScope;
- }
-
-
- public Id getId() {
- return id;
- }
-
- public T getCollectionScope() {
- return collectionScope;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
index d3b9b49..58cf5a6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/MaxMigrationVersion.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.core.guice;
import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import rx.Observable;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
index 98b057b..3aeb629 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/guice/TestModule.java
@@ -24,9 +24,8 @@ import java.io.IOException;
import com.google.inject.AbstractModule;
import com.netflix.config.ConfigurationManager;
-import org.apache.usergrid.persistence.core.rx.AllEntitiesInSystemObservable;
+
import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
-import rx.Observable;
public abstract class TestModule extends AbstractModule {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
index e067f60..20972f6 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/migration/data/DataMigrationManagerImplTest.java
@@ -33,7 +33,6 @@ import org.apache.usergrid.persistence.core.scope.EntityIdScope;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.junit.Test;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -42,7 +41,6 @@ import rx.Observable;
import rx.Subscriber;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.*;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d3f8ee61/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
index 7d4fb2e..785e341 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeDataMigrationImpl.java
@@ -23,9 +23,8 @@ import com.google.inject.Inject;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import org.apache.usergrid.persistence.core.migration.data.ApplicationDataMigration;
-import org.apache.usergrid.persistence.core.migration.data.DataMigration;
-import org.apache.usergrid.persistence.core.scope.ApplicationEntityGroup;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.DataMigration2;
+import org.apache.usergrid.persistence.core.migration.data.newimpls.MigrationDataProvider;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
@@ -45,7 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
* Encapsulates data mi
*/
-public class EdgeDataMigrationImpl implements ApplicationDataMigration {
+public class EdgeDataMigrationImpl implements DataMigration2<ApplicationScope> {
private static final Logger logger = LoggerFactory.getLogger(EdgeDataMigrationImpl.class);
@@ -68,67 +67,69 @@ public class EdgeDataMigrationImpl implements ApplicationDataMigration {
}
+
+ //NOTE(review): the Observable returned by flatMap(...) below is discarded -- this hunk neither subscribes
+ //to it nor returns it (migrate is now void). Confirm something actually triggers the edge rewrite.
@Override
- public Observable migrate(final Observable<ApplicationScope> scopes,
- final DataMigration.ProgressObserver observer) {
+ public void migrate( final MigrationDataProvider<ApplicationScope> migrationDataProvider,
+ final ProgressObserver observer ) {
final AtomicLong counter = new AtomicLong();
- return scopes.flatMap(new Func1<ApplicationScope, Observable<?>>() {
- @Override
- public Observable call(final ApplicationScope applicationScope) {
- final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
- final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
- logger.info("Migrating edges scope {}", applicationScope);
-
- //get each edge from this node as a source
- return edgesFromSource
-
- //for each edge, re-index it in v2 every 1000 edges or less
- .buffer(1000)
- .doOnNext(new Action1<List<Edge>>() {
- @Override
- public void call(List<Edge> edges) {
- final MutationBatch batch =
- keyspace.prepareMutationBatch();
-
- for (Edge edge : edges) {
- logger.info("Migrating meta for edge {}", edge);
- final MutationBatch edgeBatch = edgeMigrationStrategy.getMigration().to()
- .writeEdge(applicationScope,
- edge);
- batch.mergeShallow(edgeBatch);
- }
-
- try {
- batch.execute();
- } catch (ConnectionException e) {
- throw new RuntimeException(
- "Unable to perform migration", e);
- }
-
- //update the observer so the admin can see it
- final long newCount =
- counter.addAndGet(edges.size());
-
- observer.update(getVersion(), String.format(
- "Currently running. Rewritten %d edge types",
- newCount));
- }
- });
- }
- });
+ migrationDataProvider.getData().flatMap(new Func1<ApplicationScope, Observable<?>>() {
+ @Override
+ public Observable call(final ApplicationScope applicationScope) {
+ final GraphManager gm = graphManagerFactory.createEdgeManager(applicationScope);
+ final Observable<Edge> edgesFromSource = edgesFromSourceObservable.edgesFromSource(gm, applicationScope.getApplication());
+ logger.info("Migrating edges scope {}", applicationScope);
+
+ //get each edge from this node as a source
+ return edgesFromSource
+
+ //for each edge, re-index it in v2 every 1000 edges or less
+ .buffer( 1000 )
+ //do the writes of 1k in parallel
+ .parallel( new Func1<Observable<List<Edge>>, Observable>() {
+ @Override
+ public Observable call( final Observable<List<Edge>> listObservable ) {
+ return listObservable.doOnNext( new Action1<List<Edge>>() {
+ @Override
+ public void call( List<Edge> edges ) {
+ final MutationBatch batch = keyspace.prepareMutationBatch();
+
+ for ( Edge edge : edges ) {
+ logger.info( "Migrating meta for edge {}", edge );
+ final MutationBatch edgeBatch =
+ edgeMigrationStrategy.getMigration().to()
+ .writeEdge( applicationScope, edge );
+ batch.mergeShallow( edgeBatch );
+ }
+
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to perform migration", e );
+ }
+
+ //update the observer so the admin can see it
+ final long newCount = counter.addAndGet( edges.size() );
+
+ observer.update( getVersion(),
+ String.format( "Currently running. Rewritten %d edge types",
+ newCount ) );
+ }
+ } );
+ }
+ } );
+ }
+ });
}
-
@Override
public int getVersion() {
return edgeMigrationStrategy.getVersion();
}
- @Override
- public MigrationType getType() {
- return MigrationType.Applications;
- }
}