You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/05/11 19:37:16 UTC
[02/14] incubator-usergrid git commit: First pass at refactoring mark
+ sweep
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
index 107b2a0..9ed254c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccLogEntrySerializationStrategyImplTest.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.persistence.collection.serialization.impl;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -43,6 +45,7 @@ import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.fasterxml.uuid.UUIDComparator;
import com.google.inject.Inject;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
@@ -65,15 +68,15 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
private MvccLogEntrySerializationStrategy logEntryStrategy;
+
@Before
- public void wireLogEntryStrategy(){
+ public void wireLogEntryStrategy() {
logEntryStrategy = getLogEntryStrategy();
}
/**
* Get the log entry strategy from
- * @return
*/
protected abstract MvccLogEntrySerializationStrategy getLogEntryStrategy();
@@ -181,6 +184,131 @@ public abstract class MvccLogEntrySerializationStrategyImplTest {
}
+ @Test
+ public void getReversedEntries() throws ConnectionException {
+
+ final Id applicationId = new SimpleId( "application" );
+
+ ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+ final Id id = new SimpleId( "test" );
+
+ int count = 10;
+
+ final UUID[] versions = new UUID[count];
+ final Stage COMPLETE = Stage.COMPLETE;
+ final MvccLogEntry[] entries = new MvccLogEntry[count];
+
+
+ for ( int i = 0; i < count; i++ ) {
+ versions[i] = UUIDGenerator.newTimeUUID();
+
+ entries[i] = new MvccLogEntryImpl( id, versions[i], COMPLETE, MvccLogEntry.State.COMPLETE );
+ logEntryStrategy.write( context, entries[i] ).execute();
+
+ //Read it back
+
+ MvccLogEntry returned =
+ logEntryStrategy.load( context, Collections.singleton( id ), versions[i] ).getMaxVersion( id );
+
+ assertNotNull( "Returned value should not be null", returned );
+
+ assertEquals( "Returned should equal the saved", entries[i], returned );
+ }
+
+
+ final UUID[] assertVersions = Arrays.copyOf( versions, versions.length );
+
+ Arrays.sort( assertVersions, ( v1, v2 ) -> UUIDComparator.staticCompare( v1, v2 ) * -1 );
+
+ //now do a range scan from the end
+
+ final int half = count / 2;
+
+ final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, versions[0], half );
+
+ assertEquals( half, results.size() );
+
+ for ( int i = 0; i < count / 2; i++ ) {
+ final MvccLogEntry saved = entries[i];
+ final MvccLogEntry returned = results.get( i );
+
+ assertEquals( "Entry was not equal to the saved value", saved, returned );
+ }
+
+
+ //now get the next batch
+ final List<MvccLogEntry> results2 = logEntryStrategy.loadReversed( context, id, versions[half], count );
+
+ assertEquals( half, results2.size() );
+
+ for ( int i = 0; i < half; i++ ) {
+ final MvccLogEntry saved = entries[half + i];
+ final MvccLogEntry returned = results2.get( i );
+
+ assertEquals( "Entry was not equal to the saved value", saved, returned );
+ }
+
+
+ //now delete them all and ensure we get no results back
+ for ( int i = 0; i < count; i++ ) {
+ logEntryStrategy.delete( context, id, versions[i] ).execute();
+ }
+
+ final List<MvccLogEntry> results3 = logEntryStrategy.loadReversed( context, id, null, versions.length );
+
+ assertEquals( "All log entries were deleted", 0, results3.size() );
+ }
+
+
+ @Test
+ public void createAndDeleteEntries() throws ConnectionException {
+
+ final Id applicationId = new SimpleId( "application" );
+
+ ApplicationScope context = new ApplicationScopeImpl( applicationId );
+
+
+ final Id id = new SimpleId( "test" );
+
+
+ final int size = 10;
+
+ final List<MvccLogEntry> savedEntries = new ArrayList<>( size );
+
+ for ( int i = 0; i < size; i++ ) {
+ final UUID version = UUIDGenerator.newTimeUUID();
+ MvccLogEntry saved = new MvccLogEntryImpl( id, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+ logEntryStrategy.write( context, saved ).execute();
+
+ savedEntries.add( saved );
+ }
+
+ //now test we get them all back
+
+ final List<MvccLogEntry> results = logEntryStrategy.loadReversed( context, id, null, size );
+
+ assertEquals( size, results.size() );
+
+ //assert they're the same
+ for ( int i = 0; i < size; i++ ) {
+ assertEquals( savedEntries.get( i ), results.get( i ) );
+ }
+
+ //now delete them all
+
+ for ( final MvccLogEntry mvccLogEntry : savedEntries ) {
+ logEntryStrategy.delete( context, id, mvccLogEntry.getVersion() ).execute();
+ }
+
+ //now get them back, should be empty
+ final List<MvccLogEntry> emptyResults = logEntryStrategy.loadReversed( context, id, null, size );
+
+ assertEquals( 0, emptyResults.size() );
+ }
+
+
@Test( expected = NullPointerException.class )
public void writeParamsNoContext() throws ConnectionException {
logEntryStrategy.write( null, mock( MvccLogEntry.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
index 93de9d4..cd6ad3d 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/util/LogEntryMock.java
@@ -20,15 +20,11 @@ package org.apache.usergrid.persistence.collection.util;/*
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;
import java.util.UUID;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.mvcc.entity.Stage;
import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccLogEntryImpl;
@@ -50,7 +46,11 @@ import static org.mockito.Mockito.when;
public class LogEntryMock {
- private final TreeMap<UUID, MvccLogEntry> entries = new TreeMap<>(ReversedUUIDComparator.INSTANCE);
+ private final TreeMap<UUID, MvccLogEntry> reversedEntries =
+ new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) * -1 );
+
+ private final TreeMap<UUID, MvccLogEntry> entries =
+ new TreeMap<>( ( o1, o2 ) -> UUIDComparator.staticCompare( o1, o2 ) );
private final Id entityId;
@@ -61,78 +61,92 @@ public class LogEntryMock {
* @param entityId The entity Id to use
* @param versions The versions to use
*/
- private LogEntryMock(final Id entityId, final List<UUID> versions ) {
+ private LogEntryMock( final Id entityId, final List<UUID> versions ) {
this.entityId = entityId;
- for ( UUID version: versions) {
- entries.put( version, new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE ) );
+ for ( UUID version : versions ) {
+ final MvccLogEntry mvccLogEntry =
+ new MvccLogEntryImpl( entityId, version, Stage.ACTIVE, MvccLogEntry.State.COMPLETE );
+ reversedEntries.put( version, mvccLogEntry );
+ entries.put( version, mvccLogEntry );
}
}
/**
* Init the mock with the given data structure
+ *
* @param logEntrySerializationStrategy The strategy to moc
- * @param scope
- * @throws ConnectionException
*/
- private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope )
+ private void initMock( final MvccLogEntrySerializationStrategy logEntrySerializationStrategy,
+ final ApplicationScope scope )
- throws ConnectionException {
+ throws ConnectionException {
//wire up the mocks
- when(logEntrySerializationStrategy.load( same( scope ), same( entityId ), any(UUID.class), any(Integer.class) )).thenAnswer( new Answer<List<MvccLogEntry>>() {
-
+ when( logEntrySerializationStrategy
+ .load( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
- @Override
- public List<MvccLogEntry> answer( final InvocationOnMock invocation ) throws Throwable {
+ invocation -> {
final UUID startVersion = ( UUID ) invocation.getArguments()[2];
- final int count = (Integer)invocation.getArguments()[3];
+ final int count = ( Integer ) invocation.getArguments()[3];
final List<MvccLogEntry> results = new ArrayList<>( count );
- final Iterator<MvccLogEntry> itr = entries.tailMap( startVersion, true ).values().iterator();
+ final Iterator<MvccLogEntry> itr = reversedEntries.tailMap( startVersion, true ).values().iterator();
- for(int i = 0; i < count && itr.hasNext(); i ++){
+ for ( int i = 0; i < count && itr.hasNext(); i++ ) {
results.add( itr.next() );
}
return results;
- }
- } );
- }
+ } );
- /**
- * Get the entry at the specified index from high to low
- * @param index
- * @return
- */
- public MvccLogEntry getEntryAtIndex(final int index){
+ //mock in reverse
- final Iterator<MvccLogEntry> itr = entries.values().iterator();
+ when( logEntrySerializationStrategy
+ .loadReversed( same( scope ), same( entityId ), any( UUID.class ), any( Integer.class ) ) ).thenAnswer(
- for(int i = 0; i < index; i ++){
- itr.next();
- }
+ invocation -> {
+ final UUID startVersion = ( UUID ) invocation.getArguments()[2];
+ final int count = ( Integer ) invocation.getArguments()[3];
+
+
+ final List<MvccLogEntry> results = new ArrayList<>( count );
+
+ final Iterator<MvccLogEntry> itr;
+
+ if ( startVersion == null ) {
+ itr = entries.values().iterator();
+ }
+ else {
+ itr = entries.tailMap( startVersion, true ).values().iterator();
+ }
+
+ for ( int i = 0; i < count && itr.hasNext(); i++ ) {
+ results.add( itr.next() );
+ }
- return itr.next();
+
+ return results;
+ } );
}
/**
- *
* @param logEntrySerializationStrategy The mock to use
* @param scope The scope to use
* @param entityId The entityId to use
* @param versions The versions to mock
- * @throws ConnectionException
*/
- public static LogEntryMock createLogEntryMock(final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,final Id entityId, final List<UUID> versions )
+ public static LogEntryMock createLogEntryMock(
+ final MvccLogEntrySerializationStrategy logEntrySerializationStrategy, final ApplicationScope scope,
+ final Id entityId, final List<UUID> versions )
- throws ConnectionException {
+ throws ConnectionException {
LogEntryMock mock = new LogEntryMock( entityId, versions );
mock.initMock( logEntrySerializationStrategy, scope );
@@ -141,19 +155,12 @@ public class LogEntryMock {
}
- public Collection<MvccLogEntry> getEntries() {
- return entries.values();
+ public Collection<MvccLogEntry> getReversedEntries() {
+ return reversedEntries.values();
}
- private static final class ReversedUUIDComparator implements Comparator<UUID> {
-
- public static final ReversedUUIDComparator INSTANCE = new ReversedUUIDComparator();
-
-
- @Override
- public int compare( final UUID o1, final UUID o2 ) {
- return UUIDComparator.staticCompare( o1, o2 ) * -1;
- }
+ public Collection<MvccLogEntry> getEntries() {
+ return entries.values();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/collection/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties
index acf5c39..7b55cf8 100644
--- a/stack/corepersistence/collection/src/test/resources/log4j.properties
+++ b/stack/corepersistence/collection/src/test/resources/log4j.properties
@@ -33,4 +33,5 @@ log4j.logger.cassandra.db=ERROR
#log4j.logger.org.apache.usergrid=DEBUG
#log4j.logger.org.apache.usergrid.persistence.collection=TRACE
+log4j.logger.org.apache.usergrid.persistence.collection.mvcc.stage.delete.VersionCompact=TRACE
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
new file mode 100644
index 0000000..c653458
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/executor/TaskExecutorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.executor;
+
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A task executor that allows you to submit tasks
+ */
+public class TaskExecutorFactory {
+
+
+ /**
+ * Create a task executor
+ * @param schedulerName
+ * @param maxThreadCount
+ * @param maxQueueSize
+ * @return
+ */
+ public static ThreadPoolExecutor createTaskExecutor( final String schedulerName, final int maxThreadCount,
+ final int maxQueueSize ) {
+
+
+ final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>( maxQueueSize );
+
+
+ final MaxSizeThreadPool threadPool = new MaxSizeThreadPool( queue, schedulerName, maxThreadCount );
+
+
+ return threadPool;
+ }
+
+
+ /**
+ * Create a thread pool that will reject work if our audit tasks become overwhelmed
+ */
+ private static final class MaxSizeThreadPool extends ThreadPoolExecutor {
+
+ public MaxSizeThreadPool( final BlockingQueue<Runnable> queue, final String poolName, final int maxPoolSize ) {
+ super( maxPoolSize, maxPoolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( poolName ) );
+ }
+ }
+
+
+ /**
+ * Thread factory that will name and count threads for easier debugging
+ */
+ private static final class CountingThreadFactory implements ThreadFactory {
+
+ private final AtomicLong threadCounter = new AtomicLong();
+ private final String poolName;
+
+
+ private CountingThreadFactory( final String poolName ) {this.poolName = poolName;}
+
+
+ @Override
+ public Thread newThread( final Runnable r ) {
+ final long newValue = threadCounter.incrementAndGet();
+
+ final String threadName = poolName + "-" + newValue;
+
+ Thread t = new Thread( r, threadName );
+
+ //set it to be a daemon thread so it doesn't block shutdown
+ t.setDaemon( true );
+
+ return t;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
index d6cc5e8..f28e190 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/RxTaskScheduler.java
@@ -20,8 +20,6 @@
package org.apache.usergrid.persistence.core.rx;
-import org.apache.usergrid.persistence.core.task.Task;
-
import rx.Scheduler;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
deleted file mode 100644
index 9007167..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImpl.java
+++ /dev/null
@@ -1,286 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-
-/**
- * Implementation of the task executor with a unique name and size
- */
-public class NamedTaskExecutorImpl implements TaskExecutor {
-
- private static final Logger LOG = org.slf4j.LoggerFactory.getLogger( NamedTaskExecutorImpl.class );
-
- private final ListeningExecutorService executorService;
-
- private final String name;
- private final int poolSize;
-
-
- /**
- * @param name The name of this instance of the task executor
- * @param poolSize The size of the pool. This is the number of concurrent tasks that can execute at once.
- * @param queueLength The length of tasks to keep in the queue
- */
- public NamedTaskExecutorImpl( final String name, final int poolSize, final int queueLength ) {
- Preconditions.checkNotNull( name );
- Preconditions.checkArgument( name.length() > 0, "name must have a length" );
- Preconditions.checkArgument( poolSize > -1, "poolSize must be > than -1" );
- Preconditions.checkArgument( queueLength > -1, "queueLength must be 0 or more" );
-
- this.name = name;
- this.poolSize = poolSize;
-
- // The user has chosen to disable asynchronous execution,
- // to create an executor service that will reject all requests
- if ( poolSize == 0 ) {
- executorService = MoreExecutors.listeningDecorator( new RejectingExecutorService());
- }
-
- //queue executions as normal
- else {
- final BlockingQueue<Runnable> queue = queueLength > 0
- ? new ArrayBlockingQueue<Runnable>(queueLength) : new SynchronousQueue<Runnable>();
-
- executorService = MoreExecutors.listeningDecorator( new MaxSizeThreadPool( queue ) );
- }
- }
-
-
- @Override
- public <V> ListenableFuture<V> submit( final Task<V> task ) {
-
- final ListenableFuture<V> future;
-
- try {
- future = executorService.submit( task );
-
- // Log our success or failures for debugging purposes
- Futures.addCallback( future, new TaskFutureCallBack<V>( task ) );
- }
- catch ( RejectedExecutionException ree ) {
- return Futures.immediateFuture( task.rejected());
- }
-
- return future;
- }
-
-
- @Override
- public void shutdown() {
- this.executorService.shutdownNow();
- }
-
-
- /**
- * Callback for when the task succeeds or fails.
- */
- private static final class TaskFutureCallBack<V> implements FutureCallback<V> {
-
- private final Task<V> task;
-
-
- private TaskFutureCallBack( Task<V> task ) {
- this.task = task;
- }
-
-
- @Override
- public void onSuccess( @Nullable final V result ) {
- LOG.debug( "Successfully completed task ", task );
- }
-
-
- @Override
- public void onFailure( final Throwable t ) {
- LOG.error( "Unable to execute task. Exception is ", t );
-
- task.exceptionThrown( t );
- }
- }
-
-
- /**
- * Create a thread pool that will reject work if our audit tasks become overwhelmed
- */
- private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
- public MaxSizeThreadPool( BlockingQueue<Runnable> queue ) {
-
- super( 1, poolSize, 30, TimeUnit.SECONDS, queue, new CountingThreadFactory( ),
- new RejectedHandler() );
- }
- }
-
-
- /**
- * Thread factory that will name and count threads for easier debugging
- */
- private final class CountingThreadFactory implements ThreadFactory {
-
- private final AtomicLong threadCounter = new AtomicLong();
-
-
- @Override
- public Thread newThread( final Runnable r ) {
- final long newValue = threadCounter.incrementAndGet();
-
- Thread t = new Thread( r, name + "-" + newValue );
-
- t.setDaemon( true );
-
- return t;
- }
- }
-
-
- /**
- * The handler that will handle rejected executions and signal the interface
- */
- private final class RejectedHandler implements RejectedExecutionHandler {
-
-
- @Override
- public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
- LOG.warn( "{} task queue full, rejecting task {}", name, r );
-
- throw new RejectedExecutionException( "Unable to run task, queue full" );
- }
-
- }
-
-
- /**
- * Executor implementation that simply rejects all incoming tasks
- */
- private static final class RejectingExecutorService implements ExecutorService{
-
- @Override
- public void shutdown() {
-
- }
-
-
- @Override
- public List<Runnable> shutdownNow() {
- return Collections.EMPTY_LIST;
- }
-
-
- @Override
- public boolean isShutdown() {
- return false;
- }
-
-
- @Override
- public boolean isTerminated() {
- return false;
- }
-
-
- @Override
- public boolean awaitTermination( final long timeout, final TimeUnit unit )
- throws InterruptedException {
- return false;
- }
-
-
- @Override
- public <T> Future<T> submit( final Callable<T> task ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> Future<T> submit( final Runnable task, final T result ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public Future<?> submit( final Runnable task ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks )
- throws InterruptedException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> List<Future<T>> invokeAll( final Collection<? extends Callable<T>> tasks,
- final long timeout, final TimeUnit unit ) throws InterruptedException {
-
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> T invokeAny( final Collection<? extends Callable<T>> tasks )
- throws InterruptedException, ExecutionException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public <T> T invokeAny( final Collection<? extends Callable<T>> tasks, final long timeout,
- final TimeUnit unit ) throws InterruptedException, ExecutionException, TimeoutException {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
-
-
- @Override
- public void execute( final Runnable command ) {
- throw new RejectedExecutionException("No Asynchronous tasks allowed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
deleted file mode 100644
index 5582161..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/Task.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import java.util.concurrent.Callable;
-
-
-/**
- * The task to execute
- */
-public interface Task<V> extends Callable<V> {
-
-
- /**
- * Invoked when this task throws an uncaught exception.
- * @param throwable
- */
- void exceptionThrown(final Throwable throwable);
-
- /**
- * Invoked when we weren't able to run this task by the the thread attempting to schedule the task.
- * If this task MUST be run immediately, you can invoke the call method from within this event to invoke the
- * task in the scheduling thread. Note that this has performance implications to the user. If you can drop the
- * request and process later (lazy repair for instance ) do so.
- *
- */
- V rejected();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
deleted file mode 100644
index 5728d2e..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-
-/**
- * An interface for execution of tasks
- */
-public interface TaskExecutor {
-
- /**
- * Submit the task asynchronously
- * @param task
- */
- public <V> ListenableFuture<V> submit( Task<V> task );
-
- /**
- * Stop the task executor without waiting for scheduled threads to run
- */
- public void shutdown();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
deleted file mode 100644
index 4f95918..0000000
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/task/NamedTaskExecutorImplTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.core.task;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-
-/**
- * Tests for the namedtask execution impl
- */
-public class NamedTaskExecutorImplTest {
-
-
- @Test
- public void jobSuccess() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- final Task<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- assertEquals( 0l, exceptionLatch.getCount() );
-
- assertEquals( 0l, rejectedLatch.getCount() );
- }
-
-
- @Test
- public void exceptionThrown() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 1 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- final RuntimeException re = new RuntimeException( "throwing exception" );
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
- throw re;
- }
- };
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
- exceptionLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- assertSame( re, task.exceptions.get( 0 ) );
-
-
- assertEquals( 0l, rejectedLatch.getCount() );
- }
-
-
- @Test
- public void noCapacity() throws InterruptedException {
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", 1, 0 );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
-
- //park this thread so it takes up a task and the next is rejected
- final Object mutex = new Object();
-
- synchronized ( mutex ) {
- mutex.wait();
- }
-
- return null;
- }
- };
-
- executor.submit( task );
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //now submit the second task
-
-
- final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 1l, secondRunLatch.getCount() );
- assertEquals( 0l, secondExceptionLatch.getCount() );
- }
-
-
- @Test
- public void noCapacityWithQueue() throws InterruptedException {
-
- final int threadPoolSize = 1;
- final int queueSize = 10;
-
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 0 );
- final CountDownLatch runLatch = new CountDownLatch( 1 );
-
- int iterations = threadPoolSize + queueSize;
-
- for(int i = 0; i < iterations; i ++) {
-
- final TestTask<Void> task = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {
- @Override
- public Void call() throws Exception {
- super.call();
-
- //park this thread so it takes up a task and the next is rejected
- final Object mutex = new Object();
-
- synchronized ( mutex ) {
- mutex.wait();
- }
-
- return null;
- }
- };
- executor.submit( task );
- }
-
-
-
- runLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //now submit the second task
-
-
- final CountDownLatch secondRejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch secondExceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch secondRunLatch = new CountDownLatch( 1 );
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- secondRejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 1l, secondRunLatch.getCount() );
- assertEquals( 0l, secondExceptionLatch.getCount() );
- }
-
-
- @Test
- public void rejectingTaskExecutor() throws InterruptedException {
-
- final int threadPoolSize = 0;
- final int queueSize = 0;
-
- final TaskExecutor executor = new NamedTaskExecutorImpl( "jobSuccess", threadPoolSize, queueSize );
-
- final CountDownLatch exceptionLatch = new CountDownLatch( 0 );
- final CountDownLatch rejectedLatch = new CountDownLatch( 1 );
- final CountDownLatch runLatch = new CountDownLatch( 0 );
-
-
- //now submit the second task
-
-
-
- final TestTask<Void> testTask = new TestTask<Void>( exceptionLatch, rejectedLatch, runLatch ) {};
-
- executor.submit( testTask );
-
-
- //should be immediately rejected
- rejectedLatch.await( 1000, TimeUnit.MILLISECONDS );
-
- //if we get here we've been rejected, just double check we didn't run
-
- assertEquals( 0l, exceptionLatch.getCount() );
- assertEquals( 0l, runLatch.getCount() );
- }
-
-
- private static abstract class TestTask<V> implements Task<V> {
-
- private final List<Throwable> exceptions;
- private final CountDownLatch exceptionLatch;
- private final CountDownLatch rejectedLatch;
- private final CountDownLatch runLatch;
-
-
- private TestTask( final CountDownLatch exceptionLatch, final CountDownLatch rejectedLatch,
- final CountDownLatch runLatch ) {
- this.exceptionLatch = exceptionLatch;
- this.rejectedLatch = rejectedLatch;
- this.runLatch = runLatch;
-
- this.exceptions = new ArrayList<>();
- }
-
-
-
- @Override
- public void exceptionThrown( final Throwable throwable ) {
- this.exceptions.add( throwable );
- exceptionLatch.countDown();
- }
-
-
- @Override
- public V rejected() {
- rejectedLatch.countDown();
- return null;
- }
-
-
- @Override
- public V call() throws Exception {
- runLatch.countDown();
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index 894e74a..2a153e2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -31,46 +31,46 @@ import org.safehaus.guicyfig.Key;
public interface GraphFig extends GuicyFig {
- public static final String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
+ String SCAN_PAGE_SIZE = "usergrid.graph.scan.page.size";
- public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
+ String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
/**
* The size of the shards. This is approximate, and should be set lower than what you would like your max to be
*/
- public static final String SHARD_SIZE = "usergrid.graph.shard.size";
+ String SHARD_SIZE = "usergrid.graph.shard.size";
/**
* Number of shards we can cache.
*/
- public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
+ String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
/**
* Get the cache timeout. The local cache will exist for this amount of time max (in millis).
*/
- public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
+ String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
/**
* Number of worker threads to refresh the cache
*/
- public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
+ String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
/**
* The size of the worker count for shard auditing
*/
- public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
+ String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
/**
* The size of the worker count for shard auditing
*/
- public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+ String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
- public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+ String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
/**
@@ -80,14 +80,14 @@ public interface GraphFig extends GuicyFig {
* Note that you should also pad this for node clock drift. A good value for this would be 2x the shard cache
* timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
*/
- public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
+ String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
- public static final String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
+ String COUNTER_WRITE_FLUSH_COUNT = "usergrid.graph.shard.counter.beginFlush.count";
- public static final String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
+ String COUNTER_WRITE_FLUSH_INTERVAL = "usergrid.graph.shard.counter.beginFlush.interval";
- public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
+ String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
index 4c38c13..987a36c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManager.java
@@ -64,23 +64,23 @@ public interface GraphManager extends CPManager {
/**
- * @param edge The edge to delete
+ * @param edge Mark the edge as deleted in the graph
*
*
* EdgeDelete the edge. Implementation should also delete the incoming (reversed) edge. Only deletes the specific version
*/
- Observable<Edge> deleteEdge( Edge edge );
+ Observable<Edge> markEdge( Edge edge );
/**
*
- * Remove the node from the graph.
+ * Mark the node as removed from the graph.
*
* @param node The node to remove
* @param timestamp The timestamp to apply the delete operation. Any edges connected to this node with a timestmap
* <= the specified time will be removed from the graph
* @return
*/
- Observable<Id> deleteNode(Id node, long timestamp);
+ Observable<Id> markNode( Id node, long timestamp );
/**
* Get all versions of this edge where versions <= max version
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
index eb49711..d73f767 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphManagerFactory.java
@@ -36,7 +36,7 @@ public interface GraphManagerFactory
*
* @param collectionScope The context to use when creating the graph manager
*/
- public GraphManager createEdgeManager( ApplicationScope collectionScope );
+ GraphManager createEdgeManager( ApplicationScope collectionScope );
void invalidate();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 114440f..38f62b5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -59,6 +59,12 @@ public interface SearchByEdge {
long getMaxTimestamp();
/**
+ * Return true if we should filter edges marked for deletion
+ * @return
+ */
+ boolean filterMarked();
+
+ /**
* The optional start parameter. All edges emitted with be > the specified start edge.
* This is useful for paging. Simply use the last value returned in the previous call in the start parameter
* @return
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 5e47ae0..f213b00 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -65,6 +65,12 @@ public interface SearchByEdgeType {
*/
Order getOrder();
+ /**
+ * Return true to filter marked edges from the results
+ * @return
+ */
+ boolean filterMarked();
+
/**
* Options for ordering. By default, we want to perform descending for common use cases and read speed. This is our our data
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 6cdaef0..4b628d1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -28,8 +28,6 @@ import org.apache.usergrid.persistence.core.migration.data.MigrationPlugin;
import org.apache.usergrid.persistence.core.migration.data.MigrationRelationship;
import org.apache.usergrid.persistence.core.migration.data.VersionedMigrationSet;
import org.apache.usergrid.persistence.core.migration.schema.Migration;
-import org.apache.usergrid.persistence.core.task.NamedTaskExecutorImpl;
-import org.apache.usergrid.persistence.core.task.TaskExecutor;
import org.apache.usergrid.persistence.graph.GraphFig;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.impl.stage.EdgeDeleteListener;
@@ -196,15 +194,6 @@ public abstract class GraphModule extends AbstractModule {
}
- @Inject
- @Singleton
- @Provides
- @GraphTaskExecutor
- public TaskExecutor graphTaskExecutor( final GraphFig graphFig ) {
- return new NamedTaskExecutorImpl( "graphTaskExecutor", graphFig.getShardAuditWorkerCount(),
- graphFig.getShardAuditWorkerQueueSize() );
- }
-
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 9c0c62d..9a8a00f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -120,12 +120,9 @@ public class GraphManagerImpl implements GraphManager {
@Inject
public GraphManagerImpl( final EdgeMetadataSerialization edgeMetadataSerialization,
final EdgeSerialization storageEdgeSerialization,
- final NodeSerialization nodeSerialization,
- final GraphFig graphFig,
- final EdgeDeleteListener edgeDeleteListener,
- final NodeDeleteListener nodeDeleteListener,
- final ApplicationScope scope,
- MetricsFactory metricsFactory) {
+ final NodeSerialization nodeSerialization, final GraphFig graphFig,
+ final EdgeDeleteListener edgeDeleteListener, final NodeDeleteListener nodeDeleteListener,
+ final ApplicationScope scope, MetricsFactory metricsFactory ) {
ValidationUtils.validateApplicationScope( scope );
@@ -146,36 +143,34 @@ public class GraphManagerImpl implements GraphManager {
this.edgeDeleteSubcriber = MetricSubscriber.INSTANCE;
this.nodeDelete = MetricSubscriber.INSTANCE;
- this.writeEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "write.edge.meter");
- this.writeEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "write.edge.timer");
- this.deleteEdgeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.edge.meter");
- this.deleteEdgeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.edge.timer");
- this.deleteNodeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "delete.node.meter");
- this.deleteNodeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "delete.node.timer");
- this.loadEdgesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.meter");
- this.loadEdgesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.timer");
- this.loadEdgesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.meter");
- this.loadEdgesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.timer");
- this.loadEdgesVersionsMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.versions.meter");
- this.loadEdgesVersionsTimer = metricsFactory.getTimer(GraphManagerImpl.class,"load.versions.timer");
- this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.from.type.meter");
- this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.from.type.timer");
- this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter(GraphManagerImpl.class, "load.to.type.meter");
- this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer(GraphManagerImpl.class, "load.to.type.timer");
-
- this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.from.timer");
- this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.from.meter");
-
- this.getIdTypesFromSourceTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.idtype.from.timer");
- this.getIdTypesFromSourceMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.from.meter");
-
- this.getEdgeTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class,"get.edge.to.timer");
- this.getEdgeTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.edge.to.meter");
-
- this.getIdTypesToTargetTimer = metricsFactory.getTimer(GraphManagerImpl.class, "get.idtype.to.timer");
- this.getIdTypesToTargetMeter = metricsFactory.getMeter(GraphManagerImpl.class, "get.idtype.to.meter");
-
-
+ this.writeEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "write.edge.meter" );
+ this.writeEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "write.edge.timer" );
+ this.deleteEdgeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.edge.meter" );
+ this.deleteEdgeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.edge.timer" );
+ this.deleteNodeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "delete.node.meter" );
+ this.deleteNodeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "delete.node.timer" );
+ this.loadEdgesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.meter" );
+ this.loadEdgesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.timer" );
+ this.loadEdgesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.meter" );
+ this.loadEdgesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.timer" );
+ this.loadEdgesVersionsMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.versions.meter" );
+ this.loadEdgesVersionsTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.versions.timer" );
+ this.loadEdgesFromSourceByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.from.type.meter" );
+ this.loadEdgesFromSourceByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.from.type.timer" );
+ this.loadEdgesToTargetByTypeMeter = metricsFactory.getMeter( GraphManagerImpl.class, "load.to.type.meter" );
+ this.loadEdgesToTargetByTypeTimer = metricsFactory.getTimer( GraphManagerImpl.class, "load.to.type.timer" );
+
+ this.getEdgeTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.from.timer" );
+ this.getEdgeTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.from.meter" );
+
+ this.getIdTypesFromSourceTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.from.timer" );
+ this.getIdTypesFromSourceMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.from.meter" );
+
+ this.getEdgeTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.edge.to.timer" );
+ this.getEdgeTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.edge.to.meter" );
+
+ this.getIdTypesToTargetTimer = metricsFactory.getTimer( GraphManagerImpl.class, "get.idtype.to.timer" );
+ this.getIdTypesToTargetMeter = metricsFactory.getMeter( GraphManagerImpl.class, "get.idtype.to.meter" );
}
@@ -209,41 +204,39 @@ public class GraphManagerImpl implements GraphManager {
return edge;
}
- } )
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( new Action1<Notification<? super Edge>>() {
+ @Override
+ public void call( Notification<? super Edge> notification ) {
+ meter.mark();
+ }
+ } ).doOnCompleted( new Action0() {
+ @Override
+ public void call() {
+ timer.stop();
+ }
+ } );
}
@Override
- public Observable<Edge> deleteEdge( final Edge edge ) {
- GraphValidation.validateEdge(edge);
+ public Observable<Edge> markEdge( final Edge edge ) {
+ GraphValidation.validateEdge( edge );
- final MarkedEdge markedEdge = new SimpleMarkedEdge(edge, true);
+ final MarkedEdge markedEdge = new SimpleMarkedEdge( edge, true );
final Timer.Context timer = deleteEdgeTimer.time();
final Meter meter = deleteEdgeMeter;
- return Observable.just(markedEdge).map(new Func1<MarkedEdge, Edge>() {
+ return Observable.just( markedEdge ).map( new Func1<MarkedEdge, Edge>() {
@Override
- public Edge call(final MarkedEdge edge) {
+ public Edge call( final MarkedEdge edge ) {
final UUID timestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge(scope, edge, timestamp);
+ final MutationBatch edgeMutation = storageEdgeSerialization.writeEdge( scope, edge, timestamp );
- LOG.debug("Marking edge {} as deleted to commit log", edge);
+ LOG.debug( "Marking edge {} as deleted to commit log", edge );
try {
edgeMutation.execute();
}
@@ -254,73 +247,50 @@ public class GraphManagerImpl implements GraphManager {
//HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
// timestamp )).subscribeOn( Schedulers.io() ).subscribe( edgeDeleteSubcriber );
- edgeDeleteListener.receive(scope, markedEdge, timestamp).subscribeOn(Schedulers.io())
- .subscribe(edgeDeleteSubcriber);
+ edgeDeleteListener.receive( scope, markedEdge, timestamp ).subscribeOn( Schedulers.io() )
+ .subscribe( edgeDeleteSubcriber );
return edge;
}
- })
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@Override
- public Observable<Id> deleteNode( final Id node, final long timestamp ) {
+ public Observable<Id> markNode( final Id node, final long timestamp ) {
final Timer.Context timer = deleteNodeTimer.time();
final Meter meter = deleteNodeMeter;
- return Observable.just( node ).map( new Func1<Id, Id>() {
- @Override
- public Id call( final Id id ) {
+ return Observable.just( node ).map( id -> {
- //mark the node as deleted
+ //mark the node as deleted
- final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
+ final UUID eventTimestamp = UUIDGenerator.newTimeUUID();
- final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
+ final MutationBatch nodeMutation = nodeSerialization.mark( scope, id, timestamp );
- LOG.debug( "Marking node {} as deleted to node mark", node );
- try {
- nodeMutation.execute();
- }
- catch ( ConnectionException e ) {
- throw new RuntimeException( "Unable to execute mutation", e );
- }
+ LOG.debug( "Marking node {} as deleted to node mark", node );
+ try {
+ nodeMutation.execute();
+ }
+ catch ( ConnectionException e ) {
+ throw new RuntimeException( "Unable to execute mutation", e );
+ }
- //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
- // Schedulers.io() ).subscribe( nodeDelete );
- nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() )
- .subscribe( nodeDelete );
+ //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp )).subscribeOn(
+ // Schedulers.io() ).subscribe( nodeDelete );
+ nodeDeleteListener.receive( scope, id, eventTimestamp ).subscribeOn( Schedulers.io() )
+ .subscribe( nodeDelete );
- return id;
- }
- } )
- .doOnEach(new Action1<Notification<? super Id>>() {
- @Override
- public void call(Notification<? super Id> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ return id;
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -333,20 +303,11 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgeVersions( scope, searchByEdge );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(searchByEdge.getMaxTimestamp()))
- .cast(Edge.class)
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( searchByEdge.getMaxTimestamp(), searchByEdge.filterMarked() ) )
+ .cast( Edge.class ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -359,20 +320,11 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSource( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
- .cast(Edge.class)
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
+ .doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -385,20 +337,11 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTarget( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
- .cast(Edge.class)
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
+ .doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -411,21 +354,12 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) )
- .cast(Edge.class)
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ .cast( Edge.class ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -438,20 +372,11 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<MarkedEdge> getIterator() {
return storageEdgeSerialization.getEdgesToTargetBySourceType( scope, search );
}
- } ).buffer( graphFig.getScanPageSize() ).flatMap(new EdgeBufferFilter(search.getMaxTimestamp()))
- .cast(Edge.class)
- .doOnEach(new Action1<Notification<? super Edge>>() {
- @Override
- public void call(Notification<? super Edge> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).buffer( graphFig.getScanPageSize() )
+ .flatMap( new EdgeBufferFilter( search.getMaxTimestamp(), search.filterMarked() ) ).cast( Edge.class )
+ .doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -464,19 +389,9 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesFromSource( scope, search );
}
- } )
- .doOnEach(new Action1<Notification<? super String>>() {
- @Override
- public void call(Notification<? super String> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -489,19 +404,9 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesFromSource( scope, search );
}
- } )
- .doOnEach(new Action1<Notification<? super String>>() {
- @Override
- public void call(Notification<? super String> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -514,19 +419,9 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getEdgeTypesToTarget( scope, search );
}
- } )
- .doOnEach(new Action1<Notification<? super String>>() {
- @Override
- public void call(Notification<? super String> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -539,19 +434,9 @@ public class GraphManagerImpl implements GraphManager {
protected Iterator<String> getIterator() {
return edgeMetadataSerialization.getIdTypesToTarget( scope, search );
}
- } )
- .doOnEach(new Action1<Notification<? super String>>() {
- @Override
- public void call(Notification<? super String> notification) {
- meter.mark();
- }
- })
- .doOnCompleted(new Action0() {
- @Override
- public void call() {
- timer.stop();
- }
- });
+ } ).doOnEach( notification -> {
+ meter.mark();
+ } ).doOnCompleted( () -> timer.stop() );
}
@@ -561,10 +446,12 @@ public class GraphManagerImpl implements GraphManager {
private class EdgeBufferFilter implements Func1<List<MarkedEdge>, Observable<MarkedEdge>> {
private final long maxVersion;
+ private final boolean filterMarked;
- private EdgeBufferFilter( final long maxVersion ) {
+ private EdgeBufferFilter( final long maxVersion, final boolean filterMarked ) {
this.maxVersion = maxVersion;
+ this.filterMarked = filterMarked;
}
@@ -579,57 +466,36 @@ public class GraphManagerImpl implements GraphManager {
public Observable<MarkedEdge> call( final List<MarkedEdge> markedEdges ) {
final Map<Id, Long> markedVersions = nodeSerialization.getMaxVersions( scope, markedEdges );
- return Observable.from( markedEdges ).filter( new EdgeFilter( this.maxVersion, markedVersions ) );
- }
- }
-
-
- /**
- * Filter the returned values based on the max uuid and if it's been marked for deletion or not
- */
- private static class EdgeFilter implements Func1<MarkedEdge, Boolean> {
-
- private final long maxTimestamp;
-
- private final Map<Id, Long> markCache;
-
-
- private EdgeFilter( final long maxTimestamp, Map<Id, Long> markCache ) {
- this.maxTimestamp = maxTimestamp;
- this.markCache = markCache;
- }
-
-
- @Override
- public Boolean call( final MarkedEdge edge ) {
-
+ final long maxTimestamp = maxVersion;
- final long edgeTimestamp = edge.getTimestamp();
+ return Observable.from( markedEdges ).filter( edge -> {
+ final long edgeTimestamp = edge.getTimestamp();
- //our edge needs to not be deleted and have a version that's > max Version
- if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
- return false;
- }
+ //our edge needs to not be deleted and have a version that's > max Version
+ if ( edge.isDeleted() || Long.compare( edgeTimestamp, maxTimestamp ) > 0 ) {
+ return false;
+ }
- final Long sourceTimestamp = markCache.get( edge.getSourceNode() );
+ final Long sourceTimestamp = markedVersions.get( edge.getSourceNode() );
- //the source Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
- return false;
- }
+ //the source Id has been marked for deletion. It's version is <= to the marked version for deletion,
+ // so we need to discard it
+ if ( sourceTimestamp != null && Long.compare( edgeTimestamp, sourceTimestamp ) < 1 ) {
+ return false;
+ }
- final Long targetTimestamp = markCache.get( edge.getTargetNode() );
+ final Long targetTimestamp = markedVersions.get( edge.getTargetNode() );
- //the target Id has been marked for deletion. It's version is <= to the marked version for deletion,
- // so we need to discard it
- if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
- return false;
- }
+ //the target Id has been marked for deletion. It's version is <= to the marked version for deletion,
+ // so we need to discard it
+ if ( targetTimestamp != null && Long.compare( edgeTimestamp, targetTimestamp ) < 1 ) {
+ return false;
+ }
- return true;
+ return true;
+ } );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index a28a0bb..9a23caf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -33,7 +33,6 @@ import com.google.common.base.Preconditions;
/**
* Simple bean implementation of search by edge
- *
*/
public class SimpleSearchByEdge implements SearchByEdge {
@@ -43,24 +42,44 @@ public class SimpleSearchByEdge implements SearchByEdge {
private final long maxTimestamp;
private final Optional<Edge> last;
private final SearchByEdgeType.Order order;
+ private final boolean filterMarked;
/**
* Create the search modules
+ *
* @param sourceNode The source node of the edge
* @param targetNode The target node of the edge
* @param type The edge type
* @param maxTimestamp The maximum timestamp to seek from
* @param last The value to start seeking from. Must be >= this value
*/
- public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Optional<Edge> last ) {
+ public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp,
+ final SearchByEdgeType.Order order, final Optional<Edge> last ) {
+ this( sourceNode, type, targetNode, maxTimestamp, order, last, true );
+ }
+
+
+ /**
+ * Create the search modules
+ *
+ * @param sourceNode The source node of the edge
+ * @param type The edge type
+ * @param targetNode The target node of the edge
+ * @param maxTimestamp The maximum timestamp to seek from
+ * @param last The value to start seeking from. Must be >= this value
+ */
+ public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp,
+ final SearchByEdgeType.Order order, final Optional<Edge> last,
+ final boolean filterMarked ) {
- ValidationUtils.verifyIdentity(sourceNode);
- ValidationUtils.verifyIdentity(targetNode);
- ValidationUtils.verifyString(type, "type");
- GraphValidation.validateTimestamp(maxTimestamp, "maxTimestamp");
- Preconditions.checkNotNull(order, "order must not be null");
- Preconditions.checkNotNull(last, "last can never be null");
+
+ ValidationUtils.verifyIdentity( sourceNode );
+ ValidationUtils.verifyIdentity( targetNode );
+ ValidationUtils.verifyString( type, "type" );
+ GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+ Preconditions.checkNotNull( order, "order must not be null" );
+ Preconditions.checkNotNull( last, "last can never be null" );
this.sourceNode = sourceNode;
@@ -69,6 +88,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
this.maxTimestamp = maxTimestamp;
this.order = order;
this.last = last;
+ this.filterMarked = filterMarked;
}
@@ -97,6 +117,10 @@ public class SimpleSearchByEdge implements SearchByEdge {
@Override
+ public boolean filterMarked() { return filterMarked; }
+
+
+ @Override
public Optional<Edge> last() {
return last;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/3e2afe23/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 1687162..9392dbc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -41,6 +41,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
private final long maxTimestamp;
private final Optional<Edge> last;
private final Order order;
+ private final boolean filterMarked;
/**
@@ -55,7 +56,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
* //TODO, make last an optional
*/
public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last ) {
- this(node, type, maxTimestamp, order, Optional.fromNullable(last));
+ this(node, type, maxTimestamp, order, Optional.fromNullable(last), true);
}
@@ -70,7 +71,24 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
*
* //TODO, make last an optional
*/
- public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Optional<Edge> last ) {
+ public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order,
+ final Optional<Edge> last ) {
+ this( node, type, maxTimestamp, order, last, true );
+ }
+
+
+ /**
+ * Create the search modules
+ * @param node The node to search from
+ * @param type The edge type
+ * @param maxTimestamp The maximum timestamp to return
+ * @param order The order order. Descending is most efficient
+ * @param last The value to start seeking from. Must be >= this value
+ * @param filterMarked
+ */
+ public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order,
+ final Optional<Edge> last, final boolean filterMarked ) {
+
Preconditions.checkNotNull( order, "order is required");
ValidationUtils.verifyIdentity( node );
@@ -84,6 +102,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
this.maxTimestamp = maxTimestamp;
this.order = order;
this.last = last;
+ this.filterMarked = filterMarked;
}
@@ -118,6 +137,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
@Override
+ public boolean filterMarked() {
+ return filterMarked;
+ }
+
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;