You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2014/03/27 18:20:51 UTC
[04/27] git commit: Removed scheduler to instead use the default RX
scheduler.
Removed scheduler to instead use the default RX scheduler.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/457528f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/457528f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/457528f5
Branch: refs/heads/entity-manager
Commit: 457528f50e26166a2c157897a128ef135d5a2991
Parents: df0091b
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 21 10:15:22 2014 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 21 10:15:22 2014 -0700
----------------------------------------------------------------------
.../collection/guice/CollectionModule.java | 8 +-
.../impl/EntityCollectionManagerImpl.java | 19 +-
.../mvcc/stage/write/WriteUniqueVerify.java | 14 +-
.../collection/rx/CassandraThreadScheduler.java | 107 -----
.../persistence/collection/rx/RxFig.java | 42 --
.../stage/write/WriteUniqueVerifyStageTest.java | 3 +-
.../mvcc/stage/write/WriteUniqueVerifyTest.java | 6 +-
.../rx/CassandraThreadSchedulerTest.java | 413 -------------------
.../persistence/collection/rx/ParallelTest.java | 6 +-
.../graph/consistency/AsyncProcessorImpl.java | 6 +-
.../graph/hystrix/HystrixGraphObservable.java | 52 +++
.../persistence/graph/impl/EdgeManagerImpl.java | 58 +--
.../graph/impl/NodeDeleteListener.java | 23 +-
.../graph/impl/stage/AbstractEdgeRepair.java | 11 +-
.../graph/impl/stage/EdgeDeleteRepairImpl.java | 4 +-
.../graph/impl/stage/EdgeMetaRepairImpl.java | 13 +-
.../graph/impl/stage/EdgeWriteRepairImpl.java | 4 +-
.../persistence/graph/EdgeManagerTimeoutIT.java | 125 ++++--
.../graph/consistency/AsyncProcessorTest.java | 2 +-
.../graph/test/util/EdgeTestUtils.java | 17 +-
20 files changed, 240 insertions(+), 693 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index e0744c3..8f41a2b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -29,8 +29,6 @@ import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSy
import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
@@ -38,8 +36,6 @@ import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
import com.google.inject.AbstractModule;
import com.google.inject.assistedinject.FactoryModuleBuilder;
-import rx.Scheduler;
-
/**
* Simple module for wiring our collection api
@@ -52,8 +48,7 @@ public class CollectionModule extends AbstractModule {
@Override
protected void configure() {
//noinspection unchecked
- install( new GuicyFigModule(
- RxFig.class,
+ install( new GuicyFigModule(
MigrationManagerFig.class,
CassandraFig.class,
SerializationFig.class ) );
@@ -67,7 +62,6 @@ public class CollectionModule extends AbstractModule {
.implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
.build( EntityCollectionManagerFactory.class ) );
- bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index ef95a75..0a099e8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -46,10 +46,10 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
/**
@@ -65,7 +65,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
private final CollectionScope collectionScope;
private final UUIDService uuidService;
- private final Scheduler scheduler;
//start stages
@@ -87,7 +86,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
public EntityCollectionManagerImpl(
final UUIDService uuidService,
final WriteStart writeStart,
- final Scheduler scheduler,
final WriteUniqueVerify writeVerifyUnique,
final WriteOptimisticVerify writeOptimisticVerify,
final WriteCommit writeCommit,
@@ -108,7 +106,6 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
this.markCommit = markCommit;
this.uuidService = uuidService;
- this.scheduler = scheduler;
this.collectionScope = collectionScope;
}
@@ -138,7 +135,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// fire the stages
- // TODO use our own scheduler to help with multitenancy here.
+ // TODO use our own Schedulers.io() to help with multitenancy here.
// TODO writeOptimisticVerify and writeVerifyUnique should be concurrent to reduce wait time
// these 3 lines could be done in a single line, but they are on multiple lines for clarity
@@ -146,7 +143,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>( collectionScope, entity );
Observable<CollectionIoEvent<MvccEntity>> observable =
- Observable.from( writeData ).subscribeOn( scheduler ).map( writeStart ).flatMap(
+ Observable.from( writeData ).subscribeOn( Schedulers.io() ).map( writeStart ).flatMap(
new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>() {
@Override
@@ -163,13 +160,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Observable<CollectionIoEvent<MvccEntity>> unique =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
.flatMap( writeVerifyUnique);
// optimistic verification
Observable<CollectionIoEvent<MvccEntity>> optimistic =
- Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( scheduler )
+ Observable.from( mvccEntityCollectionIoEvent ).subscribeOn( Schedulers.io() )
.map( writeOptimisticVerify );
@@ -193,7 +190,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
// execute all validation stages concurrently. Needs refactored when this is done.
// https://github.com/Netflix/RxJava/issues/627
- // observable = Concurrent.concurrent( observable, scheduler, new WaitZip(),
+ // observable = Concurrent.concurrent( observable, Schedulers.io(), new WaitZip(),
// writeVerifyUnique, writeOptimisticVerify );
//return the commit result.
@@ -209,7 +206,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this stage" );
return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
- .subscribeOn( scheduler ).map( markStart ).map( markCommit );
+ .subscribeOn( Schedulers.io() ).map( markStart ).map( markCommit );
}
@@ -221,7 +218,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
Preconditions.checkNotNull( entityId.getType(), "Entity id type required in load stage");
return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId ) )
- .subscribeOn( scheduler ).map( load );
+ .subscribeOn( Schedulers.io() ).map( load );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index fe53b13..d7733c2 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -39,9 +39,9 @@ import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
-import rx.Scheduler;
import rx.functions.Func1;
import rx.functions.FuncN;
+import rx.schedulers.Schedulers;
/**
@@ -54,22 +54,16 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
private final UniqueValueSerializationStrategy uniqueValueStrat;
-
- private final Scheduler scheduler;
-
protected final SerializationFig serializationFig;
@Inject
- public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
- final Scheduler scheduler, final SerializationFig serializationFig ) {
+ public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy, final SerializationFig serializationFig ) {
Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
- Preconditions.checkNotNull( scheduler, "scheduler is required" );
Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
this.uniqueValueStrat = uniqueValueSerializiationStrategy;
- this.scheduler = scheduler;
this.serializationFig = serializationFig;
}
@@ -101,7 +95,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
//if it's unique, create a function to validate it and add it to the list of concurrent validations
if ( field.isUnique() ) {
- Observable<FieldUniquenessResult> result = Observable.from( field ).subscribeOn( scheduler ).map(new Func1<Field, FieldUniquenessResult>() {
+ Observable<FieldUniquenessResult> result = Observable.from( field ).subscribeOn( Schedulers.io() ).map(new Func1<Field, FieldUniquenessResult>() {
@Override
public FieldUniquenessResult call(Field field ) {
@@ -139,7 +133,7 @@ public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, O
//short circuit. If we zip up nothing, we block forever.
if(fields.size() == 0){
- return Observable.from(ioevent ).subscribeOn( scheduler );
+ return Observable.from(ioevent ).subscribeOn( Schedulers.io() );
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
deleted file mode 100644
index b2f2584..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.name.Named;
-
-import rx.Scheduler;
-import rx.schedulers.Schedulers;
-
-
-public class CassandraThreadScheduler implements Provider<Scheduler> {
-
- private static final Logger LOG = LoggerFactory.getLogger(CassandraThreadScheduler.class);
-
- private final RxFig rxFig;
-
-
- @Inject
- public CassandraThreadScheduler( final RxFig rxFig ) {
- this.rxFig = rxFig;
- }
-
-
- @Override
- @Named( "cassandraScheduler" )
- public Scheduler get() {
-
- //create our thread factory so we can label our threads in case we need to dump them
- final ThreadFactory factory = new ThreadFactory() {
-
- private final AtomicLong counter = new AtomicLong();
-
- @Override
- public Thread newThread( final Runnable r ) {
-
- final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
- LOG.debug( "Allocating new IO thread with name {}", threadName );
-
- Thread t = new Thread( r, threadName );
- t.setDaemon( true );
- return t;
- }
- };
-
-
- /**
- * Create a threadpool that will reclaim unused threads after 60 seconds.
- * It uses the max thread count set here. It intentionally uses the
- * DynamicProperty, so that when it is updated, the listener updates the
- * pool size. Additional allocation is trivial. Shrinking the size
- * will require all currently executing threads to run to completion,
- * without allowing additional tasks to be queued.
- */
- final ThreadPoolExecutor pool = new ThreadPoolExecutor(
- 0, rxFig.getMaxThreadCount(), 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>(), factory, new ThreadPoolExecutor.AbortPolicy() );
-
-
- // if our max thread count is updated, we want to immediately update the pool.
- // Per the javadoc if the size is smaller, existing threads will continue to run
- // until they become idle and time out
- rxFig.addPropertyChangeListener( new PropertyChangeListener() {
- @Override
- public void propertyChange( final PropertyChangeEvent evt ) {
- if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount" ) ) ) {
- LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {}, new = {} ",
- evt.getOldValue(), evt.getNewValue() );
- pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
- }
- }
- } );
-
- return Schedulers.executor( pool );
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
deleted file mode 100644
index 54a5211..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- * Configuration interface for RxJava classes.
- */
-@FigSingleton
-public interface RxFig extends GuicyFig {
-
- public static final String PROP_THREAD = "rx.cassandra.io.threads";
-
- /**
- * Max number of threads a pool can allocate. Can be dynamically changed after starting
- */
- @Key( PROP_THREAD )
- @Default( "100" )
- int getMaxThreadCount();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
index 78b7d13..863e3ce 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyStageTest.java
@@ -40,9 +40,8 @@ public class WriteUniqueVerifyStageTest extends AbstractMvccEntityStageTest {
@Override
protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
UniqueValueSerializationStrategy uvstrat = mock( UniqueValueSerializationStrategy.class );
- Scheduler scheduler = mock( Scheduler.class );
SerializationFig fig = mock( SerializationFig.class );
- new WriteUniqueVerify( uvstrat, scheduler, fig ).call( event );
+ new WriteUniqueVerify( uvstrat, fig ).call( event );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 20dfa91..1a12fef 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -81,8 +81,6 @@ public class WriteUniqueVerifyTest {
public MigrationManagerRule migrationManagerRule;
- @Inject
- private Scheduler scheduler;
@Inject
private SerializationFig fig;
@@ -103,7 +101,7 @@ public class WriteUniqueVerifyTest {
final MvccEntity mvccEntity = fromEntity( entity );
// run the stage
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, scheduler, fig );
+ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
CollectionIoEvent<MvccEntity> result = newStage.call(
new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
@@ -134,7 +132,7 @@ public class WriteUniqueVerifyTest {
final MvccEntity mvccEntity = fromEntity( entity );
// run the stage
- WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, scheduler, fig );
+ WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig );
CollectionIoEvent<MvccEntity> result = newStage.call(
new CollectionIoEvent<MvccEntity>( collectionScope, mvccEntity ) )
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
deleted file mode 100644
index c700e06..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
+++ /dev/null
@@ -1,413 +0,0 @@
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.jukito.JukitoRunner;
-import org.jukito.UseModules;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
-
-import com.google.inject.Inject;
-
-import rx.Scheduler;
-import rx.functions.Action1;
-import rx.schedulers.Schedulers;
-import rx.util.functions.Action0;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-
-/**
- * Test for our scheduler
- */
-@RunWith( JukitoRunner.class )
-@UseModules( TestCollectionModule.class )
-public class CassandraThreadSchedulerTest {
-
-
- private static final Logger LOG = LoggerFactory.getLogger( CassandraThreadSchedulerTest.class );
-
- /**
- * Number of milliseconds to wait when trying to acquire the semaphore
- */
- private static final long TEST_TIMEOUT = 30000;
-
- @Inject
- private RxFig rxFig;
-
-
- @Test
- public void testMaxLimit() throws InterruptedException {
-
- final int maxCount = 10;
-
- rxFig.override( RxFig.PROP_THREAD, ""+ maxCount );
-
- final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
- final Scheduler rxScheduler = cassSchedulerSetup.get();
-
- //we want a fair semaphore so we can release in acquire order
- final Semaphore semaphore = new Semaphore( 0, true );
-
- //we should not have maxCount actions running in the scheduler
- CountDownLatch result = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore, TEST_TIMEOUT );
-
- //schedule and we should fail
-
- try {
- rxScheduler.schedule( new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- }
- });
-
- fail( "This should have thrown an exception" );
- }
- catch ( RejectedExecutionException ree ) {
- //swallow, we just want to ensure we catch this to continue the test
- }
-
- //now release the semaphore so all 10 can run
- semaphore.release( rxFig.getMaxThreadCount() );
-
- //wait for completion
- boolean completed = result.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
-
- //verify we can schedule and execute a new operation
- result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT );
-
- semaphore.release( 1 );
-
- completed = result.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
- }
-
-
- /**
- * Test running from a max limit to a lower limit and fails to schedule new threads
- */
- @Test
- public void testMaxLimitShrink() throws InterruptedException {
-
- final int maxCount = 10;
-
- final int half = maxCount / 2;
-
- //kind of a hack, but necessary with the way properties are singletons. Otherwise you get side effects from
- // other tests
- rxFig.override( RxFig.PROP_THREAD, "" + maxCount );
-
- final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
- final Scheduler rxScheduler = cassSchedulerSetup.get();
-
- //we want a fair semaphore so we can release in acquire order
- final Semaphore semaphore = new Semaphore( 0, true );
-
- //we should not have maxCount actions running in the scheduler
- CountDownLatch firstHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT );
-
- //schedule the second half
- CountDownLatch secondHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT );
-
- //schedule and we should fail
-
- try {
-
- rxScheduler.schedule( new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- //no op
- }
- } );
-
- fail( "This should have thrown an exception" );
- }
- catch ( RejectedExecutionException ree ) {
- //swallow, we just want to ensure we catch this to continue the test
- }
-
-
- //update the property to shrink the size
- rxFig.override( RxFig.PROP_THREAD, "" + half );
-
- //now release the first half of executors
- semaphore.release( half );
-
-
- //wait for completion
- boolean completed = firstHalf.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
-
- //verify we can't schedule b/c we're still at capacity
-
-
- try {
-
- rxScheduler.schedule( new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- //no op
- }
- } );
-
- fail( "This should have thrown an exception. We still don't have capacity for new threads" );
- }
- catch ( RejectedExecutionException ree ) {
- //swallow, we just want to ensure we catch this to continue the test
- }
-
-
- //now release the rest of the semaphores
- semaphore.release( maxCount - half );
-
- completed = secondHalf.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
-
- //verify we can schedule and execute a new operation
- CountDownLatch newJob = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT );
-
- semaphore.release( 1 );
-
- completed = newJob.await( 20, TimeUnit.SECONDS );
- assertTrue( "Completed executing actions", completed );
- }
-
-
- /**
- * Test that when we're fully blocked, if we expand we have capacity
- */
- @Test
- public void testExpandLimit() throws InterruptedException {
-
- final int startCount = 10;
-
- //kind of a hack, but necessary with the way properties are singletons. Otherwise you get side effects from
- // other tests
- rxFig.override( RxFig.PROP_THREAD, "" + startCount );
-
- final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler( rxFig );
-
- final Scheduler rxScheduler = cassSchedulerSetup.get();
-
- //we want a fair semaphore so we can release in acquire order
- final Semaphore semaphore = new Semaphore( 0, true );
-
- //we should not have maxCount actions running in the scheduler
- CountDownLatch firstBatch = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore, TEST_TIMEOUT );
-
- //schedule and we should fail
-
- try {
-
- rxScheduler.schedule( new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- //no op
- }
- } );
-
- fail( "This should have thrown an exception" );
- }
- catch ( RejectedExecutionException ree ) {
- //swallow, we just want to ensure we catch this to continue the test
- }
-
-
- //now allocate more capacity
- final int doubleMaxCount = startCount * 2;
-
- //update the property to shrink the size
- rxFig.override( RxFig.PROP_THREAD, "" + doubleMaxCount );
-
-
- //now schedule 10 more
-
- CountDownLatch secondBatch = schedule( rxScheduler, rxFig.getMaxThreadCount() - startCount, semaphore, TEST_TIMEOUT );
-
- //this should fail. We're at capacity
-
- try {
-
- rxScheduler.schedule( new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- //no op
- }
- } );
-
- fail( "This should have thrown an exception" );
- }
- catch ( RejectedExecutionException ree ) {
- //swallow, we just want to ensure we catch this to continue the test
- }
-
-
- //now release the semaphores so all
- semaphore.release( rxFig.getMaxThreadCount() );
-
- //wait for completion
- boolean completed = firstBatch.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
-
- completed = secondBatch.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
-
- //verify we can schedule and execute a new operation
- CountDownLatch result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT );
-
- semaphore.release( 1 );
-
- completed = result.await( 20, TimeUnit.SECONDS );
-
- assertTrue( "Completed executing actions", completed );
- }
-
- @Test(expected = RejectedExecutionException.class)
- public void schedulerPoc() throws InterruptedException {
-
- final int size = 10;
-
- //create our thread factory so we can label our threads in case we need to dump them
- final ThreadFactory factory = new ThreadFactory() {
-
- private final AtomicLong counter = new AtomicLong();
-
- @Override
- public Thread newThread( final Runnable r ) {
-
- final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
- LOG.debug( "Allocating new IO thread with name {}", threadName );
-
- Thread t = new Thread( r, threadName );
- t.setDaemon( true );
- return t;
- }
- };
-
-
- /**
- * Create a threadpool that will reclaim unused threads after 60 seconds.
- * It uses the max thread count set here. It intentionally uses the
- * DynamicProperty, so that when it is updated, the listener updates the
- * pool size. Additional allocation is trivial. Shrinking the size
- * will require all currently executing threads to run to completion,
- * without allowing additional tasks to be queued.
- */
- final ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(10, factory, new ThreadPoolExecutor.AbortPolicy());
- //set the max thread size
- pool.setMaximumPoolSize( 10 );
- pool.setKeepAliveTime( 60, TimeUnit.SECONDS );
-
- final CountDownLatch latch = new CountDownLatch( size );
- final Semaphore semaphore = new Semaphore( 0 );
-
- for(int i = 0; i < size; i ++){
- pool.schedule( new Callable<Object>() {
- @Override
- public Object call() throws Exception {
-
- latch.countDown();
-
- //block so this is still running in the scheduled execute
- semaphore.acquire();
-
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }, 0, TimeUnit.MILLISECONDS );
-
-
-
-
- }
-
-
- //wait for all our threads to get to semaphore acquisition and block to ensure we're running at capacity
- latch.await();
-
- //now schedule 1 more, we should blow up
-
- pool.schedule( new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- return null; //To change body of implemented methods use File | Settings | File Templates.
- }
- }, 0, TimeUnit.MILLISECONDS );
-
-
-
- }
-
-
- /**
- * Schedule actions into the semaphore.
- *
- * @param rxScheduler the Scheduler to use
- * @param totalCount The total count of actions to invoke
- * @param semaphore The semaphore to take on acquire
- *
- * @return The latch to block on. When all jobs have been executed this will be tripped
- */
- private CountDownLatch schedule( Scheduler rxScheduler, final int totalCount, final Semaphore semaphore, final long timeout ) {
-
- final CountDownLatch latch = new CountDownLatch( totalCount );
-
- for ( int i = 0; i < totalCount; i++ ) {
-
- final Action1<Scheduler.Inner> action = new Action1<Scheduler.Inner>() {
- @Override
- public void call( final Scheduler.Inner inner ) {
- try {
- final String threadName = Thread.currentThread().getName();
-
- LOG.info( "{} trying to acquire semaphore", threadName );
- //get and release the lock
- semaphore.tryAcquire( timeout, TimeUnit.MILLISECONDS );
-
- LOG.info( "{} has acquired sempahore", threadName );
-
- //countdown the latch
- latch.countDown();
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException( e );
- }
- }
- };
-
- rxScheduler.schedule( action );
- }
-
-
- return latch;
- }
-
-
-
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index 5d72ae6..471fe1f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -79,7 +79,7 @@ public class ParallelTest {
// final Scheduler scheduler = Schedulers.threadPoolForComputation();
//use the I/O scheduler to allow enough thread, otherwise our pool will be the same size as the # of cores
- final Scheduler scheduler = Schedulers.io();
+
//set our size equal
// ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
@@ -102,7 +102,7 @@ public class ParallelTest {
* non blocking?
*/
- final Observable<String> observable = Observable.from( input ).observeOn( scheduler );
+ final Observable<String> observable = Observable.from( input ).observeOn( Schedulers.io() );
Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>() {
@@ -124,7 +124,7 @@ public class ParallelTest {
/**
* QUESTION: Should this again be the process thread, not the I/O
*/
- Observable<String> newObservable = Observable.from( input ).subscribeOn( scheduler );
+ Observable<String> newObservable = Observable.from( input ).subscribeOn( Schedulers.io() );
Observable<Integer> transformed = newObservable.map( new Func1<String, Integer>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
index b949d55..2b00d1a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorImpl.java
@@ -36,7 +36,6 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
private static final Logger LOG = LoggerFactory.getLogger( AsyncProcessor.class );
protected final TimeoutQueue<T> queue;
- protected final Scheduler scheduler;
protected final GraphFig graphFig;
protected final List<MessageListener<T, T>> listeners = new ArrayList<MessageListener<T, T>>();
@@ -46,9 +45,8 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
@Inject
- public AsyncProcessorImpl( final TimeoutQueue<T> queue, final Scheduler scheduler, final GraphFig graphFig ) {
+ public AsyncProcessorImpl( final TimeoutQueue<T> queue, final GraphFig graphFig ) {
this.queue = queue;
- this.scheduler = scheduler;
this.graphFig = graphFig;
//we purposefully use a new thread. We don't want to use one of the I/O threads to run this task
@@ -72,7 +70,7 @@ public class AsyncProcessorImpl<T> implements AsyncProcessor<T> {
List<Observable<?>> observables = new ArrayList<Observable<?>>( listeners.size() );
for ( MessageListener<T, T> listener : listeners ) {
- observables.add( listener.receive( data ).subscribeOn( scheduler ) );
+ observables.add( listener.receive( data ).subscribeOn( Schedulers.io() ) );
}
//run everything in parallel and zip it up
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
new file mode 100644
index 0000000..8a44389
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/hystrix/HystrixGraphObservable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.graph.hystrix;
+
+
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixObservableCommand;
+
+import rx.Observable;
+import rx.schedulers.Schedulers;
+
+
+/**
+ *
+ *
+ */
+public class HystrixGraphObservable {
+
+ /**
+ * Wrap the observable in the timeout
+ * @param observable
+ * @param <T>
+ * @return
+ */
+ public static <T> Observable<T> wrap(final Observable<T> observable){
+ return new HystrixObservableCommand<T>( HystrixCommandGroupKey.Factory.asKey( "Graph" ) ){
+
+ @Override
+ protected Observable<T> run() {
+ return observable;
+ }
+ }.toObservable( Schedulers.io() );
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index f825767..4e5746a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -57,6 +57,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -68,8 +69,6 @@ public class EdgeManagerImpl implements EdgeManager {
private final OrganizationScope scope;
- private final Scheduler scheduler;
-
private final EdgeMetadataSerialization edgeMetadataSerialization;
@@ -85,7 +84,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Inject
- public EdgeManagerImpl( final Scheduler scheduler, final EdgeMetadataSerialization edgeMetadataSerialization,
+ public EdgeManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization edgeSerialization, final NodeSerialization nodeSerialization,
final GraphFig graphFig, @EdgeWrite final AsyncProcessor edgeWrite,
@EdgeDelete final AsyncProcessor edgeDelete, @NodeDelete final AsyncProcessor nodeDelete,
@@ -95,7 +94,6 @@ public class EdgeManagerImpl implements EdgeManager {
this.scope = scope;
- this.scheduler = scheduler;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeSerialization = edgeSerialization;
this.nodeSerialization = nodeSerialization;
@@ -116,7 +114,7 @@ public class EdgeManagerImpl implements EdgeManager {
public Observable<Edge> writeEdge( final Edge edge ) {
EdgeUtils.validateEdge( edge );
- return Observable.from( edge ).subscribeOn( scheduler ).map( new Func1<Edge, Edge>() {
+ return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
@Override
public Edge call( final Edge edge ) {
final MutationBatch mutation = edgeMetadataSerialization.writeEdge( scope, edge );
@@ -146,7 +144,7 @@ public class EdgeManagerImpl implements EdgeManager {
public Observable<Edge> deleteEdge( final Edge edge ) {
EdgeUtils.validateEdge( edge );
- return Observable.from( edge ).subscribeOn( scheduler ).map( new Func1<Edge, Edge>() {
+ return Observable.from( edge ).subscribeOn( Schedulers.io() ).map( new Func1<Edge, Edge>() {
@Override
public Edge call( final Edge edge ) {
final MutationBatch edgeMutation = edgeSerialization.markEdge( scope, edge );
@@ -172,7 +170,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<Id> deleteNode( final Id node ) {
- return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Id>() {
+ return Observable.from( node ).subscribeOn( Schedulers.io() ).map( new Func1<Id, Id>() {
@Override
public Id call( final Id id ) {
@@ -201,33 +199,37 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<Edge> loadEdgesFromSource( final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
- @Override
- protected Iterator<MarkedEdge> getIterator() {
- return edgeSerialization.getEdgesFromSource( scope, search );
- }
- } )//we intentionally use distinct until changed. This way we won't store all the keys since this
- //would hog far too much ram.
- .distinctUntilChanged( new Func1<Edge, Id>() {
+
+
+ return
+
+ Observable.create( new ObservableIterator<MarkedEdge>() {
@Override
- public Id call( final Edge edge ) {
- return edge.getTargetNode();
+ protected Iterator<MarkedEdge> getIterator() {
+ return edgeSerialization.getEdgesFromSource( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap( new EdgeBufferFilter( search.getMaxVersion() ) )
- .cast( Edge.class );
+ } )//we intentionally use distinct until changed. This way we won't store all the keys since this
+ //would hog far too much memory.
+ .distinctUntilChanged( new Func1<Edge, Id>() {
+ @Override
+ public Id call( final Edge edge ) {
+ return edge.getTargetNode();
+ }
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( search.getMaxVersion() ) ).cast( Edge.class );
}
@Override
public Observable<Edge> loadEdgesToTarget( final SearchByEdgeType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
}
} )
//we intentionally use distinct until changed. This way we won't store all the keys since this
- //would hog far too much ram.
+ //would hog far too much memory.
.distinctUntilChanged( new Func1<Edge, Id>() {
@Override
public Id call( final Edge edge ) {
@@ -240,7 +242,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<Edge> loadEdgesFromSourceByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSourceByTargetType( scope, search );
@@ -258,7 +260,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<Edge> loadEdgesToTargetByType( final SearchByIdType search ) {
- return Observable.create( new ObservableIterator<MarkedEdge>( ) {
+ return Observable.create( new ObservableIterator<MarkedEdge>() {
@Override
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTargetBySourceType( scope, search );
@@ -276,7 +278,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<String> getEdgeTypesFromSource( final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>() {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
@@ -287,7 +289,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<String> getIdTypesFromSource( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>() {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
@@ -299,7 +301,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<String> getEdgeTypesToTarget( final SearchEdgeType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>() {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
@@ -310,7 +312,7 @@ public class EdgeManagerImpl implements EdgeManager {
@Override
public Observable<String> getIdTypesToTarget( final SearchIdType search ) {
- return Observable.create( new ObservableIterator<String>( ) {
+ return Observable.create( new ObservableIterator<String>() {
@Override
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
@@ -351,7 +353,7 @@ public class EdgeManagerImpl implements EdgeManager {
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
final Map<Id, UUID> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- return Observable.from( markedEdges ).subscribeOn( scheduler )
+ return Observable.from( markedEdges ).subscribeOn( Schedulers.io() )
.filter( new EdgeFilter( this.maxVersion, markedVersions ) );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
index 0927ba8..190bbff 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/NodeDeleteListener.java
@@ -32,6 +32,7 @@ import rx.Observable;
import rx.Scheduler;
import rx.functions.Action0;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -47,29 +48,23 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
private final EdgeMetadataSerialization edgeMetadataSerialization;
private final EdgeDeleteRepair edgeDeleteRepair;
private final EdgeMetaRepair edgeMetaRepair;
- private final GraphFig graphFig;
-
- private final Scheduler scheduler;
/**
* Wire the serialization dependencies
*/
@Inject
- public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization,
- final Scheduler scheduler, @NodeDelete final AsyncProcessor nodeDelete,
+ public NodeDeleteListener( final NodeSerialization nodeSerialization, final EdgeSerialization edgeSerialization, @NodeDelete final AsyncProcessor nodeDelete,
final EdgeMetadataSerialization edgeMetadataSerialization,
- final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair,
- final GraphFig graphFig ) {
+ final EdgeDeleteRepair edgeDeleteRepair, final EdgeMetaRepair edgeMetaRepair
+ ) {
this.nodeSerialization = nodeSerialization;
this.edgeSerialization = edgeSerialization;
- this.scheduler = scheduler;
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeDeleteRepair = edgeDeleteRepair;
this.edgeMetaRepair = edgeMetaRepair;
- this.graphFig = graphFig;
nodeDelete.addListener( this );
}
@@ -88,7 +83,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
final UUID version = edgeEvent.getVersion();
- return Observable.from( node ).subscribeOn( scheduler ).map( new Func1<Id, Optional<UUID>>() {
+ return Observable.from( node ).subscribeOn( Schedulers.io() ).map( new Func1<Id, Optional<UUID>>() {
@Override
public Optional<UUID> call( final Id id ) {
return nodeSerialization.getMaxVersion( scope, node );
@@ -215,7 +210,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -229,7 +224,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -243,7 +238,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesToTarget( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -257,7 +252,7 @@ public class NodeDeleteListener implements MessageListener<EdgeEvent<Id>, Intege
protected Iterator<MarkedEdge> getIterator() {
return edgeSerialization.getEdgesFromSource( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
index fb31a33..3ff7178 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/AbstractEdgeRepair.java
@@ -44,6 +44,7 @@ import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;
+import rx.schedulers.Schedulers;
/**
@@ -59,16 +60,14 @@ public abstract class AbstractEdgeRepair {
protected final EdgeSerialization edgeSerialization;
protected final GraphFig graphFig;
protected final Keyspace keyspace;
- protected final Scheduler scheduler;
@Inject
public AbstractEdgeRepair( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
- final Keyspace keyspace, final Scheduler scheduler ) {
+ final Keyspace keyspace) {
this.edgeSerialization = edgeSerialization;
this.graphFig = graphFig;
this.keyspace = keyspace;
- this.scheduler = scheduler;
}
@@ -106,7 +105,7 @@ public abstract class AbstractEdgeRepair {
throw new RuntimeException( "Unable to issue write to cassandra", e );
}
- return Observable.from( markedEdges ).subscribeOn( scheduler );
+ return Observable.from( markedEdges ).subscribeOn( Schedulers.io() );
}
} );
}
@@ -133,7 +132,7 @@ public abstract class AbstractEdgeRepair {
return edgeSerialization.getEdgeFromSource( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -150,7 +149,7 @@ public abstract class AbstractEdgeRepair {
return edgeSerialization.getEdgeToTarget( scope, search );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index bf5d3d2..e900a1d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -47,8 +47,8 @@ public class EdgeDeleteRepairImpl extends AbstractEdgeRepair implements EdgeDele
@Inject
public EdgeDeleteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
- final Keyspace keyspace, final Scheduler scheduler ) {
- super( edgeSerialization, graphFig, keyspace, scheduler );
+ final Keyspace keyspace) {
+ super( edgeSerialization, graphFig, keyspace );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0bbc91a..8b81d0a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -51,6 +51,7 @@ import rx.Scheduler;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observables.MathObservable;
+import rx.schedulers.Schedulers;
/**
@@ -65,18 +66,16 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
private final EdgeSerialization edgeSerialization;
private final Keyspace keyspace;
private final GraphFig graphFig;
- private final Scheduler scheduler;
@Inject
public EdgeMetaRepairImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization edgeSerialization, final Keyspace keyspace,
- final GraphFig graphFig, final Scheduler scheduler ) {
+ final GraphFig graphFig ) {
this.edgeMetadataSerialization = edgeMetadataSerialization;
this.edgeSerialization = edgeSerialization;
this.keyspace = keyspace;
this.graphFig = graphFig;
- this.scheduler = scheduler;
}
@@ -257,7 +256,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
return edgeMetadataSerialization
.getIdTypesToTarget( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -270,7 +269,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
return edgeSerialization.getEdgesToTargetBySourceType( scope,
new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -302,7 +301,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
return edgeMetadataSerialization
.getIdTypesFromSource( scope, new SimpleSearchIdType( nodeId, edgeType, null ) );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
@@ -315,7 +314,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
return edgeSerialization.getEdgesFromSourceByTargetType( scope,
new SimpleSearchByIdType( nodeId, edgeType, version, subType, null ) );
}
- } ).subscribeOn( scheduler );
+ } ).subscribeOn( Schedulers.io() );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
index e8420a8..f77ef95 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeWriteRepairImpl.java
@@ -47,8 +47,8 @@ public class EdgeWriteRepairImpl extends AbstractEdgeRepair implements EdgeWrite
@Inject
public EdgeWriteRepairImpl( final EdgeSerialization edgeSerialization, final GraphFig graphFig,
- final Keyspace keyspace, final Scheduler scheduler ) {
- super( edgeSerialization, graphFig, keyspace, scheduler );
+ final Keyspace keyspace) {
+ super( edgeSerialization, graphFig, keyspace );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
index e4591cd..6ee4183 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/EdgeManagerTimeoutIT.java
@@ -20,10 +20,13 @@
package org.apache.usergrid.persistence.graph;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jukito.All;
import org.jukito.JukitoRunner;
@@ -33,7 +36,8 @@ import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.safehaus.guicyfig.GuicyFig;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.usergrid.persistence.collection.OrganizationScope;
import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
@@ -41,14 +45,16 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import com.google.inject.Inject;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
import rx.Observable;
-import rx.functions.Action1;
+import rx.Subscriber;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
@@ -57,13 +63,14 @@ import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.crea
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-@RunWith(JukitoRunner.class)
-@UseModules({ TestGraphModule.class })
+@RunWith( JukitoRunner.class )
+@UseModules( { TestGraphModule.class } )
//@UseModules( { TestGraphModule.class, EdgeManagerIT.InvalidInput.class } )
public class EdgeManagerTimeoutIT {
@@ -101,50 +108,117 @@ public class EdgeManagerTimeoutIT {
when( scope.getOrganization() ).thenReturn( orgId );
- if(graphFig.getReadTimeout() > TIMEOUT){
- fail("Graph read timeout must be <= " + TIMEOUT + ". Otherwise tests are invalid");
+ if ( graphFig.getReadTimeout() > TIMEOUT ) {
+ fail( "Graph read timeout must be <= " + TIMEOUT + ". Otherwise tests are invalid" );
}
}
- @Test(timeout = TIMEOUT, expected = TimeoutException.class)
- public void testWriteReadEdgeTypeSource() throws InterruptedException {
+ // @Test(timeout = TIMEOUT, expected = TimeoutException.class)
+ @Test
+ public void testWriteReadEdgeTypeSource( EdgeSerialization serialization ) throws InterruptedException {
- EdgeManager em = emf.createEdgeManager( scope );
+ final EdgeManager em = emf.createEdgeManager( scope );
- Edge edge = createEdge( "source", "test", "target" );
- em.writeEdge( edge ).toBlockingObservable().last();
+ final MarkedEdge edge = createEdge( "source", "edge", "target" );
//now test retrieving it
SearchByEdgeType search = createSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getVersion(), null );
+
+ final MockingIterator<MarkedEdge> itr = new MockingIterator<>( Collections.singletonList( edge ) );
+
+
+ when( serialization.getEdgesFromSource( scope, search ) ).thenReturn( itr );
+
Observable<Edge> edges = em.loadEdgesFromSource( search );
//retrieve the edge, ensure that if we block indefinitely, it times out
- final Semaphore blocker = new Semaphore(0);
+ final AtomicInteger onNextCounter = new AtomicInteger();
+ final CountDownLatch errorLatch = new CountDownLatch( 1 );
+
+ final Throwable[] thrown = new Throwable[1];
- edges.subscribe( new Action1<Edge>() {
+
+
+ edges.subscribe( new Subscriber<Edge>() {
@Override
- public void call( final Edge edge ) {
- //block indefinitely, we want to ensure we timeout
- try {
- blocker.acquire();
- }
- catch ( InterruptedException e ) {
- throw new RuntimeException(e);
+ public void onCompleted() {
+
+ }
+
+
+ @Override
+ public void onError( final Throwable e ) {
+ thrown[0] = e;
+ errorLatch.countDown();
+ }
+
+
+ @Override
+ public void onNext( final Edge edge ) {
+ {
+ onNextCounter.incrementAndGet();
}
}
} );
- blocker.acquire();
+
+ errorLatch.await();
+
+
+ assertEquals( "One lement was produced", 1,onNextCounter.intValue() );
+ assertTrue(thrown[0] instanceof HystrixRuntimeException);
}
+ private class MockingIterator<T> implements Iterator<T> {
+
+ private final Iterator<T> items;
+
+ private final Semaphore semaphore = new Semaphore( 0 );
+
+
+ private MockingIterator( final Collection<T> items ) {
+ this.items = items.iterator();
+ }
+
+
+ @Override
+ public boolean hasNext() {
+ return true;
+ }
+
+
+ @Override
+ public T next() {
+ if ( items.hasNext() ) {
+ return items.next();
+ }
+
+ //block indefinitely
+ try {
+ semaphore.acquire();
+ }
+ catch ( InterruptedException e ) {
+ throw new RuntimeException( e );
+ }
+
+ return null;
+ }
+
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException( "Cannot remove" );
+ }
+ }
+
@Test
public void testWriteReadEdgeTypeTarget() {
@@ -1336,8 +1410,6 @@ public class EdgeManagerTimeoutIT {
}
-
-
@Test
public void markTargetNode() {
@@ -1420,8 +1492,7 @@ public class EdgeManagerTimeoutIT {
}
-
- @Test(expected = NullPointerException.class)
+ @Test( expected = NullPointerException.class )
public void invalidEdgeTypesWrite( @All Edge edge ) {
final EdgeManager em = emf.createEdgeManager( scope );
@@ -1429,7 +1500,7 @@ public class EdgeManagerTimeoutIT {
}
- @Test(expected = NullPointerException.class)
+ @Test( expected = NullPointerException.class )
public void invalidEdgeTypesDelete( @All Edge edge ) {
final EdgeManager em = emf.createEdgeManager( scope );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
index 3652757..eaade41 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/consistency/AsyncProcessorTest.java
@@ -280,7 +280,7 @@ public class AsyncProcessorTest {
when(fig.getScanPageSize()).thenReturn( 0 );
- AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, Schedulers.io(), fig );
+ AsyncProcessorImpl<T> processor = new AsyncProcessorImpl( queue, fig );
return processor;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/457528f5/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index e9f05e0..68a67d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.persistence.graph.test.util;
import java.util.UUID;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
import org.apache.usergrid.persistence.graph.SearchByEdge;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.SearchByIdType;
@@ -53,7 +54,7 @@ public class EdgeTestUtils {
*
* @return an Edge for testing
*/
- public static Edge createEdge( final String sourceType, final String edgeType, final String targetType ) {
+ public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType ) {
return createEdge( createId( sourceType ), edgeType, createId( targetType ), UUIDGenerator.newTimeUUID() );
}
@@ -61,7 +62,7 @@ public class EdgeTestUtils {
/**
* Create an edge for testing
*/
- public static Edge createEdge( final Id sourceId, final String edgeType, final Id targetId ) {
+ public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId ) {
return createEdge( sourceId, edgeType, targetId, UUIDGenerator.newTimeUUID() );
}
@@ -69,12 +70,22 @@ public class EdgeTestUtils {
/**
* Create an edge with the specified params
*/
- public static Edge createEdge( final Id sourceId, final String edgeType, final Id targetId, final UUID version ) {
+ public static MarkedEdge createEdge( final Id sourceId, final String edgeType, final Id targetId,
+ final UUID version ) {
return new SimpleMarkedEdge( sourceId, edgeType, targetId, version, false );
}
/**
+ * Create an edge with the specified params
+ */
+ public static MarkedEdge createEdge( final String sourceType, final String edgeType, final String targetType,
+ final UUID version ) {
+ return new SimpleMarkedEdge( createId( sourceType ), edgeType, createId( targetType ), version, false );
+ }
+
+
+ /**
* Create the id
*/
public static Id createId( String type ) {