You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/04 19:27:54 UTC
[03/11] incubator-usergrid git commit: Massive refactor. Paths for
cursor generation are now part of our I/O results. This allows the collector
to take until satisfied, then generate a serializable path.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
new file mode 100644
index 0000000..56e1c1c
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateIdFilter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.PipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.collection.EntityCollectionManager;
+import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
+import org.apache.usergrid.persistence.collection.VersionSet;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.CandidateResult;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
+import org.apache.usergrid.persistence.index.SearchEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.fasterxml.uuid.UUIDComparator;
+import com.google.inject.Inject;
+
+import rx.Observable;
+
+
+/**
+ * Responsible for verifying candidate result versions, then emitting the Ids of these versions
+ * Input is a batch of candidate results, output is a stream of validated Ids
+ */
+public class CandidateIdFilter extends AbstractFilter<Candidate, Id>
+ implements Filter<Candidate, Id> {
+
+ private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final EntityIndexFactory entityIndexFactory;
+
+
+ @Inject
+ public CandidateIdFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final EntityIndexFactory entityIndexFactory ) {
+ this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.entityIndexFactory = entityIndexFactory;
+ }
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Candidate>> filterResultObservable ) {
+
+
+ /**
+ * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
+ * objects
+ */
+
+ final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
+
+ final EntityCollectionManager entityCollectionManager =
+ entityCollectionManagerFactory.createCollectionManager( applicationScope );
+
+
+ final ApplicationEntityIndex applicationIndex =
+ entityIndexFactory.createApplicationEntityIndex( applicationScope );
+
+ final Observable<FilterResult<Id>> searchIdSetObservable = filterResultObservable.buffer( pipelineContext.getLimit() ).flatMap(
+ candidateResults -> {
+ //flatten toa list of ids to load
+ final Observable<List<Id>> candidateIds =
+ Observable.from( candidateResults ).map( candidate -> candidate.getValue().getCandidateResult().getId() )
+ .toList();
+
+ //load the ids
+ final Observable<VersionSet> versionSetObservable =
+ candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
+
+ //now we have a collection, validate our canidate set is correct.
+
+ return versionSetObservable.map(
+ entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
+ .doOnNext( entityCollector -> entityCollector.merge() ).flatMap(
+ entityCollector -> Observable.from( entityCollector.collectResults() ) );
+ } );
+
+ return searchIdSetObservable;
+ }
+
+
+
+
+
+ /**
+ * Map a new cp entity to an old entity. May be null if not present
+ */
+ private static final class EntityCollector {
+
+ private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
+ private List<FilterResult<Id>> results = new ArrayList<>();
+
+ private final EntityIndexBatch batch;
+ private final List<FilterResult<Candidate>> candidateResults;
+ private final VersionSet versionSet;
+
+
+ public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
+ final List<FilterResult<Candidate>> candidateResults ) {
+ this.batch = batch;
+ this.versionSet = versionSet;
+ this.candidateResults = candidateResults;
+ this.results = new ArrayList<>( versionSet.size() );
+ }
+
+
+ /**
+ * Merge our candidates and our entity set into results
+ */
+ public void merge() {
+
+ for ( final FilterResult<Candidate> candidateResult : candidateResults ) {
+ validate( candidateResult );
+ }
+
+ batch.execute();
+ }
+
+
+ public List<FilterResult<Id>> collectResults() {
+ return results;
+ }
+
+
+ /**
+ * Validate each candidate results vs the data loaded from cass
+ * @param filterCandidate
+ */
+ private void validate( final FilterResult<Candidate> filterCandidate ) {
+
+ final CandidateResult candidateResult = filterCandidate.getValue().getCandidateResult();
+
+ final SearchEdge searchEdge = filterCandidate.getValue().getSearchEdge();
+
+ final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
+
+ final UUID candidateVersion = candidateResult.getVersion();
+
+ final UUID entityVersion = logEntry.getVersion();
+
+ final Id entityId = logEntry.getEntityId();
+
+ //entity is newer than ES version
+ if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
+
+ logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
+ new Object[] { searchEdge, entityId, entityVersion } );
+ batch.deindex( searchEdge, entityId, entityVersion );
+ return;
+ }
+
+ //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
+ //remove the ES record, since the read in cass should cause a read repair, just ignore
+ if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
+
+ logger.warn(
+ "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
+ + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
+ }
+
+ //they're the same add it
+
+ final FilterResult<Id> result = new FilterResult<>( entityId, filterCandidate.getPath() );
+
+ results.add( result );
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
deleted file mode 100644
index 465ff22..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsEntityResultsCollector.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from an incoming CandidateResults object and return them as results
- */
-public class CandidateResultsEntityResultsCollector extends AbstractPipelineOperation<CandidateResults, ResultsPage>
- implements Collector<CandidateResults, ResultsPage> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateResultsEntityResultsCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
- @Override
- public Observable<ResultsPage> call( final Observable<CandidateResults> candidateResultsObservable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- final Observable<ResultsPage> searchIdSetObservable = candidateResultsObservable.flatMap( candidateResults -> {
- //flatten toa list of ids to load
- final Observable<List<Id>> candidateIds =
- Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
- //load the ids
- final Observable<EntitySet> entitySetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.load( ids ) );
-
- //now we have a collection, validate our canidate set is correct.
-
- return entitySetObservable
- .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
- .doOnNext( entityCollector -> entityCollector.merge() )
- .map( entityCollector -> entityCollector.getResults() );
- } );
-
- //if we filter all our results, we want to continue to try the next page
- return searchIdSetObservable;
- }
-
-
- /**
- * Our collector to collect entities. Not quite a true collector, but works within our operational flow as this state is mutable and difficult to represent functionally
- */
- private static final class EntityCollector {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
- private List<Entity> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final CandidateResults candidateResults;
- private final EntitySet entitySet;
-
-
- public EntityCollector( final EntityIndexBatch batch, final EntitySet entitySet,
- final CandidateResults candidateResults ) {
- this.batch = batch;
- this.entitySet = entitySet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( entitySet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final CandidateResult candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public ResultsPage getResults() {
- return new ResultsPage( results );
- }
-
-
- public EntityIndexBatch getBatch() {
- return batch;
- }
-
-
- private void validate( final CandidateResult candidateResult ) {
-
- final Id candidateId = candidateResult.getId();
- final UUID candidateVersion = candidateResult.getVersion();
-
-
- final MvccEntity entity = entitySet.getEntity( candidateId );
-
-
- //doesn't exist warn and drop
- if ( entity == null ) {
- logger.warn(
- "Searched and received candidate with entityId {} and version {}, yet was not found in cassandra."
- + " Ignoring since this could be a region sync issue",
- candidateId, candidateVersion );
-
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
-
- }
-
-
- final UUID entityVersion = entity.getVersion();
-
-
- //entity is newer than ES version, could be an update or the entity is marked as deleted
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0) {
-
- final Id entityId = entity.getId();
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, entityVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- final Id entityId = entity.getId();
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
-
- //TODO trigger an audit after a fail count where we explicitly try to repair from other regions
-
- return;
- }
-
- //they're the same add it
-
-
- results.add( entity.getEntity().get() );
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
deleted file mode 100644
index bb9ab76..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/CandidateResultsIdVerifyFilter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.PipelineOperation;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Responsible for verifying candidate result versions, then emitting the Ids of these versions
- * Input is a batch of candidate results, output is a stream of validated Ids
- */
-public class CandidateResultsIdVerifyFilter extends AbstractPipelineOperation<CandidateResults, Id>
- implements PipelineOperation<CandidateResults, Id> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final EntityIndexFactory entityIndexFactory;
-
-
- @Inject
- public CandidateResultsIdVerifyFilter( final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EntityIndexFactory entityIndexFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.entityIndexFactory = entityIndexFactory;
- }
-
-
-
- @Override
- public Observable<Id> call( final Observable<CandidateResults> observable ) {
-
-
- /**
- * A bit kludgy from old 1.0 -> 2.0 apis. Refactor this as we clean up our lower levels and create new results
- * objects
- */
-
- final ApplicationScope applicationScope = pipelineContext.getApplicationScope();
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( applicationScope );
-
-
- final ApplicationEntityIndex applicationIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
-
- final Observable<Id> searchIdSetObservable = observable.flatMap( candidateResults -> {
- //flatten toa list of ids to load
- final Observable<List<Id>> candidateIds =
- Observable.from( candidateResults ).map( candidate -> candidate.getId() ).toList();
-
- //load the ids
- final Observable<VersionSet> versionSetObservable =
- candidateIds.flatMap( ids -> entityCollectionManager.getLatestVersion( ids ) );
-
- //now we have a collection, validate our canidate set is correct.
-
- return versionSetObservable
- .map( entitySet -> new EntityCollector( applicationIndex.createBatch(), entitySet, candidateResults ) )
- .doOnNext( entityCollector -> entityCollector.merge() )
- .flatMap( entityCollector -> Observable.from( entityCollector.collectResults() ) );
- } );
-
- return searchIdSetObservable;
- }
-
-
- /**
- * Map a new cp entity to an old entity. May be null if not present
- */
- private static final class EntityCollector {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityCollector.class );
- private List<Id> results = new ArrayList<>();
-
- private final EntityIndexBatch batch;
- private final CandidateResults candidateResults;
- private final VersionSet versionSet;
-
-
- public EntityCollector( final EntityIndexBatch batch, final VersionSet versionSet,
- final CandidateResults candidateResults ) {
- this.batch = batch;
- this.versionSet = versionSet;
- this.candidateResults = candidateResults;
- this.results = new ArrayList<>( versionSet.size() );
- }
-
-
- /**
- * Merge our candidates and our entity set into results
- */
- public void merge() {
-
- for ( final CandidateResult candidateResult : candidateResults ) {
- validate( candidateResult );
- }
-
- batch.execute();
- }
-
-
- public List<Id> collectResults() {
- return results;
- }
-
-
- /**
- * Validate each candidate results vs the data loaded from cass
- * @param candidateResult
- */
- private void validate( final CandidateResult candidateResult ) {
-
- final MvccLogEntry logEntry = versionSet.getMaxVersion( candidateResult.getId() );
-
- final UUID candidateVersion = candidateResult.getVersion();
-
- final UUID entityVersion = logEntry.getVersion();
-
- final Id entityId = logEntry.getEntityId();
-
- //entity is newer than ES version
- if ( UUIDComparator.staticCompare( entityVersion, candidateVersion ) > 0 ) {
-
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn( "Deindexing stale entity on edge {} for entityId {} and version {}",
- new Object[] { searchEdge, entityId, entityVersion } );
- batch.deindex( searchEdge, entityId, entityVersion );
- return;
- }
-
- //ES is newer than cass, it means we haven't repaired the record in Cass, we don't want to
- //remove the ES record, since the read in cass should cause a read repair, just ignore
- if ( UUIDComparator.staticCompare( candidateVersion, entityVersion ) > 0 ) {
-
- final SearchEdge searchEdge = candidateResults.getSearchEdge();
-
- logger.warn(
- "Found a newer version in ES over cassandra for edge {} for entityId {} and version {}. Repair "
- + "should be run", new Object[] { searchEdge, entityId, entityVersion } );
- }
-
- //they're the same add it
-
- results.add( entityId );
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
deleted file mode 100644
index cc96633..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionRefsVerifier.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.SimpleEntityRef;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class CollectionRefsVerifier extends VersionVerifier {
-
-
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
- List<EntityRef> refs = new ArrayList<EntityRef>(ids.size());
- for ( Id id : ids ) {
- refs.add( new SimpleEntityRef( id.getType(), id.getUuid() ) );
- }
- return Results.fromRefList( refs );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 94c91d9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/CollectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class CollectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
- private final EntityCollectionManager entityCollectionManager;
- private final ApplicationEntityIndex applicationEntityIndex;
-
-
- public CollectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
- final ApplicationEntityIndex applicationEntityIndex ) {
- this.entityCollectionManager = entityCollectionManager;
- this.applicationEntityIndex = applicationEntityIndex;
- }
-
-
- @Override
- public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
- final Query.Level resultsLevel ) {
-
- ResultsVerifier verifier;
-
- if ( resultsLevel == Query.Level.REFS ) {
- verifier = new CollectionRefsVerifier();
- }
- else if ( resultsLevel == Query.Level.IDS ) {
- verifier = new IdsVerifier();
- }
- else {
- verifier = new EntityVerifier( Query.MAX_LIMIT );
- }
-
- return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope,
- scope );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
deleted file mode 100644
index 6b7bdde..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionRefsVerifier.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.usergrid.persistence.ConnectionRef;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.cassandra.ConnectionRefImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import static org.apache.usergrid.persistence.SimpleEntityRef.ref;
-
-
-/**
- * Verifier for creating connections
- */
-public class ConnectionRefsVerifier extends VersionVerifier {
-
-
- private final EntityRef ownerId;
- private final String connectionType;
-
-
- public ConnectionRefsVerifier( final EntityRef ownerId, final String connectionType ) {
- this.ownerId = ownerId;
- this.connectionType = connectionType;
- }
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
- List<ConnectionRef> refs = new ArrayList<>();
- for ( Id id : ids ) {
- refs.add( new ConnectionRefImpl( ownerId, connectionType, ref(id.getType(), id.getUuid()) ));
- }
-
- return Results.fromConnections( refs );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
deleted file mode 100644
index 55b95a9..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ConnectionResultsLoaderFactoryImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public class ConnectionResultsLoaderFactoryImpl implements ResultsLoaderFactory {
-
- private final EntityCollectionManager entityCollectionManager;
- private final ApplicationEntityIndex applicationEntityIndex;
- private final EntityRef ownerId;
- private final String connectionType;
-
-
- public ConnectionResultsLoaderFactoryImpl( final EntityCollectionManager entityCollectionManager,
- final ApplicationEntityIndex applicationEntityIndex, final EntityRef ownerId,
- final String connectionType ) {
- this.entityCollectionManager = entityCollectionManager;
- this.applicationEntityIndex = applicationEntityIndex;
- this.ownerId = ownerId;
- this.connectionType = connectionType;
- }
-
-
- @Override
- public ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge scope,
- final Query.Level resultsLevel ) {
-
- ResultsVerifier verifier;
-
- if ( resultsLevel == Query.Level.REFS ) {
- verifier = new ConnectionRefsVerifier( ownerId, connectionType );
- }
- else if ( resultsLevel == Query.Level.IDS ) {
- verifier = new ConnectionRefsVerifier( ownerId, connectionType );
- ;
- }
- else {
- verifier = new EntityVerifier( Query.MAX_LIMIT );
- }
-
- return new FilteringLoader( entityCollectionManager, applicationEntityIndex, verifier, applicationScope, scope );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
deleted file mode 100644
index 6e170f8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ElasticSearchQueryExecutor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.usergrid.persistence.index.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-public class ElasticSearchQueryExecutor implements Iterable<Results>, Iterator<Results> {
-
- private static final Logger logger = LoggerFactory.getLogger( ElasticSearchQueryExecutor.class );
-
- private final ResultsLoaderFactory resultsLoaderFactory;
-
- private final ApplicationScope applicationScope;
-
- private final ApplicationEntityIndex entityIndex;
-
- private final SearchEdge indexScope;
-
- private final SearchTypes types;
-
- private final String query;
-
- private final Optional<Integer> setOffsetFromCursor;
-
- private final int limit;
-
- private int offset;
-
-
- private Results currentResults;
-
- private boolean moreToLoad = true;
-
-
-
-
- public ElasticSearchQueryExecutor( final ResultsLoaderFactory resultsLoaderFactory, final ApplicationEntityIndex entityIndex,
- final ApplicationScope applicationScope, final SearchEdge indexScope,
- final SearchTypes types, final String query, final Optional<Integer> setOffsetFromCursor, final int limit ) {
- this.resultsLoaderFactory = resultsLoaderFactory;
- this.applicationScope = applicationScope;
- this.entityIndex = entityIndex;
- this.indexScope = indexScope;
- this.types = types;
- this.setOffsetFromCursor = setOffsetFromCursor;
-
- //we must deep copy the query passed. Otherwise we will modify it's state with cursors. Won't fix, not relevant
- //once we start subscribing to streams.
- this.query = query;
- this.limit = limit;
- }
-
-
- @Override
- public Iterator<Results> iterator() {
- return this;
- }
-
-
- private void loadNextPage() {
- // Because of possible stale entities, which are filtered out by buildResults(),
- // we loop until the we've got enough results to satisfy the query limit.
-
- final int maxQueries = 10; // max re-queries to satisfy query limit
-
-
- Results results = null;
- int queryCount = 0;
-
-
- CandidateResults crs = null;
-
- int newLimit = limit;
-
- while ( queryCount++ < maxQueries ) {
-
- crs = entityIndex.search( indexScope, types, query, newLimit , offset);
-
-
- logger.debug( "Calling build results with crs {}", crs );
- results = buildResults( indexScope, crs );
-
- /**
- * In an edge case where we delete stale entities, we could potentially get less results than expected.
- * This will only occur once during the repair phase.
- * We need to ensure that we short circuit before we overflow the requested limit during a repair.
- */
- if ( crs.isEmpty() || !crs.hasOffset() || results.size() > 0 ) { // no results, no cursor, can't get more
- break;
- }
-
-
- //we didn't load anything, but there was a cursor, this means a read repair occured. We have to short
- //circuit to avoid over returning the result set
-
-
- // need to query for more
- // ask for just what we need to satisfy, don't want to exceed limit
- newLimit = newLimit - results.size();
-
- logger.warn( "Satisfy query limit {}, new limit {} query count {}", new Object[] {
- limit, newLimit, queryCount
- } );
- }
-
- //now set our cursor if we have one for the next iteration
- if ( results.hasCursor() ) {
- moreToLoad = true;
- }
-
- else {
- moreToLoad = false;
- }
-
-//
-// //set our select subjects into our query if provided
-// if(crs != null){
-// query.setSelectSubjects( crs.getGetFieldMappings() );
-// }
-//
-
- //set our current results and the flag
- this.currentResults = results;
- }
-
-
-
- /**
- * Build results from a set of candidates, and discard those that represent stale indexes.
- *
- * @param indexScope The index scope to execute the search on
- * @param crs Candidates to be considered for results
- */
- private Results buildResults( final SearchEdge indexScope, final CandidateResults crs ) {
-
- logger.debug( "buildResults() from {} candidates", crs.size() );
-
- //get an instance of our results loader
- final ResultsLoader resultsLoader =
- this.resultsLoaderFactory.getLoader( applicationScope, indexScope, Query.Level.ALL_PROPERTIES );
-
- //load the results
- final Results results = resultsLoader.loadResults(crs);
-
- //signal for post processing
- resultsLoader.postProcess();
-
- //set offset into query
-
- logger.debug( "Returning results size {}", results.size() );
-
- return results;
- }
-
-
- @Override
- public boolean hasNext() {
-
- //we've tried to load and it's empty and we have more to load, load the next page
- if ( currentResults == null ) {
- //there's nothing left to load, nothing to do
- if ( !moreToLoad ) {
- return false;
- }
-
- //load the page
-
- loadNextPage();
- }
-
-
- //see if our current results are not null
- return currentResults != null;
- }
-
-
- @Override
- public Results next() {
- if ( !hasNext() ) {
- throw new NoSuchElementException( "No more results present" );
- }
-
- final Results toReturn = currentResults;
-
- currentResults = null;
-
- return toReturn;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("Remove not implemented!!");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
deleted file mode 100644
index d73c731..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/EntityVerifier.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.util.CpEntityMapUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityFactory;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Optional;
-
-
-/**
- * A loader that verifies versions are correct in cassandra and match elasticsearch
- */
-public class EntityVerifier implements ResultsVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( EntityVerifier.class );
-
- private EntitySet ids;
-
- private Map<Id, org.apache.usergrid.persistence.model.entity.Entity> entityMapping;
-
-
- public EntityVerifier( final int maxSize ) {
- this.entityMapping = new HashMap<>( maxSize );
- }
-
-
- @Override
- public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
- ids = ecm.load( idsToLoad ).toBlocking().last();
- logger.debug("loadResults() asked for {} ids and got {}", idsToLoad.size(), ids.size());
- }
-
-
- @Override
- public boolean isValid( final CandidateResult candidateResult ) {
- final Id entityId = candidateResult.getId();
-
- final MvccEntity savedEntity = ids.getEntity( entityId );
-
- //version wasn't found deindex
- if ( savedEntity == null ) {
- logger.warn( "Version for Entity {}:{} not found", entityId.getType(), entityId.getUuid() );
- return false;
- }
-
- final UUID candidateVersion = candidateResult.getVersion();
- final UUID savedVersion = savedEntity.getVersion();
-
- if ( UUIDComparator.staticCompare( savedVersion, candidateVersion ) > 0 ) {
- logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
- entityId.getUuid(), entityId.getType(), candidateVersion, savedEntity
- } );
-
- return false;
- }
-
-
- final Optional<org.apache.usergrid.persistence.model.entity.Entity> entity = savedEntity.getEntity();
-
- if ( !entity.isPresent() ) {
- logger.warn( "Entity uuid:{} version v:{} is deleted but indexed, this is a bug ",
- entityId.getUuid(), savedEntity.getEntity() );
- return false;
- }
-
- entityMapping.put( entityId, entity.get() );
-
- return true;
- }
-
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
-
- final List<Entity> ugEntities = new ArrayList<>( ids.size() );
-
- for ( final Id id : ids ) {
- final org.apache.usergrid.persistence.model.entity.Entity cpEntity = entityMapping.get( id );
-
- Entity entity = EntityFactory.newEntity( id.getUuid(), id.getType() );
-
- Map<String, Object> entityMap = CpEntityMapUtils.toMap( cpEntity );
- entity.addProperties( entityMap );
- ugEntities.add( entity );
- }
-
- return Results.fromEntities( ugEntities );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
deleted file mode 100644
index ade64a2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/FilteringLoader.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.corepersistence.ManagerCache;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndexBatch;
-import org.apache.usergrid.persistence.index.EntityIndexFactory;
-import org.apache.usergrid.persistence.index.SearchEdge;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.index.CandidateResults;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-
-
-public class FilteringLoader implements ResultsLoader {
-
- private static final Logger logger = LoggerFactory.getLogger( FilteringLoader.class );
-
- private final EntityCollectionManager entityCollectionManager;
- private final ResultsVerifier resultsVerifier;
- private final ApplicationScope applicationScope;
- private final SearchEdge indexScope;
- private final EntityIndexBatch indexBatch;
-
-
- /**
- * Create an instance of a filter loader
- *
- * @param entityCollectionManager The entityCollectionManagerFactory
- * @param resultsVerifier The verifier to verify the candidate results
- * @param applicationScope The application scope to perform the load
- * @param indexScope The index scope used in the search
- */
- protected FilteringLoader( final EntityCollectionManager entityCollectionManager, final ApplicationEntityIndex applicationEntityIndex, final ResultsVerifier resultsVerifier,
- final ApplicationScope applicationScope, final SearchEdge indexScope ) {
-
- this.entityCollectionManager = entityCollectionManager;
- this.resultsVerifier = resultsVerifier;
- this.applicationScope = applicationScope;
- this.indexScope = indexScope;
-
- indexBatch = applicationEntityIndex.createBatch();
- }
-
-
- @Override
- public Results loadResults( final CandidateResults crs ) {
-
-
- if ( crs.size() == 0 ) {
- return new Results();
- }
-
-
- // For each entity, holds the index it appears in our candidates for keeping ordering correct
- final Map<Id, Integer> orderIndex = new HashMap<>( crs.size() );
-
- // Maps the entity ids to our candidates
- final Map<Id, CandidateResult> maxCandidateMapping = new HashMap<>( crs.size() );
-
-
- final Iterator<CandidateResult> iter = crs.iterator();
-
-
- // TODO, in this case we're "optimizing" due to the limitations of collection scope.
- // Perhaps we should change the API to just be an application, then an "owner" scope?
-
- // Go through the candidates and group them by scope for more efficient retrieval.
- // Also remove duplicates before we even make a network call
- for ( int i = 0; iter.hasNext(); i++ ) {
-
- final CandidateResult currentCandidate = iter.next();
-
- final Id entityId = currentCandidate.getId();
-
- //check if we've seen this candidate by id
- final CandidateResult previousMax = maxCandidateMapping.get( entityId );
-
- //its not been seen, save it
- if ( previousMax == null ) {
- maxCandidateMapping.put( entityId, currentCandidate );
- orderIndex.put( entityId, i );
- continue;
- }
-
- //we have seen it, compare them
-
- final UUID previousMaxVersion = previousMax.getVersion();
-
- final UUID currentVersion = currentCandidate.getVersion();
-
-
- final CandidateResult toRemove;
- final CandidateResult toKeep;
-
- //current is newer than previous. Remove previous and keep current
- if ( UUIDComparator.staticCompare( currentVersion, previousMaxVersion ) > 0 ) {
- toRemove = previousMax;
- toKeep = currentCandidate;
- }
- //previously seen value is newer than current. Remove the current and keep the previously seen value
- else {
- toRemove = currentCandidate;
- toKeep = previousMax;
- }
-
- //this is a newer version, we know we already have a stale entity, add it to be cleaned up
-
-
- //de-index it
- logger.warn( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}", new Object[] {
- entityId.getUuid(), entityId.getType(), toRemove.getVersion(), toKeep.getVersion()
- } );
-
- //deindex this document, and remove the previous maxVersion
- //we have to deindex this from our ownerId, since this is what gave us the reference
- indexBatch.deindex( indexScope, toRemove );
-
-
- //TODO, fire the entity repair cleanup task here instead of de-indexing
-
- //replace the value with a more current version
- maxCandidateMapping.put( entityId, toKeep );
- orderIndex.put( entityId, i );
- }
-
-
- //now everything is ordered, and older versions are removed. Batch fetch versions to verify
- // existence and correct versions
-
- final TreeMap<Integer, Id> sortedResults = new TreeMap<>();
-
-
- final Collection<Id> idsToLoad =
- Collections2.transform( maxCandidateMapping.values(), new Function<CandidateResult, Id>() {
- @Nullable
- @Override
- public Id apply( @Nullable final CandidateResult input ) {
- //NOTE this is never null, we won't need to check
- return input.getId();
- }
- } );
-
-
- //now using the scope, load the collection
-
-
-
- //load the results into the loader for this scope for validation
- resultsVerifier.loadResults( idsToLoad, entityCollectionManager );
-
- //now let the loader validate each candidate. For instance, the "max" in this candidate
- //could still be a stale result, so it needs validated
- for ( final Id requestedId : idsToLoad ) {
-
- final CandidateResult cr = maxCandidateMapping.get( requestedId );
-
- //ask the loader if this is valid, if not discard it and de-index it
- if ( !resultsVerifier.isValid( cr ) ) {
- indexBatch.deindex( indexScope, cr );
- continue;
- }
-
- //if we get here we're good, we need to add this to our results
- final int candidateIndex = orderIndex.get( requestedId );
-
- sortedResults.put( candidateIndex, requestedId );
- }
-
-
- // NOTE DO NOT execute the batch here.
- // It changes the results and we need consistent paging until we aggregate all results
- return resultsVerifier.getResults( sortedResults.values() );
- }
-
-
- @Override
- public void postProcess() {
- this.indexBatch.execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
deleted file mode 100644
index 4a3bfcd..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/IdsVerifier.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public class IdsVerifier extends VersionVerifier {
-
- @Override
- public Results getResults( final Collection<Id> ids ) {
-
- final List<UUID> returnIds = new ArrayList<>( ids.size() );
-
- for ( final Id id : ids ) {
- returnIds.add( id.getUuid() );
- }
-
-
- return Results.fromIdList( returnIds );
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
deleted file mode 100644
index c2a3e9a..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.index.CandidateResults;
-
-
-/**
- * Interface for loading results
- */
-public interface ResultsLoader {
-
- /**
- * Using the candidate results, load our results. Should filter stale results
- * @param crs The candidate result set
- * @return Results. Null safe, but may be empty
- */
- public Results loadResults( final CandidateResults crs);
-
- /**
- * Post process the load operation
- */
- public void postProcess();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
deleted file mode 100644
index 3ccca1b..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsLoaderFactory.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.index.SearchEdge;
-
-
-/**
- * Factory for creating results
- */
-public interface ResultsLoaderFactory {
-
- /**
- * Get the loader for results
- * @param applicationScope The application scope used to load results
- * @param indexScope The index scope used in the search
- * @param
- */
- ResultsLoader getLoader( final ApplicationScope applicationScope, final SearchEdge indexScope,
- final Query.Level resultsLevel );
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
deleted file mode 100644
index fe72ca2..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/ResultsVerifier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import org.apache.usergrid.persistence.Results;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-
-public interface ResultsVerifier {
-
- /**
- * Load all the candidate ides for verification
- * @param ids The Id's to load
- * @param ecm The entity collection manager
- */
- public void loadResults(Collection<Id> ids, EntityCollectionManager ecm);
-
- /**
- * Return true if the candidate result is a valid result that should be retained. If it should
- * not it should also be removed from the list of possible return values in this loader
- * @param candidateResult
- */
- public boolean isValid(CandidateResult candidateResult);
-
-
- /**
- * Load the result set with the given ids
- * @return
- */
- public Results getResults(Collection<Id> ids);
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
deleted file mode 100644
index c49fb28..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/elasticsearch/impl/VersionVerifier.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.elasticsearch.impl;
-
-
-import java.util.Collection;
-import java.util.UUID;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.VersionSet;
-import org.apache.usergrid.persistence.index.CandidateResult;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.fasterxml.uuid.UUIDComparator;
-
-
-/**
- * A loader that verifies versions are correct in Cassandra and match ElasticSearch
- */
-public abstract class VersionVerifier implements ResultsVerifier {
-
- private static final Logger logger = LoggerFactory.getLogger( VersionVerifier.class );
-
- private VersionSet ids;
-
-
- @Override
- public void loadResults( final Collection<Id> idsToLoad, final EntityCollectionManager ecm ) {
- ids = ecm.getLatestVersion( idsToLoad ).toBlocking().last();
- }
-
-
- @Override
- public boolean isValid( final CandidateResult candidateResult ) {
- final Id entityId = candidateResult.getId();
-
- final MvccLogEntry version = ids.getMaxVersion( entityId );
-
- //version wasn't found ,deindex
- if ( version == null ) {
- logger.warn( "Version for Entity {}:{} not found",
- entityId.getUuid(), entityId.getUuid() );
-
- return false;
- }
-
- final UUID savedVersion = version.getVersion();
-
- if ( UUIDComparator.staticCompare( savedVersion, candidateResult.getVersion() ) > 0 ) {
- logger.debug( "Stale version of Entity uuid:{} type:{}, stale v:{}, latest v:{}",
- new Object[] {
- entityId.getUuid(),
- entityId.getType(),
- candidateResult.getVersion(),
- savedVersion
- } );
-
- return false;
- }
-
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
deleted file mode 100644
index 6230147..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityIdFilter.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Filter;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
-
-import rx.Observable;
-
-
-/**
- * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
- * be removed
- */
-public class EntityIdFilter extends AbstractPipelineOperation<Id, Id> implements Filter<Id, Id> {
-
- private final Id entityId;
-
-
- @Inject
- public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
-
-
-
-
- @Override
- public Observable<Id> call( final Observable<Id> idObservable ) {
- return Observable.just( entityId );
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
deleted file mode 100644
index dd6b9b8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/entity/EntityLoadCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.pipeline.read.entity;
-
-
-import java.util.List;
-
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
-import org.apache.usergrid.corepersistence.pipeline.read.Collector;
-import org.apache.usergrid.corepersistence.pipeline.read.ResultsPage;
-import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
-import org.apache.usergrid.persistence.collection.EntitySet;
-import org.apache.usergrid.persistence.collection.MvccEntity;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-
-/**
- * Loads entities from a set of Ids.
- *
- * TODO refactor this into a common command that both ES search and graphSearch can use for repair and verification
- */
-public class EntityLoadCollector extends AbstractPipelineOperation<Id, ResultsPage>
- implements Collector<Id, ResultsPage> {
-
- private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-
-
- @Inject
- public EntityLoadCollector( final EntityCollectionManagerFactory entityCollectionManagerFactory ) {
- this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- }
-
-
- @Override
- public Observable<ResultsPage> call( final Observable<Id> observable ) {
-
-
- final EntityCollectionManager entityCollectionManager =
- entityCollectionManagerFactory.createCollectionManager( pipelineContext.getApplicationScope() );
-
- final Observable<EntitySet> entitySetObservable = observable.buffer( pipelineContext.getLimit() ).flatMap(
- bufferedIds -> Observable.just( bufferedIds ).flatMap( ids -> entityCollectionManager.load( ids ) ) );
-
-
- final Observable<ResultsPage> resultsObservable = entitySetObservable
-
- .flatMap( entitySet -> {
-
- //get our entites and filter missing ones, then collect them into a results object
- final Observable<MvccEntity> mvccEntityObservable = Observable.from( entitySet.getEntities() );
-
-
- //convert them to our old entity model, then filter abscent, meaning they weren't found
- final Observable<List<Entity>> entitiesPageObservable =
- mvccEntityObservable.filter( mvccEntity -> mvccEntity.getEntity().isPresent() )
- .map( mvccEntity -> mvccEntity.getEntity().get() ).toList();
-
- //convert them to a list, then map them into results
- return entitiesPageObservable.map( entities -> new ResultsPage( entities ) );
- } );
-
-
- return resultsObservable;
- }
-
- /**
- * Map a new cp entity to an old entity. May be null if not present
- */
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
index e0f69cf..42b352b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphEdgeByIdFilter.java
@@ -20,8 +20,9 @@
package org.apache.usergrid.corepersistence.pipeline.read.graph;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractPipelineOperation;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.SearchByEdge;
@@ -39,7 +40,7 @@ import rx.Observable;
/**
* Filter should take and Id and a graph edge, and ensure the connection between the two exists
*/
-public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOperation<Id, Id> implements
+public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractFilter<Id, Id> implements
Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
@@ -55,12 +56,13 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
@Override
- public Observable<Id> call( final Observable<Id> idObservable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
final GraphManager gm = graphManagerFactory.createEdgeManager( pipelineContext.getApplicationScope() );
- return idObservable.flatMap( id -> {
+ return filterValueObservable.flatMap( filterValue -> {
final String edgeTypeName = getEdgeName();
+ final Id id = filterValue.getValue();
//create our search
final SearchByEdge searchByEdge =
@@ -68,7 +70,7 @@ public abstract class AbstractReadGraphEdgeByIdFilter extends AbstractPipelineOp
Optional.absent() );
//load the versions of the edge, take the first since that's all we need to validate existence, then emit the target node
- return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() );
+ return gm.loadEdgeVersions( searchByEdge ).take( 1 ).map( edge -> edge.getTargetNode() ).map( targetId -> new FilterResult<>(targetId, filterValue.getPath()));
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
index 4dd34fc..503fcf9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/AbstractReadGraphFilter.java
@@ -21,8 +21,9 @@ package org.apache.usergrid.corepersistence.pipeline.read.graph;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializer;
-import org.apache.usergrid.corepersistence.pipeline.read.AbstractSeekingFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractPathFilter;
import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
@@ -38,7 +39,7 @@ import rx.Observable;
/**
* Command for reading graph edges
*/
-public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id, Id, Edge> implements Filter<Id, Id> {
+public abstract class AbstractReadGraphFilter extends AbstractPathFilter<Id, Id, Edge> implements Filter<Id, Id> {
private final GraphManagerFactory graphManagerFactory;
@@ -52,7 +53,8 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
@Override
- public Observable<Id> call( final Observable<Id> observable ) {
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> previousIds ) {
+
//get the graph manager
final GraphManager graphManager =
@@ -63,10 +65,11 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
//return all ids that are emitted from this edge
- return observable.flatMap( id -> {
+ return previousIds.flatMap( previousFilterValue -> {
//set our our constant state
final Optional<Edge> startFromCursor = getSeekValue();
+ final Id id = previousFilterValue.getValue();
final SimpleSearchByEdgeType search =
@@ -78,9 +81,9 @@ public abstract class AbstractReadGraphFilter extends AbstractSeekingFilter<Id,
*/
return graphManager.loadEdgesFromSource( search )
//set our cursor every edge we traverse
- .doOnNext( edge -> setCursor( edge ) )
+
//map our id from the target edge
- .map( edge -> edge.getTargetNode() );
+ .map( edge -> createFilterResult( edge.getTargetNode(), edge, previousFilterValue.getPath() ) );
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cd983d66/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
new file mode 100644
index 0000000..5a0e026
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/graph/EntityIdFilter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read.graph;
+
+
+import org.apache.usergrid.corepersistence.pipeline.read.AbstractFilter;
+import org.apache.usergrid.corepersistence.pipeline.read.Filter;
+import org.apache.usergrid.corepersistence.pipeline.read.FilterResult;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import rx.Observable;
+
+
+/**
+ * This command is a stopgap to make migrating 1.0 code easier. Once full traversal has been implemented, this should
+ * be removed
+ */
+public class EntityIdFilter extends AbstractFilter<Id, Id> implements Filter<Id, Id> {
+
+ private final Id entityId;
+
+
+ @Inject
+ public EntityIdFilter( @Assisted final Id entityId ) {this.entityId = entityId;}
+
+
+
+ @Override
+ public Observable<FilterResult<Id>> call( final Observable<FilterResult<Id>> filterValueObservable ) {
+ //ignore what our input was, and simply emit the id specified
+ return filterValueObservable.map( idFilterResult -> new FilterResult( entityId, idFilterResult.getPath() ));
+
+ }
+}