You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/21 03:24:58 UTC
[2/3] incubator-usergrid git commit: Refactors operations into easier
build pattern. Pipeline still need some work.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
index 84654aa..91773c4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/collect/ResultsPageCollector.java
@@ -23,12 +23,12 @@ package org.apache.usergrid.corepersistence.pipeline.read.collect;
import java.util.ArrayList;
import java.util.List;
+import org.apache.usergrid.corepersistence.pipeline.PipelineContext;
import org.apache.usergrid.corepersistence.pipeline.cursor.ResponseCursor;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.model.entity.Entity;
import com.google.common.base.Optional;
@@ -37,21 +37,32 @@ import rx.Observable;
/**
* Takes entities and collects them into results. This mostly exists for 1.0 compatibility. Eventually this will
- * become the only collector in our pipline and be used when rendering results, both on GET, PUT and POST.
+ * become the only collector in our pipeline and be used when rendering results, both on GET, PUT and POST.
+ *
+ *
+ *
+ * @param <T> the type of element to be collected
*/
-public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
- implements Collector<Entity, ResultsPage> {
+public class ResultsPageCollector<T> extends AbstractFilter<FilterResult<T>, ResultsPage<T>> {
+
+
+ protected PipelineContext pipelineContext;
+
+
+ @Override
+ public void setContext( final PipelineContext pipelineContext ) {
+ this.pipelineContext = pipelineContext;
+ }
+
@Override
- public Observable<ResultsPage> call( final Observable<FilterResult<Entity>> filterResultObservable ) {
+ public Observable<ResultsPage<T>> call( final Observable<FilterResult<T>> filterResultObservable ) {
final int limit = pipelineContext.getLimit();
return filterResultObservable.buffer( limit ).flatMap( buffer -> Observable.from( buffer ).collect(
- () -> new ResultsPageWithCursorCollector( limit ), ( collector, entity ) -> {
- collector.add( entity );
- } ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
+ () -> new ResultsPageWithCursorCollector( limit ), ( collector, element ) -> collector.add( element ) ) ).map( resultsPageCollector -> new ResultsPage( resultsPageCollector.results,
new ResponseCursor( resultsPageCollector.lastPath ), pipelineContext.getLimit() ) );
}
@@ -59,10 +70,10 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
/**
* A collector that will aggregate our results together
*/
- private static class ResultsPageWithCursorCollector {
+ private class ResultsPageWithCursorCollector {
- private final List<Entity> results;
+ private final List<T> results;
private Optional<EdgePath> lastPath;
@@ -72,7 +83,7 @@ public class ResultsPageCollector extends AbstractCollector<Entity, ResultsPage>
}
- public void add( final FilterResult<Entity> result ) {
+ public void add( final FilterResult<T> result ) {
this.results.add( result.getValue() );
this.lastPath = result.getPath();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
deleted file mode 100644
index f403e21..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/AbstractElasticSearchFilter.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.codahale.metrics.Timer;
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * Command for reading graph edges
- */
-public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer>
- implements Filter<Id, Candidate> {
-
- private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
-
- private final EntityIndexFactory entityIndexFactory;
- private final String query;
- private final Timer searchTimer;
-
-
- /**
- * Create a new instance of our command
- */
- public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
- final MetricsFactory metricsFactory, final String query ) {
- this.entityIndexFactory = entityIndexFactory;
- this.query = query;
- this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" );
- }
-
-
- @Override
- public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
-
- //get the graph manager
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() );
-
-
- final int limit = pipelineContext.getLimit();
-
-
- final SearchTypes searchTypes = getSearchTypes();
-
-
- //return all ids that are emitted from this edge
- return observable.flatMap( idFilterResult -> {
-
- final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
-
-
- final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
-
- //our offset to our start value. This will be set the first time we emit
- //after we receive new ids, we want to reset this to 0
- //set our constant state
- final Optional<Integer> startFromCursor = getSeekValue();
-
- final int startOffset = startFromCursor.or( 0 );
-
- int currentOffSet = startOffset;
-
- subscriber.onStart();
-
- //emit while we have values from ES and someone is subscribed
- while ( !subscriber.isUnsubscribed() ) {
-
-
- try {
- final CandidateResults candidateResults =
- applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
-
-
-
- for( CandidateResult candidateResult: candidateResults){
-
- //our subscriber unsubscribed, break out
- if(subscriber.isUnsubscribed()){
- return;
- }
-
- final Candidate candidate = new Candidate( candidateResult, searchEdge );
-
- final FilterResult<Candidate>
- result = createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
-
- subscriber.onNext( result );
-
- currentOffSet++;
- }
-
- /**
- * No candidates, we're done
- */
- if (candidateResults.size() < limit) {
- subscriber.onCompleted();
- return;
- }
-
- }
- catch ( Throwable t ) {
-
- log.error( "Unable to search candidates", t );
- subscriber.onError( t );
- }
- }
- } );
-
-
- //add a timer around our observable
- ObservableTimer.time( candidates, searchTimer );
-
- return candidates;
- } );
- }
-
-
- @Override
- protected CursorSerializer<Integer> getCursorSerializer() {
- return ElasticsearchCursorSerializer.INSTANCE;
- }
-
-
- /**
- * Get the search edge from the id
- */
- protected abstract SearchEdge getSearchEdge( final Id id );
-
- /**
- * Get the search types
- */
- protected abstract SearchTypes getSearchTypes();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
deleted file mode 100644
index ab9d5d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Candidate.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Create a candidate. This holds the original candidate, as well as the search scope it was found in
- */
-public class Candidate {
-
- private final CandidateResult candidateResult;
- private final SearchEdge searchEdge;
-
-
- /**
- * Create a new Candidate for further processing
- * @param candidateResult The candidate result
- * @param searchEdge The search edge this was searched on
- */
- public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
- this.candidateResult = candidateResult;
- this.searchEdge = searchEdge;
- }
-
-
- public CandidateResult getCandidateResult() {
- return candidateResult;
- }
-
-
- public SearchEdge getSearchEdge() {
- return searchEdge;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
deleted file mode 100644
index 4304b37..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateEntityFilter.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from incoming CandidateResult emissions, then streams them on.
- * Performs internal buffering for efficiency. Note that all entities may not be emitted if our load crosses page boundaries. It is up to the
- * collector to determine when to stop streaming entities.
- */
-public class CandidateEntityFilter extends AbstractFilter<Candidate, Entity>
- implements Filter<Candidate, Entity> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateEntityFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
- @Override
- public Observable<FilterResult<Entity>> call(
- final Observable<FilterResult<Candidate>> candidateResultsObservable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- //buffer them to get a page size we can make 1 network hop
- final Observable<FilterResult<Entity>> searchIdSetObservable = candidateResultsObservable.buffer( pipelineContext.getLimit() )
-
- //load them
- .flatMap( candidateResults -> {
- //flatten to a list of ids to load
- final Observable<List<Id>> candidateIds =
- Observable.from( candidateResults ).map( filterResultCandidate -> filterResultCandidate.getValue().getCandidateResult().getId() ).toList();
-
- //load the ids
- final Observable<EntitySet> entitySetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
- //now we have a collection, validate our candidate set is correct.
-
- return entitySetObservable.map(
- entitySet -> new EntityVerifier( applicationIndex.createBatch(), entitySet,
- candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() )
- .flatMap(
- entityCollector -> Observable.from( entityCollector.getResults() ) );
- } );
-
- //if we filter all our results, we want to continue to try the next page
- return searchIdSetObservable;
- }
-
-
-
-
- /**
- * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
- */
- private static final class EntityVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
- private List<FilterResult<Entity>> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final List<FilterResult<Candidate>> candidateResults;
- private final EntitySet entitySet;
-
-
- public EntityVerifier( final EntityIndexBatch batch, final EntitySet entitySet,
- final List<FilterResult<Candidate>> candidateResults ) {
- this.batch = batch;
- this.entitySet = entitySet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( entitySet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public List<FilterResult<Entity>> getResults() {
- return results;
- }
-
-
- public EntityIndexBatch getBatch() {
- return batch;
- }
-
-
- private void validate( final FilterResult<Candidate> filterResult ) {
-
- final Candidate candidate = filterResult.getValue();
- final CandidateResult candidateResult = candidate.getCandidateResult();
- final SearchEdge searchEdge = candidate.getSearchEdge();
- final Id candidateId = candidateResult.getId();
- final UUID candidateVersion = candidateResult.getVersion();
-
-
- final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
- //doesn't exist warn and drop
- if ( entity == null ) {
- logger.warn(
- "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue",
- candidateId, candidateVersion );
-
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
-
- }
-
-
- final UUID entityVersion = entity.getVersion();
- final Id entityId = entity.getId();
-
-
-
-
-
- //entity is newer than ES version, could be an update or the entity is marked as deleted
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 || !entity.getEntity().isPresent()) {
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, candidateVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
- }
-
- //they're the same add it
-
- final Entity returnEntity = entity.getEntity().get();
-
- final Optional<EdgePath> parent = filterResult.getPath();
-
- final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
-
- results.add( toReturn );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
deleted file mode 100644
index 0e87141..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions. Input is a batch of
- * candidate results, output is a stream of validated Ids
- */
-public class CandidateIdFilter extends AbstractFilter<Candidate, Id> implements Filter<Candidate, Id> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
- @Override
- public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- final Observable<FilterResult<Id>> searchIdSetObservable =
- filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( candidateResults -> {
- //flatten to a list of ids to load
- final Observable<List<Id>> candidateIds = Observable.from( candidateResults ).map(
- candidate -> candidate.getValue().getCandidateResult().getId() ).toList();
-
- //load the ids
- final Observable<VersionSet> versionSetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
- //now we have a collection, validate our candidate set is correct.
-
- return versionSetObservable.map(
- entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet,
- candidateResults ) ).doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
- entityCollector -> Observable.from( entityCollector.collectResults() ) );
- } );
-
- return searchIdSetObservable;
- }
-
-
- /**
- * Map a new cp entity to an old entity. May be null if not present
- */
- private static final class EntityCollector {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
- private List<FilterResult<Id>> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final List<FilterResult<Candidate>> candidateResults;
- private final VersionSet versionSet;
-
-
- public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
- final List<FilterResult<Candidate>> candidateResults ) {
- this.batch = batch;
- this.versionSet = versionSet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( versionSet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public List<FilterResult<Id>> collectResults() {
- return results;
- }
-
-
- /**
- * Validate each candidate results vs the data loaded from cass
- */
- private void validate( final FilterResult<Candidate> filterCandidate ) {
-
- final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
-
- final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
-
- final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
- final UUID candidateVersion = candidateResult.getVersion();
-
- final UUID entityVersion = logEntry.getVersion();
-
- final Id entityId = logEntry.getEntityId();
-
- //entity is newer than ES version
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, entityVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
- }
-
- //they're the same add it
-
- final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
-
- results.add( result );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
deleted file mode 100644
index 702b2d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchCollectionFilter.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createCollectionSearchEdge;
-
-
-public class ElasticSearchCollectionFilter extends AbstractElasticSearchFilter {
-
- private final String collectionName;
- private final String entityType;
-
- /**
- * Create a new instance of our command
- *
- * @param entityIndexFactory The entity index factory used to search
- * @param metricsFactory The metrics factory for metrics
- * @param collectionName The name of the collection
- * @param entityType The entity type
- */
- @Inject
- public ElasticSearchCollectionFilter( final EntityIndexFactory entityIndexFactory,
- final MetricsFactory metricsFactory, @Assisted( "query" ) final String query,
- @Assisted( "collectionName" ) final String collectionName,
- @Assisted( "entityType" ) final String entityType ) {
- super( entityIndexFactory, metricsFactory, query );
- this.collectionName = collectionName;
- this.entityType = entityType;
- }
-
-
-
- @Override
- protected SearchTypes getSearchTypes() {
- final SearchTypes types = SearchTypes.fromTypes( entityType );
-
- return types;
- }
-
-
- @Override
- protected SearchEdge getSearchEdge( final Id incomingId ) {
- final SearchEdge searchEdge = createCollectionSearchEdge( incomingId, collectionName );
-
- return searchEdge;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
deleted file mode 100644
index cc40530..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticSearchConnectionFilter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createConnectionSearchEdge;
-
-
-public class ElasticSearchConnectionFilter extends AbstractElasticSearchFilter {
-
-
- private final String connectionName;
- private final Optional<String> connectedEntityType;
-
-
- /**
- * Create a new instance of our command
- */
- @Inject
- public ElasticSearchConnectionFilter( final EntityIndexFactory entityIndexFactory,
- final MetricsFactory metricsFactory, @Assisted( "query" ) final String query,
- @Assisted( "connectionName" ) final String connectionName,
- @Assisted( "connectedEntityType" )
- final Optional<String> connectedEntityType ) {
- super( entityIndexFactory, metricsFactory, query );
-
- this.connectionName = connectionName;
- this.connectedEntityType = connectedEntityType;
- }
-
-
- @Override
- protected SearchTypes getSearchTypes() {
- final SearchTypes searchTypes = SearchTypes.fromNullableTypes( connectedEntityType.orNull() );
-
- return searchTypes;
- }
-
-
- @Override
- protected SearchEdge getSearchEdge( final Id id ) {
- final SearchEdge searchEdge = createConnectionSearchEdge( id, connectionName );
-
- return searchEdge;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
deleted file mode 100644
index a4e7746..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/ElasticsearchCursorSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-
-
-/**
- * ElasticSearch cursor serializer
- */
-public class ElasticsearchCursorSerializer extends AbstractCursorSerializer<Integer> {
-
-
- public static final ElasticsearchCursorSerializer INSTANCE = new ElasticsearchCursorSerializer();
-
- @Override
- protected Class<Integer> getType() {
- return Integer.class;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg
deleted file mode 100644
index 08970e3..0000000
Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/Elasticsearchdiagram.jpg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
deleted file mode 100644
index 42b352b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdge;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * Filter should take an Id and a graph edge, and ensure the connection between the two exists
- */
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
- Filter<Id, Id> {
-
- private final GraphManagerFactory graphManagerFactory;
- private final Id targetId;
-
-
- @Inject
- public AbstractReadGraphEdgeByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final Id
- targetId ) {
- this.graphManagerFactory = graphManagerFactory;
- this.targetId = targetId;
- }
-
-
- @Override
- public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
-
- final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
- return filterValueObservable.flatMap( filterValue -> {
- final String edgeTypeName = getEdgeName();
- final Id id = filterValue.getValue();
-
- //create our search
- final SearchByEdge searchByEdge =
- new SimpleSearchByEdge( id, edgeTypeName, targetId, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- Optional.absent() );
-
- //load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
- return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
- } );
- }
-
-
- /**
- * Get the name of the edge to be used in the seek
- */
- protected abstract String getEdgeName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
deleted file mode 100644
index 303bc5b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-
-
-/**
- * Command for reading graph edges
- */
-public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
-
- private final GraphManagerFactory graphManagerFactory;
-
-
- /**
- * Create a new instance of our command
- */
- public AbstractReadGraphFilter( final GraphManagerFactory graphManagerFactory ) {
- this.graphManagerFactory = graphManagerFactory;
- }
-
-
- @Override
- public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
-
-
- //get the graph manager
- final GraphManager graphManager =
- graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
-
- final String edgeName = getEdgeTypeName();
- final EdgeState edgeCursorState = new EdgeState();
-
-
- //return all ids that are emitted from this edge
- return previousIds.flatMap( previousFilterValue -> {
-
- //set our constant state
- final Optional<Edge> startFromCursor = getSeekValue();
- final Id id = previousFilterValue.getValue();
-
-
- final SimpleSearchByEdgeType search =
- new SimpleSearchByEdgeType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- startFromCursor );
-
- /**
- * TODO, pass a message with pointers to our cursor values to be generated later
- */
- return graphManager.loadEdgesFromSource( search )
- //set the edge state for cursors
- .doOnNext( edge -> edgeCursorState.update( edge ) )
-
- //map our id from the target edge and set our cursor every edge we traverse
- .map( edge -> createFilterResult( edge.getTargetNode(), edgeCursorState.getCursorEdge(),
- previousFilterValue.getPath() ) );
- } );
- }
-
-
- @Override
- protected FilterResult<Id> createFilterResult( final Id emit, final Edge cursorValue,
- final Optional<EdgePath> parent ) {
-
- //if it's our first pass, there's no cursor to generate
- if(cursorValue == null){
- return new FilterResult<>( emit, parent );
- }
-
- return super.createFilterResult( emit, cursorValue, parent );
- }
-
-
- @Override
- protected CursorSerializer<Edge> getCursorSerializer() {
- return EdgeCursorSerializer.INSTANCE;
- }
-
-
- /**
- * Get the edge type name we should use when traversing
- */
- protected abstract String getEdgeTypeName();
-
-
- /**
- * Wrapper class. Because edges seek > the last returned, we need to keep our n-1 value. This will be our cursor. We
- * always try to seek to the same position as we ended. Since we don't deal with a persistent read result, if we
- * seek to a value = to our last, we may skip data.
- */
- private final class EdgeState {
-
- private Edge cursorEdge = null;
- private Edge currentEdge = null;
-
-
- /**
- * Update the pointers
- */
- private void update( final Edge newEdge ) {
- cursorEdge = currentEdge;
- currentEdge = newEdge;
- }
-
-
- /**
- * Get the edge to use in cursors for resume
- */
- private Edge getCursorEdge() {
- return cursorEdge;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
deleted file mode 100644
index 769a67e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EdgeCursorSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
-
-
-/**
- * Edge cursor serializer
- */
-public class EdgeCursorSerializer extends AbstractCursorSerializer<Edge> {
-
-
- public static final EdgeCursorSerializer INSTANCE = new EdgeCursorSerializer();
-
- @Override
- protected Class<SimpleEdge> getType() {
- return SimpleEdge.class;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
deleted file mode 100644
index 5a0e026..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
-
- private final Id entityId;
-
-
- @Inject
- public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
- @Override
- public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
- //ignore what our input was, and simply emit the id specified
- return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() ));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
deleted file mode 100644
index d598a2e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityLoadFilter.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadFilter extends AbstractFilter<Id, Entity> implements Filter<Id, Entity> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
- @Inject
- public EntityLoadFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- }
-
-
- @Override
- public Observable<FilterResult<Entity>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
-
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
- //it's more efficient to make 1 network hop to get everything, then drop our results if required
- final Observable<FilterResult<Entity>> entityObservable =
- filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap( bufferedIds -> {
-
- final Observable<EntitySet> entitySetObservable =
- Observable.from( bufferedIds ).map( filterResultId -> filterResultId.getValue() ).toList()
- .flatMap( ids -> entityCollectionManager.load( ids ) );
-
-
- //now we have a collection, validate our candidate set is correct.
-
- return entitySetObservable.map( entitySet -> new EntityVerifier( entitySet, bufferedIds ) )
- .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
- entityCollector -> Observable.from( entityCollector.getResults() ) );
- } );
-
- return entityObservable;
- }
-
-
- /**
- * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this
- * state is mutable and difficult to represent functionally
- */
- private static final class EntityVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
- private List<FilterResult<Entity>> results = new ArrayList<>();
-
- private final List<FilterResult<Id>> candidateResults;
- private final EntitySet entitySet;
-
-
- public EntityVerifier( final EntitySet entitySet, final List<FilterResult<Id>> candidateResults ) {
- this.entitySet = entitySet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( entitySet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final FilterResult<Id> candidateResult : candidateResults ) {
- validate( candidateResult );
- }
- }
-
-
- public List<FilterResult<Entity>> getResults() {
- return results;
- }
-
-
- private void validate( final FilterResult<Id> filterResult ) {
-
- final Id candidateId = filterResult.getValue();
-
-
- final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
- //doesn't exist warn and drop
- if ( entity == null || !entity.getEntity().isPresent() ) {
- logger.warn( "Read graph edge and received candidate with entityId {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue", candidateId );
-
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
- }
-
- //it exists, add it
-
- final Entity returnEntity = entity.getEntity().get();
-
- final Optional<EdgePath> parent = filterResult.getPath();
-
- final FilterResult<Entity> toReturn = new FilterResult<>( returnEntity, parent );
-
- results.add( toReturn );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg
deleted file mode 100644
index c0308bd..0000000
Binary files a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/GraphDiagram.jpg and /dev/null differ
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
deleted file mode 100644
index da6ad29..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionByIdFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-
-/**
- * Read an edge in the graph to verify its existence by id
- */
-public class ReadGraphCollectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
-
- private final String collectionName;
-
- @Inject
- public ReadGraphCollectionByIdFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName, @Assisted final Id targetId ) {
- super( graphManagerFactory, targetId );
- this.collectionName = collectionName;
- }
-
-
- @Override
- protected String getEdgeName() {
- return CpNamingUtils.getEdgeTypeFromCollectionName( collectionName );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
deleted file mode 100644
index 91ae7c3..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphCollectionFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromCollectionName;
-
-
-/**
- * Command for reading graph edges on a collection
- */
-public class ReadGraphCollectionFilter extends AbstractReadGraphFilter {
-
- private final String collectionName;
-
-
- /**
- * Create a new instance of our command
- */
- @Inject
- public ReadGraphCollectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String collectionName ) {
- super( graphManagerFactory );
- this.collectionName = collectionName;
- }
-
-
- @Override
- protected String getEdgeTypeName() {
- return getEdgeTypeFromCollectionName( collectionName );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
deleted file mode 100644
index 4756d33..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByIdFilter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-
-/**
- * Read an edge in the graph to verify its existence by id. Extends {@link AbstractReadGraphEdgeByIdFilter} and
- * overrides getEdgeName() to return the edge type derived from the connection name.
- */
-public class ReadGraphConnectionByIdFilter extends AbstractReadGraphEdgeByIdFilter{
-
- // connection name supplied at construction; translated into an edge type in getEdgeName()
- private final String connectionName;
-
- /**
- * @param graphManagerFactory handed to the superclass constructor
- * @param connectionName assisted-inject argument naming the connection
- * @param targetId assisted-inject argument, handed to the superclass constructor
- */
- @Inject
- public ReadGraphConnectionByIdFilter( final GraphManagerFactory graphManagerFactory,
- @Assisted final String connectionName, @Assisted final Id targetId ) {
- super( graphManagerFactory, targetId );
- this.connectionName = connectionName;
- }
-
-
- /**
- * @return the result of CpNamingUtils.getEdgeTypeFromConnectionType for this filter's connection name
- */
- @Override
- protected String getEdgeName() {
- return CpNamingUtils.getEdgeTypeFromConnectionType( connectionName );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
deleted file mode 100644
index 7371579..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionByTypeFilter.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.GraphManager;
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Optional;
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
-
-
-/**
- * Command for reading graph edges on a connection. For every incoming id it loads edges of the connection's edge
- * type from that id via GraphManager.loadEdgesFromSourceByType and emits the target node of each edge. The edge
- * itself is passed to createFilterResult alongside the target node; cursors use EdgeCursorSerializer.
- */
-public class ReadGraphConnectionByTypeFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
-
- private final GraphManagerFactory graphManagerFactory;
- private final String connectionName;
- private final String entityType;
-
-
- /**
- * Create a new instance of our command
- *
- * @param graphManagerFactory used in call() to create the graph manager for the pipeline's application scope
- * @param connectionName assisted-inject argument ("connectionName"); converted to an edge type in call()
- * @param entityType assisted-inject argument ("entityType"); passed to the edge search in call()
- */
- @Inject
- public ReadGraphConnectionByTypeFilter( final GraphManagerFactory graphManagerFactory,
- @Assisted("connectionName") final String connectionName, @Assisted("entityType") final String entityType ) {
- this.graphManagerFactory = graphManagerFactory;
- this.connectionName = connectionName;
- this.entityType = entityType;
- }
-
-
-
- /**
- * @param filterResultObservable the ids to start the edge search from
- * @return one filter result per loaded edge, carrying the edge's target node and the incoming result's path
- */
- @Override
- public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterResultObservable ) {
-
- //get the graph manager
- final GraphManager graphManager = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
-
-
-
- //edge type name derived from the connection name
- final String edgeName = getEdgeTypeFromConnectionType( connectionName );
-
-
- //return all ids that are emitted from this edge
- return filterResultObservable.flatMap( idFilterResult -> {
-
- //set our constant state
- final Optional<Edge> startFromCursor = getSeekValue();
- final Id id = idFilterResult.getValue();
-
- //search arguments: source id, edge type, Long.MAX_VALUE, descending order, entity type, optional seek edge
- //NOTE(review): Long.MAX_VALUE is presumably the upper timestamp bound - confirm against SimpleSearchByIdType
- final SimpleSearchByIdType search =
- new SimpleSearchByIdType( id, edgeName, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- entityType, startFromCursor );
-
- //emit the target node of every edge, keeping the path of the incoming result
- return graphManager.loadEdgesFromSourceByType( search ).map(
- edge -> createFilterResult( edge.getTargetNode(), edge, idFilterResult.getPath() ));
- } );
- }
-
-
- /**
- * @return the shared EdgeCursorSerializer instance
- */
- @Override
- protected CursorSerializer<Edge> getCursorSerializer() {
- return EdgeCursorSerializer.INSTANCE;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
deleted file mode 100644
index 0d4971b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/ReadGraphConnectionFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.graph;
-
-
-import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getEdgeTypeFromConnectionType;
-
-
-/**
- * Command for reading graph edges on a connection. Extends {@link AbstractReadGraphFilter} and overrides
- * getEdgeTypeName() to return the edge type derived from the connection name.
- */
-public class ReadGraphConnectionFilter extends AbstractReadGraphFilter {
-
- // connection name supplied at construction; translated into an edge type in getEdgeTypeName()
- private final String connectionName;
-
-
- /**
- * Create a new instance of our command
- *
- * @param graphManagerFactory handed to the superclass constructor
- * @param connectionName assisted-inject argument naming the connection
- */
- @Inject
- public ReadGraphConnectionFilter( final GraphManagerFactory graphManagerFactory, @Assisted final String connectionName ) {
- super( graphManagerFactory );
- this.connectionName = connectionName;
- }
-
-
- /**
- * @return the result of CpNamingUtils.getEdgeTypeFromConnectionType for this filter's connection name
- */
- @Override
- protected String getEdgeTypeName() {
- return getEdgeTypeFromConnectionType( connectionName );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
new file mode 100644
index 0000000..eaf74c1
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/AbstractElasticSearchFilter.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.core.metrics.ObservableTimer;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.CandidateResults;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.index.SearchTypes;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Optional;
+
+import rx.Observable;
+
+
+/**
+ * Filter that, for every incoming entity id, pages through index search results and emits each hit as a
+ * {@link Candidate}. The running search offset (an Integer) is handed to createFilterResult as the cursor value.
+ */
+public abstract class AbstractElasticSearchFilter extends AbstractPathFilter<Id, Candidate, Integer> {
+
+    private static final Logger log = LoggerFactory.getLogger( AbstractElasticSearchFilter.class );
+
+    private final EntityIndexFactory entityIndexFactory;
+    private final String query;
+    private final Timer searchTimer;
+
+
+    /**
+     * Create a new instance of our command
+     *
+     * @param entityIndexFactory used to create the per-application entity index that is searched
+     * @param metricsFactory source of the "query" timer
+     * @param query the query string passed unchanged to every search call
+     */
+    public AbstractElasticSearchFilter( final EntityIndexFactory entityIndexFactory,
+                                        final MetricsFactory metricsFactory, final String query ) {
+        this.entityIndexFactory = entityIndexFactory;
+        this.query = query;
+        this.searchTimer = metricsFactory.getTimer( AbstractElasticSearchFilter.class, "query" );
+    }
+
+
+    /**
+     * Searches the index page by page for each emitted id until a page shorter than the limit is returned.
+     *
+     * @param observable the ids whose search edges should be queried
+     * @return an observable of candidates, one per search hit
+     */
+    @Override
+    public Observable<FilterResult<Candidate>> call( final Observable<FilterResult<Id>> observable ) {
+
+        //get the entity index for the application in our pipeline context
+        final ApplicationEntityIndex applicationEntityIndex =
+            entityIndexFactory.createApplicationEntityIndex( pipelineContext.getApplicationScope() );
+
+        //the limit doubles as the page size of every search request
+        final int limit = pipelineContext.getLimit();
+
+        final SearchTypes searchTypes = getSearchTypes();
+
+        //run a paged search for every id we receive
+        return observable.flatMap( idFilterResult -> {
+
+            final SearchEdge searchEdge = getSearchEdge( idFilterResult.getValue() );
+
+            final Observable<FilterResult<Candidate>> candidates = Observable.create( subscriber -> {
+
+                //start from the cursor offset if one was supplied, otherwise from the first result
+                final Optional<Integer> startFromCursor = getSeekValue();
+
+                int currentOffSet = startFromCursor.or( 0 );
+
+                //NOTE(review): RxJava's subscribe path may already invoke onStart - confirm this call is needed
+                subscriber.onStart();
+
+                //emit while we have values from ES and someone is subscribed
+                while ( !subscriber.isUnsubscribed() ) {
+
+                    try {
+                        final CandidateResults candidateResults =
+                            applicationEntityIndex.search( searchEdge, searchTypes, query, limit, currentOffSet );
+
+                        for ( CandidateResult candidateResult : candidateResults ) {
+
+                            //our subscriber unsubscribed, break out
+                            if ( subscriber.isUnsubscribed() ) {
+                                return;
+                            }
+
+                            final Candidate candidate = new Candidate( candidateResult, searchEdge );
+
+                            final FilterResult<Candidate> result =
+                                createFilterResult( candidate, currentOffSet, idFilterResult.getPath() );
+
+                            subscriber.onNext( result );
+
+                            currentOffSet++;
+                        }
+
+                        //a page shorter than the limit means there is nothing more to fetch, we're done
+                        if ( candidateResults.size() < limit ) {
+                            subscriber.onCompleted();
+                            return;
+                        }
+                    }
+                    catch ( Throwable t ) {
+
+                        log.error( "Unable to search candidates", t );
+                        subscriber.onError( t );
+
+                        //onError is terminal: stop here, otherwise the loop could search and signal again
+                        return;
+                    }
+                }
+            } );
+
+            //hand back the observable produced by the timer so the search is actually timed
+            //NOTE(review): assumes ObservableTimer.time returns the instrumented observable - confirm
+            return ObservableTimer.time( candidates, searchTimer );
+        } );
+    }
+
+
+    /**
+     * @return the shared ElasticsearchCursorSerializer instance used for the Integer offset cursor
+     */
+    @Override
+    protected CursorSerializer<Integer> getCursorSerializer() {
+        return ElasticsearchCursorSerializer.INSTANCE;
+    }
+
+
+    /**
+     * Get the search edge from the id
+     *
+     * @param id the value of an incoming filter result
+     */
+    protected abstract SearchEdge getSearchEdge( final Id id );
+
+    /**
+     * Get the search types
+     */
+    protected abstract SearchTypes getSearchTypes();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d54dffc/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
new file mode 100644
index 0000000..7ada4ba
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/search/Candidate.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.search;
+
+
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.SearchEdge;
+
+
+/**
+ * Pairs a candidate returned by a search with the search edge that was queried to find it.
+ */
+public class Candidate {
+
+    private final SearchEdge edge;
+    private final CandidateResult result;
+
+
+    /**
+     * Create a new Candidate for further processing
+     *
+     * @param candidateResult The candidate result
+     * @param searchEdge The search edge this was searched on
+     */
+    public Candidate( final CandidateResult candidateResult, final SearchEdge searchEdge ) {
+        result = candidateResult;
+        edge = searchEdge;
+    }
+
+
+    /**
+     * @return the search edge handed to the constructor
+     */
+    public SearchEdge getSearchEdge() {
+        return edge;
+    }
+
+
+    /**
+     * @return the candidate result handed to the constructor
+     */
+    public CandidateResult getCandidateResult() {
+        return result;
+    }
+}