You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/26 22:21:11 UTC
[27/35] git commit: Updated Cassandra to Resolve issue
Updated Cassandra to Resolve issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/16721c87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/16721c87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/16721c87
Branch: refs/heads/two-dot-o
Commit: 16721c87921100bbfafa3697801a6579c6a1cb6e
Parents: 029c065
Author: Todd Nine <tn...@apigee.com>
Authored: Wed Mar 26 13:02:30 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed Mar 26 13:02:30 2014 -0700
----------------------------------------------------------------------
stack/corepersistence/collection/pom.xml | 7 +
.../usergrid/persistence/graph/GraphFig.java | 31 ++-
.../persistence/graph/guice/EdgeWrite.java | 37 ----
.../persistence/graph/guice/GraphModule.java | 6 +-
.../graph/hystrix/HystrixGraphObservable.java | 53 +++--
.../graph/impl/EdgeWriteListener.java | 125 -----------
.../graph/impl/GraphManagerImpl.java | 30 +--
.../graph/impl/NodeDeleteListener.java | 193 ++++++++++-------
.../graph/impl/stage/AbstractEdgeRepair.java | 3 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 27 +--
.../graph/impl/stage/EdgeWriteRepair.java | 44 ----
.../graph/impl/stage/EdgeWriteRepairImpl.java | 76 -------
.../impl/EdgeMetadataSerializationImpl.java | 2 +-
.../impl/EdgeSerializationImpl.java | 22 +-
.../impl/parse/ColumnNameIterator.java | 19 +-
.../impl/parse/ObservableIterator.java | 18 +-
.../graph/impl/NodeDeleteListenerTest.java | 26 ++-
.../graph/impl/stage/EdgeWriteRepairTest.java | 208 -------------------
.../graph/serialization/TestCount.java | 36 ++--
.../graph/src/test/resources/log4j.properties | 36 ++++
stack/corepersistence/pom.xml | 1 +
21 files changed, 316 insertions(+), 684 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 808347e..038fb67 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -101,6 +101,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra-all</artifactId>
+ <version>${cassandra.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.safehaus.guicyfig</groupId>
<artifactId>guicyfig</artifactId>
<version>${guicyfig.version}</version>
@@ -176,6 +182,7 @@
</dependency>
+
<!-- Re-add once this is done
https://github.com/Netflix/Hystrix/pull/209-->
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index de7f72f..80e87f6 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -19,7 +19,6 @@ public interface GraphFig extends GuicyFig {
public static final String REPAIR_TIMEOUT = "usergrid.graph.repair.timeout";
-
public static final String TIMEOUT_SIZE = "usergrid.graph.timeout.page.size";
public static final String TIMEOUT_TASK_TIME = "usergrid.graph.timeout.task.time";
@@ -32,11 +31,11 @@ public interface GraphFig extends GuicyFig {
public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
- @Default("1000")
+ @Default( "1000" )
@Key(SCAN_PAGE_SIZE)
int getScanPageSize();
- @Default("CL_ONE")
+ @Default("CL_QUORUM")
@Key(READ_CL)
String getReadCL();
@@ -44,31 +43,31 @@ public interface GraphFig extends GuicyFig {
@Key(WRITE_CL)
String getWriteCL();
-// @Default("10000")
-// @Key(WRITE_TIMEOUT)
-// int getWriteTimeout();
+ @Default( "10000" )
+ @Key( WRITE_TIMEOUT )
+ int getWriteTimeout();
/**
* Get the read timeout (in milliseconds) that we should allow when reading from the data source
*/
- @Default( "10000" )
- @Key( READ_TIMEOUT )
+ @Default("10000")
+ @Key(READ_TIMEOUT)
int getReadTimeout();
- @Default( "100" )
- @Key( TIMEOUT_SIZE )
+ @Default("100")
+ @Key(TIMEOUT_SIZE)
int getTimeoutReadSize();
- @Default( "500" )
- @Key( TIMEOUT_TASK_TIME )
+ @Default("500")
+ @Key(TIMEOUT_TASK_TIME)
long getTaskLoopTime();
- @Default( "10" )
- @Key( REPAIR_CONCURRENT_SIZE )
+ @Default("5")
+ @Key(REPAIR_CONCURRENT_SIZE)
int getRepairConcurrentSize();
@Default("10000")
- @Key(WRITE_TIMEOUT)
- int getRepairTimeout();
+ @Key(REPAIR_TIMEOUT)
+ int getRepairTimeout();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
deleted file mode 100644
index b8afb13..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/EdgeWrite.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-import static java.lang.annotation.ElementType.FIELD;
-import static java.lang.annotation.ElementType.METHOD;
-import static java.lang.annotation.ElementType.PARAMETER;
-import static java.lang.annotation.RetentionPolicy.RUNTIME;
-
-
-@BindingAnnotation
-@Target( { FIELD, PARAMETER, METHOD } )
-@Retention( RUNTIME )
-public @interface EdgeWrite {}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index d575471..072cefc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -24,9 +24,9 @@ import org.safehaus.guicyfig.GuicyFigModule;
import org.apache.usergrid.persistence.collection.guice.CollectionModule;
import org.apache.usergrid.persistence.collection.migration.Migration;
import org.apache.usergrid.persistence.collection.mvcc.event.PostProcessObserver;
+import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessorImpl;
import org.apache.usergrid.persistence.graph.consistency.LocalTimeoutQueue;
@@ -37,8 +37,6 @@ import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepairImpl;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepairImpl;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepair;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeWriteRepairImpl;
import org.apache.usergrid.persistence.graph.serialization.CassandraConfig;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -97,13 +95,11 @@ public class GraphModule extends AbstractModule {
bind(TimeoutQueue.class).to( LocalTimeoutQueue.class );
bind(AsyncProcessor.class).annotatedWith( EdgeDelete.class ).to( AsyncProcessorImpl.class );
- bind(AsyncProcessor.class).annotatedWith( EdgeWrite.class ).to( AsyncProcessorImpl.class );
bind(AsyncProcessor.class).annotatedWith( NodeDelete.class ).to( AsyncProcessorImpl.class );
//Repair/cleanup classes
bind( EdgeMetaRepair.class).to( EdgeMetaRepairImpl.class );
- bind( EdgeWriteRepair.class).to( EdgeWriteRepairImpl.class );
bind( EdgeDeleteRepair.class).to( EdgeDeleteRepairImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
index 8a44389..209dfbd 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
@@ -20,33 +20,52 @@
package org.apache.usergrid.persistence.graph.hystrix;
-import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Observable;
-import rx.schedulers.Schedulers;
/**
- *
- *
+ * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
*/
public class HystrixGraphObservable {
/**
- * Wrap the observable in the timeout
- * @param observable
- * @param <T>
- * @return
+ * Command group used for realtime user commands
+ */
+ private static final HystrixCommandGroupKey USER_GROUP = HystrixCommandGroupKey.Factory.asKey( "Graph-User" );
+
+ /**
+ * Command group for asynchronous operations
+ */
+ private static final HystrixCommandGroupKey ASYNC_GROUP = HystrixCommandGroupKey.Factory.asKey( "Graph-Async" );
+
+
+ /**
+ * Wrap the observable in the timeout for user facing operation. This is for user reads and deletes.
*/
- public static <T> Observable<T> wrap(final Observable<T> observable){
- return new HystrixObservableCommand<T>( HystrixCommandGroupKey.Factory.asKey( "Graph" ) ){
-
- @Override
- protected Observable<T> run() {
- return observable;
- }
- }.toObservable( Schedulers.io() );
- }
+ public static <T> Observable<T> user( final Observable<T> observable ) {
+ return new HystrixObservableCommand<T>( USER_GROUP ) {
+
+ @Override
+ protected Observable<T> run() {
+ return observable;
+ }
+ }.observe();
+ }
+
+
+ /**
+ * Wrap the observable in the timeout for asynchronous operations. This is for compaction and cleanup processing.
+ */
+ public static <T> Observable<T> async( final Observable<T> observable ) {
+ return new HystrixObservableCommand<T>( ASYNC_GROUP ) {
+
+ @Override
+ protected Observable<T> run() {
+ return observable;
+ }
+ }.observe();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
deleted file mode 100644
index 90146dc..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeWriteListener.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl;
-
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
-import org.apache.usergrid.persistence.graph.consistency.MessageListener;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.parse.ObservableIterator;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import rx.Observable;
-import rx.functions.Func1;
-
-
-/**
- * Construct the asynchronous edge lister for the repair operation.
- */
-@Singleton
-public class EdgeWriteListener implements MessageListener<EdgeEvent<Edge>, EdgeEvent<Edge>> {
-
- private final EdgeSerialization edgeSerialization;
- private final GraphFig graphFig;
- private final Keyspace keyspace;
-
-
- public EdgeWriteListener( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
- final Keyspace keyspace, @EdgeWrite final AsyncProcessor edgeWrite ) {
- this.edgeSerialization = edgeSerialization;
- this.graphFig = graphFig;
- this.keyspace = keyspace;
- edgeWrite.addListener( this );
- }
-
-
- @Override
- public Observable<EdgeEvent<Edge>> receive( final EdgeEvent<Edge> write ) {
-
- final Edge edge = write.getData();
- final OrganizationScope scope = write.getOrganizationScope();
- final UUID maxVersion = edge.getVersion();
-
- return Observable.empty();
-
-// TODO T.N, some async processing for balancing here
-// return Observable.create( new ObservableIterator<MarkedEdge>() {
-// @Override
-// protected Iterator<MarkedEdge> getIterator() {
-//
-// final SimpleSearchByEdge search =
-// new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(), maxVersion,
-// null );
-//
-// return edgeSerialization.getEdgeVersions( scope, search );
-// }
-// } ).filter( new Func1<MarkedEdge, Boolean>() {
-//
-// //TODO, reuse this for delete operation
-//
-//
-// /**
-// * We only want to return edges < this version so we remove them
-// * @param markedEdge
-// * @return
-// */
-// @Override
-// public Boolean call( final MarkedEdge markedEdge ) {
-// return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
-// }
-// //buffer the deletes and issue them in a single mutation
-// } ).buffer( graphFig.getScanPageSize() ).map( new Func1<List<MarkedEdge>, EdgeEvent<Edge>>() {
-// @Override
-// public EdgeEvent<Edge> call( final List<MarkedEdge> markedEdges ) {
-//
-// final MutationBatch batch = keyspace.prepareMutationBatch();
-//
-// for ( MarkedEdge edge : markedEdges ) {
-// final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
-//
-// batch.mergeShallow( delete );
-// }
-//
-// try {
-// batch.execute();
-// }
-// catch ( ConnectionException e ) {
-// throw new RuntimeException( "Unable to issue write to cassandra", e );
-// }
-//
-// return write;
-// }
-// } );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 94d4dc3..ff63a60 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -39,7 +39,6 @@ import org.apache.usergrid.persistence.graph.SearchIdType;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.AsynchronousMessage;
import org.apache.usergrid.persistence.graph.guice.EdgeDelete;
-import org.apache.usergrid.persistence.graph.guice.EdgeWrite;
import org.apache.usergrid.persistence.graph.guice.NodeDelete;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -56,7 +55,6 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -76,7 +74,6 @@ public class GraphManagerImpl implements GraphManager {
private final NodeSerialization nodeSerialization;
- private final AsyncProcessor<Edge> edgeWriteAsyncProcessor;
private final AsyncProcessor<Edge> edgeDeleteAsyncProcessor;
private final AsyncProcessor<Id> nodeDeleteAsyncProcessor;
@@ -86,7 +83,7 @@ public class GraphManagerImpl implements GraphManager {
@Inject
public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
- final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
+ final GraphFig graphFig,
@EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
@Assisted final OrganizationScope scope ) {
@@ -100,9 +97,6 @@ public class GraphManagerImpl implements GraphManager {
this.graphFig = graphFig;
- this.edgeWriteAsyncProcessor = edgeWrite;
-
-
this.edgeDeleteAsyncProcessor = edgeDelete;
@@ -123,8 +117,6 @@ public class GraphManagerImpl implements GraphManager {
mutation.mergeShallow( edgeMutation );
- final AsynchronousMessage<Edge> event = edgeWriteAsyncProcessor.setVerification( edge, getTimeout() );
-
try {
mutation.execute();
}
@@ -132,8 +124,6 @@ public class GraphManagerImpl implements GraphManager {
throw new RuntimeException( "Unable to connect to cassandra", e );
}
- edgeWriteAsyncProcessor.start( event );
-
return edge;
}
} );
@@ -199,7 +189,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgeVersions( final SearchByEdge searchByEdge ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgeVersions" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgeVersions( scope, searchByEdge );
@@ -211,7 +201,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
@@ -223,7 +213,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTarget" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
@@ -235,7 +225,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSourceByTargetType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -248,7 +238,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>() {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTargetBySourceType" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -261,7 +251,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>() {
+ return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -272,7 +262,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>() {
+ return Observable.create( new ObservableIterator<String>( "getIdTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -284,7 +274,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>() {
+ return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -295,7 +285,7 @@ public class GraphManagerImpl implements GraphManager {
@Override
public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>() {
+ return Observable.create( new ObservableIterator<String>( "getIdTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index d0c0bec..aa4f94e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -1,17 +1,16 @@
package org.apache.usergrid.persistence.graph.impl;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.thrift.Mutation;
-
import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -19,7 +18,6 @@ import org.apache.usergrid.persistence.graph.SearchEdgeType;
import org.apache.usergrid.persistence.graph.consistency.AsyncProcessor;
import org.apache.usergrid.persistence.graph.consistency.MessageListener;
import org.apache.usergrid.persistence.graph.guice.NodeDelete;
-import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteRepair;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeMetaRepair;
import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -35,7 +33,6 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.functions.Action0;
-import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -51,7 +48,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
private final NodeSerialization nodeSerialization;
private final EdgeSerialization edgeSerialization;
private final EdgeMetadataSerialization edgeMetadataSerialization;
- private final EdgeDeleteRepair edgeDeleteRepair;
private final EdgeMetaRepair edgeMetaRepair;
private final GraphFig graphFig;
protected final Keyspace keyspace;
@@ -63,8 +59,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
@Inject
public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
- final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
+ final EdgeMetadataSerialization edgeMetadataSerialization, final EdgeMetaRepair edgeMetaRepair,
final GraphFig graphFig, @NodeDelete final AsyncProcessor nodeDelete,
final Keyspace keyspace ) {
@@ -72,7 +67,6 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
this.nodeSerialization = nodeSerialization;
this.edgeSerialization = edgeSerialization;
this.edgeMetadataSerialization = edgeMetadataSerialization;
- this.edgeDeleteRepair = edgeDeleteRepair;
this.edgeMetaRepair = edgeMetaRepair;
this.graphFig = graphFig;
this.keyspace = keyspace;
@@ -121,7 +115,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
//get all edges pointing to the target node and buffer then into groups for deletion
Observable<MarkedEdge> targetEdges =
- getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) )
+ getEdgesTypesToTarget( scope, new SimpleSearchEdgeType( node, null ) ).subscribeOn(
+ Schedulers.io() )
.flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
public Observable<MarkedEdge> call( final String edgeType ) {
@@ -133,7 +128,8 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
//get all edges pointing to the source node and buffer them into groups for deletion
Observable<MarkedEdge> sourceEdges =
- getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) )
+ getEdgesTypesFromSource( scope, new SimpleSearchEdgeType( node, null ) ).subscribeOn(
+ Schedulers.io() )
.flatMap( new Func1<String, Observable<MarkedEdge>>() {
@Override
public Observable<MarkedEdge> call( final String edgeType ) {
@@ -142,78 +138,87 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
}
} );
-
- //each time an edge is emitted, delete it via batch mutation since we'll already be buffered
+ //merge both source and target into 1 observable. We'll need to check them all regardless of order
return Observable.merge( targetEdges, sourceEdges );
}
} )
- //buffer and delete marked edges in our buffer size
- .buffer( graphFig.getScanPageSize() ).flatMap(
- new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
- @Override
- public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
+ //buffer and delete marked edges in our buffer size so we're making less trips to cassandra
+ .buffer( graphFig.getScanPageSize() ).flatMap( new Func1<List<MarkedEdge>, Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
- LOG.debug( "Batching {} edges for deletion" , markedEdges.size());
+ LOG.debug( "Batching {} edges for node {} for deletion", markedEdges.size(), node );
- final MutationBatch batch = keyspace.prepareMutationBatch();
+ final MutationBatch batch = keyspace.prepareMutationBatch();
- for(MarkedEdge edge: markedEdges){
+ Set<TargetPair> sourceNodes = new HashSet<TargetPair>( markedEdges.size() );
+ Set<TargetPair> targetNodes = new HashSet<TargetPair>( markedEdges.size() );
- //delete the newest edge <= the version on the node delete
- LOG.debug( "Deleting edge {}", edge );
- final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
+ for ( MarkedEdge edge : markedEdges ) {
- batch.mergeShallow( delete );
- }
+ //delete the newest edge <= the version on the node delete
+ final MutationBatch delete = edgeSerialization.deleteEdge( scope, edge );
- try {
- batch.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to delete edges", e );
- }
+ batch.mergeShallow( delete );
- return Observable.from(markedEdges);
- }
- } )
- //TODO Fix this
-// .flatMap( new Func1<MarkedEdge, Observable<MarkedEdge>>() {
-// @Override
-// public Observable<MarkedEdge> call( final MarkedEdge edge ) {
-//
-//
-// return Observable.just( edge );
+ sourceNodes.add( new TargetPair( edge.getSourceNode(), edge.getType() ) );
+ targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType() ) );
+ }
+ try {
+ batch.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to delete edges", e );
+ }
+ //now delete meta data
-// //delete both the source and target meta data in parallel for the edge we deleted in the previous step
-// //if nothing else is using them
-// Observable<Integer> sourceMetaRepaired =
-// edgeMetaRepair.repairSources( scope, edge.getSourceNode(), edge.getType(), version );
-//
-// Observable<Integer> targetMetaRepaired =
-// edgeMetaRepair.repairTargets( scope, edge.getTargetNode(), edge.getType(), version );
-//
-// //sum up the number of subtypes we retain
-// return Observable.concat( sourceMetaRepaired, targetMetaRepaired ).last()
-// .map( new Func1<Integer, MarkedEdge>() {
-// @Override
-// public MarkedEdge call( final Integer integer ) {
-//
-// LOG.debug( "Retained {} subtypes for edge {}", integer, edge );
+ //delete both the source and target meta data in parallel for the edge we deleted in the
+ // previous step
+ //if nothing else is using them. We purposefully do not schedule them on a new scheduler
+ //we want them running on the i/o thread from the Observable emitting all the edges
+
//
-// return edge;
-// }
-// } );
+ LOG.debug( "About to audit {} source types", sourceNodes.size() );
+
+ Observable<Integer> sourceMetaCleanup = Observable.from( sourceNodes ).flatMap(
+ new Func1<TargetPair, Observable<Integer>>() {
+ @Override
+ public Observable<Integer> call( final TargetPair targetPair ) {
+ return edgeMetaRepair
+ .repairSources( scope, targetPair.id, targetPair.edgeType, version );
+ }
+ } ).last();
+
+
+ LOG.debug( "About to audit {} target types", targetNodes.size() );
+
+ Observable<Integer> targetMetaCleanup = Observable.from( targetNodes ).flatMap( new Func1<TargetPair, Observable<Integer>>() {
+ @Override
+ public Observable<Integer> call( final TargetPair targetPair ) {
+ return edgeMetaRepair
+ .repairTargets( scope, targetPair.id, targetPair.edgeType, version );
+ }
+ } ).last();
-// }
-// })
+ //run both the source/target edge type cleanup, then proceed
+ return Observable.merge( sourceMetaCleanup, targetMetaCleanup ).last().flatMap( new Func1<Integer,
+ Observable<MarkedEdge>>() {
+ @Override
+ public Observable<MarkedEdge> call( final Integer integer ) {
+ return Observable.from( markedEdges );
+ }
+ } );
+ }
+ } )
- .count()
- //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove the
- // target node in the mark
+ .count()
+ //if nothing is ever emitted, emit 0 so that we know no operations took place. Finally remove
+ // the
+ // target node in the mark
.defaultIfEmpty( 0 ).doOnCompleted( new Action0() {
@Override
public void call() {
@@ -233,12 +238,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
*/
private Observable<String> getEdgesTypesToTarget( final OrganizationScope scope, final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>( "getEdgeTypesToTarget" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@@ -247,12 +252,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
*/
private Observable<String> getEdgesTypesFromSource( final OrganizationScope scope, final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>( "getEdgeTypesFromSource" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@@ -261,12 +266,12 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
*/
private Observable<MarkedEdge> loadEdgesToTarget( final OrganizationScope scope, final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesToTarget" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@@ -275,11 +280,53 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
*/
private Observable<MarkedEdge> loadEdgesFromSource( final OrganizationScope scope, final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "getEdgesFromSource" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
+ }
+
+
+ private static class TargetPair {
+ protected final Id id;
+ protected final String edgeType;
+
+
+ private TargetPair( final Id id, final String edgeType ) {
+ this.id = id;
+ this.edgeType = edgeType;
+ }
+
+
+ @Override
+ public boolean equals( final Object o ) {
+ if ( this == o ) {
+ return true;
+ }
+ if ( o == null || getClass() != o.getClass() ) {
+ return false;
+ }
+
+ final TargetPair that = ( TargetPair ) o;
+
+ if ( !edgeType.equals( that.edgeType ) ) {
+ return false;
+ }
+ if ( !id.equals( that.id ) ) {
+ return false;
+ }
+
+ return true;
+ }
+
+
+ @Override
+ public int hashCode() {
+ int result = id.hashCode();
+ result = 31 * result + edgeType.hashCode();
+ return result;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index 310fa1f..898a04d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -42,7 +42,6 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
@@ -122,7 +121,7 @@ public abstract class AbstractEdgeRepair {
*/
private Observable<MarkedEdge> getEdgeVersions( final OrganizationScope scope, final Edge edge ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "edgeVersions" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 8b81d0a..022548c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -47,7 +47,6 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.MathObservable;
@@ -123,7 +122,8 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
LOG.debug( "Checking for edges with nodeId {}, type {}, and subtype {}", node, edgeType, subType );
Observable<Integer> search =
- serialization.loadEdges( scope, node, edgeType, subType, version ).take( 1 ).count()
+ //load each edge in it's own thread
+ serialization.loadEdges( scope, node, edgeType, subType, version ).subscribeOn( Schedulers.io() ).take( 1 ).count()
.doOnNext( new Action1<Integer>() {
@Override
@@ -145,7 +145,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
- LOG.debug( "No edges with nodeId {}, type {}, and subtype {}. Removing", node, edgeType, subType );
+ LOG.debug( "No edges with nodeId {}, type {}, and subtype {}. Removing subtype.", node, edgeType, subType );
batch.mergeShallow( serialization
.removeEdgeSubType( scope, node, edgeType, subType,
version ) );
@@ -166,7 +166,10 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
public void call( final Integer count ) {
- LOG.debug( "Executing batch for subtype deletion with type {}. Mutation has {} rows to mutate ", edgeType, batch.getRowCount() );
+ LOG.debug(
+ "Executing batch for subtype deletion with type {}. " +
+ "Mutation has {} rows to mutate ",
+ edgeType, batch.getRowCount() );
try {
batch.execute();
@@ -250,26 +253,26 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
final String edgeType, final UUID version ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>( "edgeTargetIdTypes" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization
.getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@Override
public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
final String subType, final UUID version ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "edgeTargetSubTypes" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTargetBySourceType( scope,
new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@@ -295,26 +298,26 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
@Override
public Observable<String> loadEdgeSubTypes( final OrganizationScope scope, final Id nodeId,
final String edgeType, final UUID version ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>( "edgeSourceIdTypes" ) {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization
.getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
@Override
public Observable<MarkedEdge> loadEdges( final OrganizationScope scope, final Id nodeId, final String edgeType,
final String subType, final UUID version ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>( "edgeSourceSubTypes" ) {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSourceByTargetType( scope,
new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
}
- } ).subscribeOn( Schedulers.io() );
+ } );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
deleted file mode 100644
index 92be7a7..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepair.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-
-import rx.Observable;
-
-
-/**
- * Interface to perform repair operations on an edge when it is written
- */
-public interface EdgeWriteRepair {
-
- /**
- * Repair this edge. Remove previous entries
- * @param scope The scope to use
- * @param edge The last edge to retain. All versions < this edge's version will be deleted
- *
- * @return An observable that emits every version of the edge we delete. Note that it may emit duplicates
- * since this is a streaming API.
- */
- public Observable<MarkedEdge> repair( OrganizationScope scope, Edge edge );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
deleted file mode 100644
index f77ef95..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.functions.Func1;
-
-
-/**
- * SimpleRepair operation
- */
-@Singleton
-public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWriteRepair {
-
-
- @Inject
- public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
- final Keyspace keyspace) {
- super( edgeSerialization, graphFig, keyspace );
- }
-
-
- @Override
- public Observable<MarkedEdge> repair( final OrganizationScope scope, final Edge edge ) {
-
- return super.repair( scope, edge );
- }
-
-
- @Override
- protected Func1<MarkedEdge, Boolean> getFilter( final UUID maxVersion ) {
- return new Func1<MarkedEdge, Boolean>() {
- /**
- * We only want to return edges < this version so we remove them
- * @param markedEdge
- * @return
- */
- @Override
- public Boolean call( final MarkedEdge markedEdge ) {
- return UUIDComparator.staticCompare( markedEdge.getVersion(), maxVersion ) < 0;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
index b230399..38893e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationImpl.java
@@ -132,7 +132,7 @@ public class EdgeMetadataSerializationImpl implements EdgeMetadataSerialization,
EdgeUtils.validateEdge( edge );
- MutationBatch batch = keyspace.prepareMutationBatch();
+ MutationBatch batch = keyspace.prepareMutationBatch().withConsistencyLevel( cassandraConfig.getWriteCL() );
final Id source = edge.getSourceNode();
final Id target = edge.getTargetNode();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index 0341cac..790c386 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -299,7 +299,7 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final String type = search.getType();
final UUID maxVersion = search.getMaxVersion();
- return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, search.last() ) {
+ return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, search.last() ) {
@Override
@@ -345,8 +345,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final Id sourceId = edgeType.getNode();
final String type = edgeType.getType();
+ final UUID maxVersion = edgeType.getMaxVersion();
- return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, edgeType.last() ) {
+ return getEdges( GRAPH_SOURCE_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, edgeType.last() ) {
@Override
protected RowKey generateRowKey() {
return new RowKey( sourceId, type );
@@ -377,8 +378,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final Id targetId = edgeType.getNode();
final String type = edgeType.getType();
final String targetType = edgeType.getIdType();
+ final UUID maxVersion = edgeType.getMaxVersion();
- return getEdges( GRAPH_SOURCE_NODE_TARGET_TYPE, new EdgeSearcher<RowKeyType>( scope, edgeType.last() ) {
+ return getEdges( GRAPH_SOURCE_NODE_TARGET_TYPE, new EdgeSearcher<RowKeyType>( scope, maxVersion, edgeType.last() ) {
@Override
protected RowKeyType generateRowKey() {
return new RowKeyType( targetId, type, targetType );
@@ -407,8 +409,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final Id targetId = edgeType.getNode();
final String type = edgeType.getType();
+ final UUID maxVersion = edgeType.getMaxVersion();
- return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, edgeType.last() ) {
+ return getEdges( GRAPH_TARGET_NODE_EDGES, new EdgeSearcher<RowKey>( scope, maxVersion, edgeType.last() ) {
@Override
@@ -441,8 +444,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
final Id targetId = edgeType.getNode();
final String sourceType = edgeType.getIdType();
final String type = edgeType.getType();
+ final UUID maxVersion = edgeType.getMaxVersion();
- return getEdges( GRAPH_TARGET_NODE_SOURCE_TYPE, new EdgeSearcher<RowKeyType>( scope, edgeType.last() ) {
+ return getEdges( GRAPH_TARGET_NODE_SOURCE_TYPE, new EdgeSearcher<RowKeyType>( scope, maxVersion, edgeType.last()) {
@Override
protected RowKeyType generateRowKey() {
return new RowKeyType( targetId, type, sourceType );
@@ -672,11 +676,13 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
private static abstract class EdgeSearcher<R> implements ColumnParser<DirectedEdge, MarkedEdge> {
protected final Optional<Edge> last;
+ protected final UUID maxVersion;
protected final OrganizationScope scope;
- protected EdgeSearcher( final OrganizationScope scope, final Optional<Edge> last ) {
+ protected EdgeSearcher( final OrganizationScope scope, final UUID maxVersion , final Optional<Edge> last) {
this.scope = scope;
+ this.maxVersion = maxVersion;
this.last = last;
}
@@ -692,7 +698,11 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration {
builder.setStart( sourceEdge, EDGE_SERIALIZER );
+ }else{
+
+
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
index b6d17c6..1287dbb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ColumnNameIterator.java
@@ -4,6 +4,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.query.RowQuery;
import com.netflix.hystrix.HystrixCommand;
@@ -87,17 +88,11 @@ public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
private void advanceIterator() {
//run producing the values within a hystrix command. This way we'll time out if the read takes too long
- sourceIterator = new HystrixCommand<Iterator<Column<C>>>( HystrixCommand.Setter.withGroupKey( GROUP_KEY )
- .andCommandPropertiesDefaults(
- HystrixCommandProperties
- .Setter()
- .withExecutionIsolationThreadTimeoutInMilliseconds(
- executionTimeout ) ) ) {
-
- @Override
- protected Iterator<Column<C>> run() throws Exception {
- return rowQuery.execute().getResult().iterator();
- }
- }.execute();
+ try {
+ sourceIterator = rowQuery.execute().getResult().iterator();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to get next page", e );
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index 2d2456b..a222bac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -3,15 +3,11 @@ package org.apache.usergrid.persistence.graph.serialization.impl.parse;
import java.util.Iterator;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
-import rx.Observer;
import rx.Subscriber;
-import rx.Subscription;
-import rx.subscriptions.Subscriptions;
/**
@@ -23,6 +19,14 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
private static final Logger log = LoggerFactory.getLogger( ObservableIterator.class );
+ private final String name;
+
+
+ /**
+ * @param name The simple name of the iterator, used for debugging
+ */
+ protected ObservableIterator( final String name ) {this.name = name;}
+
@Override
public void call( final Subscriber<? super T> subscriber ) {
@@ -37,11 +41,15 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
while ( itr.hasNext() && !subscriber.isUnsubscribed()) {
final T next = itr.next();
- log.debug( "Emitting {}", next );
+ log.trace( "Iterator '{}' emitting item '{}'", name, next );
+
+ assert next != null;
subscriber.onNext( next );
}
+
+
subscriber.onCompleted();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
index 48890ca..14ab9c4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListenerTest.java
@@ -54,8 +54,8 @@ public class NodeDeleteListenerTest {
private static final Logger log = LoggerFactory.getLogger( NodeDeleteListenerTest.class );
- @ClassRule
- public static CassandraRule rule = new CassandraRule();
+// @ClassRule
+// public static CassandraRule rule = new CassandraRule();
@Inject
@@ -317,13 +317,13 @@ public class NodeDeleteListenerTest {
* since it has no other targets
*/
@Test
- public void testMultiDelete() throws ConnectionException {
+ public void testMultiDelete() throws ConnectionException, InterruptedException {
GraphManager em = emf.createEdgeManager( scope );
//create loads of edges to easily delete. We'll keep all the types of "test"
- final int edgeCount = graphFig.getScanPageSize() * 4;
+ final int edgeCount = graphFig.getScanPageSize() * 2;
Id toDelete = createId( "toDelete" );
final String edgeType = "test";
@@ -360,9 +360,10 @@ public class NodeDeleteListenerTest {
log.info( "Saved {} target edges", targetCount );
- //mark the node so
- UUID deleteVersion = UUIDGenerator.newTimeUUID();
+ //mark the node for deletion
+// UUID deleteVersion = UUIDGenerator.newTimeUUID();
+ UUID deleteVersion = UUID.fromString( "ffffffff-ffff-1fff-bfff-ffffffffffff" );
nodeSerialization.mark( scope, toDelete, deleteVersion ).execute();
@@ -371,6 +372,7 @@ public class NodeDeleteListenerTest {
int count = deleteListener.receive( deleteEvent ).toBlockingObservable().last();
+ //TODO T.N. THIS SHOULD WORK!!!! It fails intermittently with RX 0.17.1 with too many scheduler threads (which was wrong), try this again after the next release
assertEquals( edgeCount, count );
//now verify we can't get any of the info back
@@ -378,17 +380,19 @@ public class NodeDeleteListenerTest {
UUID now = UUIDGenerator.newTimeUUID();
- Iterator<MarkedEdge> returned =
+ //validate it's not returned by the
+
+ Iterator<MarkedEdge> returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
+
+ assertFalse( "No target should be returned", returned.hasNext() );
+
+ returned =
edgeSerialization.getEdgesFromSource( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
//no edge from source node should be returned
assertFalse( "No source should be returned", returned.hasNext() );
- //validate it's not returned by the
-
- returned = edgeSerialization.getEdgesToTarget( scope, createSearchByEdge( toDelete, edgeType, now, null ) );
- assertFalse( "No target should be returned", returned.hasNext() );
//no types from source
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
deleted file mode 100644
index c7c95a1..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.impl.stage;
-
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.OrganizationScope;
-import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
-import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-/**
- *
- *
- */
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
-public class EdgeWriteRepairTest {
-
- private static final Logger LOG = LoggerFactory.getLogger( EdgeWriteRepairTest.class );
-
- @ClassRule
- public static CassandraRule rule = new CassandraRule();
-
-
- @Inject
- @Rule
- public MigrationManagerRule migrationManagerRule;
-
-
- @Inject
- protected EdgeSerialization edgeSerialization;
-
- @Inject
- protected EdgeWriteRepair edgeWriteRepair;
-
-
- protected OrganizationScope scope;
-
-
- @Before
- public void setup() {
- scope = mock( OrganizationScope.class );
-
- Id orgId = mock( Id.class );
-
- when( orgId.getType() ).thenReturn( "organization" );
- when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
- when( scope.getOrganization() ).thenReturn( orgId );
- }
-
-
- /**
- * Test repairing with no edges
- */
- @Test
- public void noEdges() {
- Edge edge = createEdge( "source", "test", "target" );
-
- Iterator<MarkedEdge> edges = edgeWriteRepair.repair( scope, edge ).toBlockingObservable().getIterator();
-
- assertFalse( "No edges cleaned", edges.hasNext() );
- }
-
-
- /**
- * Test repairing with no edges TODO: TN. There appears to be a race condition here with ordering. Not sure if
- * this is intentional as part of the impl or if it's an issue
- */
- @Test
- public void versionTest() throws ConnectionException {
- final int size = 20;
-
- final List<Edge> versions = new ArrayList<Edge>( size );
-
- final Id sourceId = createId( "source" );
- final Id targetId = createId( "target" );
- final String edgeType = "edge";
-
- int deleteIndex = size / 2;
-
- Set<Edge> deletedEdges = new HashSet<Edge>();
-
- for ( int i = 0; i < size; i++ ) {
- final Edge edge = createEdge( sourceId, edgeType, targetId );
-
- versions.add( edge );
-
- edgeSerialization.writeEdge( scope, edge ).execute();
-
- LOG.info( "Writing edge at index [{}] {}", i, edge );
-
- if ( i < deleteIndex ) {
- deletedEdges.add( edge );
- }
- }
-
-
- Edge keep = versions.get( deleteIndex );
-
- Iterable<MarkedEdge> edges = edgeWriteRepair.repair( scope, keep ).toBlockingObservable().toIterable();
-
- Multiset<Edge> deletedStream = HashMultiset.create();
-
- for ( MarkedEdge edge : edges ) {
-
- LOG.info( "Returned edge {} for repair", edge );
-
- final boolean shouldBeDeleted = deletedEdges.contains( edge );
-
- assertTrue( "Removed matches saved index", shouldBeDeleted );
-
- deletedStream.add( edge );
- }
-
- deletedEdges.removeAll( deletedStream.elementSet() );
-
- assertEquals( 0, deletedEdges.size() );
-
- //now verify we get all the versions we expect back
- Iterator<MarkedEdge> iterator = edgeSerialization.getEdgeVersions( scope,
- new SimpleSearchByEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID(), null ) );
-
- int count = 0;
-
- for ( MarkedEdge edge : new IterableWrapper<MarkedEdge>( iterator ) ) {
-
- final Edge saved = versions.get( size - count - 1 );
-
- assertEquals( saved, edge );
-
- count++;
- }
-
- final int keptCount = size - deleteIndex;
-
- assertEquals( "Kept edge version was the minimum", keptCount, count );
- }
-
-
- private class IterableWrapper<T> implements Iterable<T> {
- private final Iterator<T> sourceIterator;
-
-
- private IterableWrapper( final Iterator<T> sourceIterator ) {
- this.sourceIterator = sourceIterator;
- }
-
-
- @Override
- public Iterator<T> iterator() {
- return this.sourceIterator;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
index 7af55da..2548864 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/TestCount.java
@@ -6,6 +6,8 @@ import java.util.Iterator;
import java.util.List;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
@@ -21,14 +23,27 @@ import static org.junit.Assert.assertEquals;
*/
public class TestCount {
+ private static final Logger log = LoggerFactory.getLogger(TestCount.class);
@Test
public void mergeTest(){
final int sizePerObservable = 2000;
- Observable<Integer> input1 = getObservables( sizePerObservable );
- Observable<Integer> input2 = getObservables( sizePerObservable );
+ Observable<Integer> input1 = getObservables( sizePerObservable ).flatMap( new Func1<Integer, Observable<?
+ extends Integer>>() {
+ @Override
+ public Observable<? extends Integer> call( final Integer integer ) {
+ return getObservables( 100 );
+ }
+ } );
+ Observable<Integer> input2 = getObservables( sizePerObservable ).flatMap( new Func1<Integer, Observable<?
+ extends Integer>>() {
+ @Override
+ public Observable<? extends Integer> call( final Integer integer ) {
+ return getObservables( 100 );
+ }
+ } );
int returned = Observable.merge(input1, input2).buffer( 1000 ).flatMap(
new Func1<List<Integer>, Observable<Integer>>() {
@@ -50,7 +65,7 @@ public class TestCount {
} ).count().defaultIfEmpty( 0 ).toBlockingObservable().last();
- assertEquals("Count was correct", sizePerObservable*2, returned);
+ assertEquals("Count was correct", sizePerObservable*2*100, returned);
}
@@ -92,19 +107,12 @@ public class TestCount {
}
}
- //Sleep for a very long time before emitting the last value
- if(i == size -1){
- try {
- Thread.sleep(5000);
- }
- catch ( InterruptedException e ) {
- subscriber.onError( e );
- return;
- }
- }
+ final Integer value = values.get( i );
+
+ log.info( "Emitting {}", value );
- subscriber.onNext( values.get( i ) );
+ subscriber.onNext( value );
}
subscriber.onCompleted();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
new file mode 100644
index 0000000..849b280
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# suppress inspection "UnusedProperty" for whole file
+log4j.rootLogger=INFO,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{3}.%M(%L)<%t>- %m%n
+
+log4j.logger.org.safehaus.chop.plugin=DEBUG
+log4j.logger.org.safehaus.guicyfig=ERROR
+log4j.logger.org.safehaus.chop.api.store.amazon=DEBUG
+log4j.logger.org.apache.http=ERROR
+log4j.logger.com.amazonaws.request=ERROR
+log4j.logger.cassandra.db=ERROR
+
+log4j.logger.org.apache.usergrid=DEBUG
+
+log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/16721c87/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 9b7b1e1..9a9b7d3 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -40,6 +40,7 @@
<surefire.version>2.16</surefire.version>
<rx.version>0.17.1</rx.version>
<hystrix.version>1.4.0-RC1</hystrix.version>
+ <cassandra.version>1.2.15</cassandra.version>
</properties>