You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/06/10 18:45:42 UTC
[20/35] incubator-usergrid git commit: new index strategy
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
index 5c926c8..d52ef21 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexServiceTest.java
@@ -40,7 +40,7 @@ import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.CandidateResults;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexFig;
@@ -86,6 +86,9 @@ public class IndexServiceTest {
public EntityIndexFactory entityIndexFactory;
@Inject
+ public IndexLocationStrategyFactory indexLocationStrategyFactory;
+
+ @Inject
public IndexFig indexFig;
public GraphManager graphManager;
@@ -172,13 +175,13 @@ public class IndexServiceTest {
assertEquals(1, batches);
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
+ final AliasedEntityIndex AliasedEntityIndex =
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
//query until the collection edge is available
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
- final CandidateResults collectionResults = getResults( applicationEntityIndex, collectionSearchEdge,
+ final CandidateResults collectionResults = getResults( AliasedEntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1);
assertEquals( 1, collectionResults.size() );
@@ -189,7 +192,7 @@ public class IndexServiceTest {
//query until the connection edge is available
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
- final CandidateResults connectionResults = getResults( applicationEntityIndex, connectionSearchEdge,
+ final CandidateResults connectionResults = getResults( AliasedEntityIndex, connectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, connectionResults.size() );
@@ -259,14 +262,14 @@ public class IndexServiceTest {
assertEquals(expectedSize, batches);
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
+ final AliasedEntityIndex AliasedEntityIndex =
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
//query until it's available
- final CandidateResults collectionResults = getResults( applicationEntityIndex, collectionSearchEdge,
+ final CandidateResults collectionResults = getResults( AliasedEntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, collectionResults.size() );
@@ -278,7 +281,7 @@ public class IndexServiceTest {
//query until it's available
- final CandidateResults connectionResults = getResults( applicationEntityIndex, connectionSearchEdge,
+ final CandidateResults connectionResults = getResults( AliasedEntityIndex, connectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, connectionResults.size() );
@@ -290,7 +293,7 @@ public class IndexServiceTest {
//query until it's available
- final CandidateResults lastConnectionResults = getResults( applicationEntityIndex, lastConnectionSearchEdge,
+ final CandidateResults lastConnectionResults = getResults( AliasedEntityIndex, lastConnectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
assertEquals( 1, lastConnectionResults.size() );
@@ -313,8 +316,8 @@ public class IndexServiceTest {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
+ final AliasedEntityIndex AliasedEntityIndex =
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
@@ -341,7 +344,7 @@ public class IndexServiceTest {
assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
//ensure that no edges remain
- final CandidateResults connectionResultsEmpty = applicationEntityIndex.search( connectionSearchEdge,
+ final CandidateResults connectionResultsEmpty = AliasedEntityIndex.search( connectionSearchEdge,
SearchTypes.fromTypes( "things" ),"select *",10,0 );
assertEquals(0,connectionResultsEmpty.size());
@@ -353,8 +356,8 @@ public class IndexServiceTest {
ApplicationScope applicationScope =
new ApplicationScopeImpl( new SimpleId( UUID.randomUUID(), "application" ) );
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
+ final AliasedEntityIndex AliasedEntityIndex =
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final GraphManager graphManager = graphManagerFactory.createEdgeManager( applicationScope );
@@ -373,14 +376,14 @@ public class IndexServiceTest {
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
- getResults( applicationEntityIndex, collectionSearchEdge,
+ getResults( AliasedEntityIndex, collectionSearchEdge,
SearchTypes.fromTypes( testEntity.getId().getType() ), 1 );
for(int i = 0; i < edgeCount; i++) {
//query until results are available for connections
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearchEdges.get( i ) );
- getResults( applicationEntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
+ getResults( AliasedEntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
}
@@ -399,7 +402,7 @@ public class IndexServiceTest {
assertEquals( 1, indexOperationMessage.getDeIndexRequests().size() );
//ensure that no edges remain
- final CandidateResults connectionResultsEmpty = applicationEntityIndex.search( connectionSearchEdge,
+ final CandidateResults connectionResultsEmpty = AliasedEntityIndex.search( connectionSearchEdge,
SearchTypes.fromTypes( "things" ),"select *",10,0 );
assertEquals(0,connectionResultsEmpty.size());
@@ -419,8 +422,8 @@ public class IndexServiceTest {
final EntityCollectionManager collectionManager =
entityCollectionManagerFactory.createCollectionManager( applicationScope );
- final ApplicationEntityIndex applicationEntityIndex =
- entityIndexFactory.createApplicationEntityIndex( applicationScope );
+ final AliasedEntityIndex AliasedEntityIndex =
+ entityIndexFactory.createEntityIndex(indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) );
final Edge collectionEdge =
createEntityandCollectionEdge( applicationScope, graphManager, testEntity );
@@ -437,12 +440,12 @@ public class IndexServiceTest {
//query until results are available for collections
final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge );
- getResults( applicationEntityIndex, collectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
+ getResults( AliasedEntityIndex, collectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
//query until results are available for connections
final SearchEdge connectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( connectionSearch );
- getResults( applicationEntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
+ getResults( AliasedEntityIndex, connectionSearchEdge, SearchTypes.fromTypes( testEntity.getId().getType() ),
1 );
return connectionSearch;
@@ -492,7 +495,7 @@ public class IndexServiceTest {
}
- private CandidateResults getResults( final ApplicationEntityIndex applicationEntityIndex,
+ private CandidateResults getResults( final AliasedEntityIndex AliasedEntityIndex,
final SearchEdge searchEdge, final SearchTypes searchTypes,
final int expectedSize ) {
final int attempts = 100;
@@ -500,7 +503,7 @@ public class IndexServiceTest {
String ql = "select *";
for ( int i = 0; i < attempts; i++ ) {
final CandidateResults candidateResults =
- applicationEntityIndex.search( searchEdge, searchTypes, ql , 100, 0 );
+ AliasedEntityIndex.search( searchEdge, searchTypes, ql , 100, 0 );
if ( candidateResults.size() == expectedSize ) {
return candidateResults;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 318a378..560971f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -37,7 +38,7 @@ import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.AliasedEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -329,11 +330,14 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private void deleteIndex( UUID appUuid ) {
Injector injector = SpringResource.getInstance().getBean( Injector.class );
+ IndexLocationStrategyFactory indexLocationStrategyFactory = injector.getInstance(IndexLocationStrategyFactory.class);
EntityIndexFactory eif = injector.getInstance( EntityIndexFactory.class );
Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
ApplicationScope scope = new ApplicationScopeImpl( appId );
- ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope );
+ AliasedEntityIndex ei = eif.createEntityIndex(
+ indexLocationStrategyFactory.getIndexLocationStrategy(scope)
+ );
ei.deleteApplication().toBlocking().lastOrDefault( null );
app.refreshIndex();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
index c4a16b6..d882f78 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/query/IteratingQueryIT.java
@@ -438,6 +438,7 @@ public class IteratingQueryIT {
}
}
+ this.app.refreshIndex();
long stop = System.currentTimeMillis();
LOG.info( "Writes took {} ms", stop - start );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
deleted file mode 100644
index b392f3c..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ApplicationEntityIndex.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import rx.Observable;
-
-/**
- * Entity Index for an Application.
- */
-public interface ApplicationEntityIndex {
-
-
- /**
- * Create the index batch.
- */
- EntityIndexBatch createBatch();
-
- /**
- * Search on every document in the specified search edge. Also search by the types if specified
- * @param searchEdge The edge to search on
- * @param searchTypes The search types to search
- * @param query The query to execute
- * @param limit The limit of values to return
- * @param offset The offset to query on
- * @return
- */
- CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
- final int limit, final int offset );
-
-
- /**
- * Same as search, just iterates all documents that match the index edge exactly.
- * @param edge The edge to search on
- * @param entityId The entity that the searchEdge is connected to.
- * @return
- */
- CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
-
- /**
- * Returns all entity documents that match the entityId and come before the marked version
- * @param entityId The entityId to match when searching
- * @param markedVersion The version that has been marked for deletion. All version before this one must be deleted.
- * @param limit The limit of the values to return per search.
- * @param offset The offset to page the query on.
- * @return
- */
- CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion );
-
- /**
- * delete all application records
- * @return
- */
- Observable deleteApplication();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
index 0511b75..4d18cef 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/ElasticSearchQueryBuilder/SearchRequestBuilderStrategyV2.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.impl.EsProvider;
-import org.apache.usergrid.persistence.index.impl.IndexAlias;
+import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.impl.IndexingUtils;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
index e99a9f7..93b16c4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndex.java
@@ -22,8 +22,11 @@ package org.apache.usergrid.persistence.index;
import org.apache.usergrid.persistence.core.CPManager;
import org.apache.usergrid.persistence.core.util.Health;
+import org.apache.usergrid.persistence.model.entity.Id;
import rx.Observable;
+import java.util.UUID;
+
/**
* Provides management operations for single index
@@ -42,14 +45,12 @@ public interface EntityIndex extends CPManager {
*/
void addIndex(final String indexSuffix, final int shards, final int replicas, final String writeConsistency);
-
/**
* Refresh the index.
*/
Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync();
-
/**
* Check health of cluster.
*/
@@ -67,6 +68,47 @@ public interface EntityIndex extends CPManager {
*/
void initialize();
+
+ /**
+ * Create the index batch.
+ */
+ EntityIndexBatch createBatch();
+
+ /**
+ * Search on every document in the specified search edge. Also search by the types if specified
+ * @param searchEdge The edge to search on
+ * @param searchTypes The search types to search
+ * @param query The query to execute
+ * @param limit The limit of values to return
+ * @param offset The offset to query on
+ * @return
+ */
+ CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
+ final int limit, final int offset );
+
+
+ /**
+ * Same as search, just iterates all documents that match the index edge exactly.
+ * @param edge The edge to search on
+ * @param entityId The entity that the searchEdge is connected to.
+ * @return
+ */
+ CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId );
+
+ /**
+ * Returns all entity documents that match the entityId and come before the marked version
+ * @param entityId The entityId to match when searching
+ * @param markedVersion The version that has been marked for deletion. All versions before this one must be deleted.
+ * @return
+ */
+ CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion );
+
+ /**
+ * delete all application records
+ * @return
+ */
+ Observable deleteApplication();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
index 6490bb7..e4401c5 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexFactory.java
@@ -19,11 +19,6 @@
package org.apache.usergrid.persistence.index;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import com.google.inject.assistedinject.Assisted;
-import org.apache.usergrid.persistence.index.impl.IndexLocationStrategy;
-
-
public interface EntityIndexFactory {
/**
@@ -31,7 +26,7 @@ public interface EntityIndexFactory {
* @param indexLocationStrategy
* @return
*/
- ApplicationEntityIndex createApplicationEntityIndex( IndexLocationStrategy indexLocationStrategy );
+ AliasedEntityIndex createEntityIndex( IndexLocationStrategy indexLocationStrategy );
/**
* Invalidate the cache of our factory, and force the generation of new entity index instances
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
new file mode 100644
index 0000000..f33783d
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+
+import org.apache.usergrid.persistence.index.IndexFig;
+
+
+/**
+ * Abstraction for Index alias names
+ */
+public class IndexAlias{
+ private final String readAlias;
+ private final String writeAlias;
+
+ /**
+ *
+ * @param indexFig config
+ * @param aliasPrefix alias prefix, e.g. app_id
+ */
+ public IndexAlias(IndexFig indexFig,String aliasPrefix) {
+ this.writeAlias = aliasPrefix + "_write_" + indexFig.getAliasPostfix();
+ this.readAlias = aliasPrefix + "_read_" + indexFig.getAliasPostfix();
+ }
+
+ public String getReadAlias() {
+ return readAlias;
+ }
+
+ public String getWriteAlias() {
+ return writeAlias;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 39714f2..5c61928 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -199,4 +199,7 @@ public interface IndexFig extends GuicyFig {
@Key( ELASTICSEARCH_WRITE_TIMEOUT )
long getWriteTimeout();
+ @Default( "usergrid_management" )
+ @Key( "elasticsearch.managment_index" )
+ String getManagementAppIndexName();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
deleted file mode 100644
index 52c7fea..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexIdentifier.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.index;
-
-
-import org.apache.usergrid.persistence.index.impl.IndexAlias;
-
-/**
- * Identifier for where an index is in underlying server
- */
-public interface IndexIdentifier {
-
- /**
- * get the alias name
- * @return
- */
- IndexAlias getAlias();
-
- /**
- * get index name from suffix
- * @param suffix
- * @return
- */
- String getIndex( String suffix );
-
- /**
- * return unique string
- * @return
- */
- String toString();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
new file mode 100644
index 0000000..b779ae9
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+/**
+ * Identifies where an index is located in the underlying server for an application scope.
+ */
+public interface IndexLocationStrategy {
+ /**
+ * get the alias name
+ * @return
+ */
+ IndexAlias getAlias();
+
+ /**
+ * get index name from suffix
+ * @param suffix
+ * @return
+ */
+ String getIndex( String suffix );
+
+ /**
+ * return unique string
+ * @return
+ */
+ String toString();
+
+ ApplicationScope getApplicationScope();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
index af7f814..435a94a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexRefreshCommand.java
@@ -26,7 +26,7 @@ import rx.Observable;
*/
public interface IndexRefreshCommand {
- Observable<IndexRefreshCommandInfo> execute(String[] indexes);
+ Observable<IndexRefreshCommandInfo> execute(IndexAlias alias, String[] indexes);
class IndexRefreshCommandInfo{
private final boolean hasFinished;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
index 92f1c9b..b22a461 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/guice/IndexModule.java
@@ -48,12 +48,8 @@ public abstract class IndexModule extends AbstractModule {
install(new QueueModule());
bind( EntityIndexFactory.class ).to( EsEntityIndexFactoryImpl.class );
- bind(AliasedEntityIndex.class).to(EsEntityIndexImpl.class);
- bind(EntityIndex.class).to(EsEntityIndexImpl.class).asEagerSingleton();
bind(IndexCache.class).to(EsIndexCacheImpl.class);
bind(IndexRefreshCommand.class).to(IndexRefreshCommandImpl.class);
- bind(IndexIdentifier.class).to(IndexIdentifierv2Impl.class);
-
bind(IndexBufferConsumer.class).to(EsIndexBufferConsumerImpl.class).asEagerSingleton();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
deleted file mode 100644
index 74bb33c..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsApplicationEntityIndexImpl.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.usergrid.persistence.index.*;
-import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.ListenableActionFuture;
-import org.elasticsearch.action.ShardOperationFailedException;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
-import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.search.SearchScrollRequestBuilder;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.FilterBuilder;
-import org.elasticsearch.index.query.FilterBuilders;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
-import org.apache.usergrid.persistence.index.query.ParsedQuery;
-import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
-import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
-import org.apache.usergrid.persistence.map.MapManager;
-import org.apache.usergrid.persistence.map.MapManagerFactory;
-import org.apache.usergrid.persistence.map.MapScope;
-import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-
-import rx.Observable;
-
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.APPLICATION_ID_FIELDNAME;
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.applicationId;
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseIndexDocId;
-
-
-/**
- * Classy class class.
- */
-public class EsApplicationEntityIndexImpl implements ApplicationEntityIndex {
-
- private static final Logger logger = LoggerFactory.getLogger( EsApplicationEntityIndexImpl.class );
-
-
- private final ApplicationScope applicationScope;
- private final IndexIdentifier indexIdentifier;
- private final Timer searchTimer;
- private final AliasedEntityIndex entityIndex;
- private final IndexBufferConsumer indexBatchBufferProducer;
- private final EsProvider esProvider;
- private final IndexAlias alias;
- private final Timer deleteApplicationTimer;
- private final Meter deleteApplicationMeter;
- private final SearchRequestBuilderStrategy searchRequest;
- private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
- private FailureMonitor failureMonitor;
- private final int cursorTimeout;
- private final long queryTimeout;
-
-
- @Inject
- public EsApplicationEntityIndexImpl( final AliasedEntityIndex entityIndex,
- final IndexFig config,
- final IndexBufferConsumer indexBatchBufferProducer,
- final EsProvider provider,
- final MetricsFactory metricsFactory,
- final IndexLocationStrategy indexLocationStrategy
- ) {
- this.entityIndex = entityIndex;
- this.indexBatchBufferProducer = indexBatchBufferProducer;
- this.indexIdentifier = indexLocationStrategy.getIndexIdentifier();
- this.failureMonitor = new FailureMonitorImpl( config, provider );
- this.esProvider = provider;
-
- this.searchTimer = metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "search.timer" );
- this.cursorTimeout = config.getQueryCursorTimeout();
- this.queryTimeout = config.getWriteTimeout();
-
- this.deleteApplicationTimer =
- metricsFactory.getTimer( EsApplicationEntityIndexImpl.class, "delete.application" );
- this.deleteApplicationMeter =
- metricsFactory.getMeter( EsApplicationEntityIndexImpl.class, "delete.application.meter" );
-
- this.alias = indexIdentifier.getAlias();
-
- this.applicationScope = indexLocationStrategy.getApplicationScope();
- this.searchRequest = new SearchRequestBuilderStrategy( esProvider, applicationScope, alias, cursorTimeout );
- this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, applicationScope, alias, cursorTimeout );
- }
-
-
- @Override
- public EntityIndexBatch createBatch() {
- EntityIndexBatch batch =
- new EsEntityIndexBatchImpl( applicationScope, indexBatchBufferProducer, entityIndex, indexIdentifier );
- return batch;
- }
-
- public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
- final int limit, final int offset ) {
-
- IndexValidationUtils.validateSearchEdge(searchEdge);
- Preconditions.checkNotNull( searchTypes, "searchTypes cannot be null" );
- Preconditions.checkNotNull( query, "query cannot be null" );
- Preconditions.checkArgument( limit > 0, "limit must be > 0" );
-
-
- SearchResponse searchResponse;
-
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build( query );
-
- final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
- .setTimeout( TimeValue.timeValueMillis( queryTimeout ) );
-
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
- this.alias.getReadAlias(), searchEdge.getNodeId(), searchEdge.getEdgeName(),
- searchTypes.getTypeNames( applicationScope ), srb );
- }
-
- try {
- //Added For Graphite Metrics
- Timer.Context timeSearch = searchTimer.time();
- searchResponse = srb.execute().actionGet();
- timeSearch.stop();
- }
- catch ( Throwable t ) {
- logger.error( "Unable to communicate with Elasticsearch", t );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
- failureMonitor.success();
-
- return parseResults(searchResponse, parsedQuery, limit, offset);
- }
-
-
- @Override
- public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ) {
- /**
- * Take a list of IndexEdge, with an entityId
- and query Es directly for matches
-
- */
- IndexValidationUtils.validateSearchEdge( edge );
- Preconditions.checkNotNull( entityId, "entityId cannot be null" );
-
- SearchResponse searchResponse;
-
- List<CandidateResult> candidates = new ArrayList<>();
-
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
-
- final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
- //I can't just search on the entity Id.
-
- FilterBuilder entityEdgeFilter = FilterBuilders.termFilter( IndexingUtils.EDGE_NODE_ID_FIELDNAME,
- IndexingUtils.nodeId( edge.getNodeId() ));
-
- srb.setPostFilter(entityEdgeFilter);
-
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
- this.alias.getReadAlias(),entityId, srb );
- }
-
- try {
- //Added For Graphite Metrics
- Timer.Context timeSearch = searchTimer.time();
-
- //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
- //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
- //TODO: review this and make them not magic numbers when acking this PR.
- searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
-
-
- while(true){
- //add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse );
-
- SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
- .getScrollBuilder( searchResponse.getScrollId() )
- .setScroll( new TimeValue( 6000 ) );
-
- //TODO: figure out how to log exactly what we're putting into elasticsearch
- // if ( logger.isDebugEnabled() ) {
- // logger.debug( "Scroll search using query: {} ",
- // ssrb.toString() );
- // }
-
- searchResponse = ssrb.execute().actionGet();
-
- if (searchResponse.getHits().getHits().length == 0) {
- break;
- }
-
-
- }
- timeSearch.stop();
- }
- catch ( Throwable t ) {
- logger.error( "Unable to communicate with Elasticsearch", t );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
- failureMonitor.success();
-
- return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
- }
-
-
- @Override
- public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
-
- Preconditions.checkNotNull( entityId, "entityId cannot be null" );
- Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
- ValidationUtils.verifyVersion( markedVersion );
-
- SearchResponse searchResponse;
-
- List<CandidateResult> candidates = new ArrayList<>();
-
- final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
-
- final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
-
- FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
- IndexingUtils.entityId( entityId ) );
-
- FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
-
- FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter );
-
- srb.setPostFilter(andFilter);
-
-
-
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
- this.alias.getReadAlias(),entityId, srb );
- }
-
- try {
- //Added For Graphite Metrics
- Timer.Context timeSearch = searchTimer.time();
-
- //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
- //The settings for the scroll aren't tested and so we aren't sure what vlaues would be best in a production enviroment
- //TODO: review this and make them not magic numbers when acking this PR.
- searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
-
- //list that will hold all of the search hits
-
-
- while(true){
- //add search result hits to some sort of running tally of hits.
- candidates = aggregateScrollResults( candidates, searchResponse );
-
- SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
- .getScrollBuilder( searchResponse.getScrollId() )
- .setScroll( new TimeValue( 6000 ) );
-
- //TODO: figure out how to log exactly what we're putting into elasticsearch
-// if ( logger.isDebugEnabled() ) {
-// logger.debug( "Scroll search using query: {} ",
-// ssrb.toString() );
-// }
-
- searchResponse = ssrb.execute().actionGet();
-
- if (searchResponse.getHits().getHits().length == 0) {
- break;
- }
-
-
- }
- timeSearch.stop();
- }
- catch ( Throwable t ) {
- logger.error( "Unable to communicate with Elasticsearch", t );
- failureMonitor.fail( "Unable to execute batch", t );
- throw t;
- }
- failureMonitor.success();
-
- return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
- }
-
-
- /**
- * Completely delete an index.
- */
- public Observable deleteApplication() {
- deleteApplicationMeter.mark();
- String idString = applicationId( applicationScope.getApplication() );
- final TermQueryBuilder tqb = QueryBuilders.termQuery( APPLICATION_ID_FIELDNAME, idString );
- final String[] indexes = entityIndex.getUniqueIndexes();
- Timer.Context timer = deleteApplicationTimer.time();
- //Added For Graphite Metrics
- return Observable.from( indexes ).flatMap( index -> {
-
- final ListenableActionFuture<DeleteByQueryResponse> response =
- esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
-
- response.addListener( new ActionListener<DeleteByQueryResponse>() {
-
- @Override
- public void onResponse( DeleteByQueryResponse response ) {
- checkDeleteByQueryResponse( tqb, response );
- }
-
-
- @Override
- public void onFailure( Throwable e ) {
- logger.error( "failed on delete index", e );
- }
- } );
- return Observable.from( response );
- } ).doOnError( t -> logger.error( "Failed on delete application", t ) ).doOnCompleted( () -> timer.stop() );
- }
-
-
- /**
- * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
- */
- private void checkDeleteByQueryResponse( final QueryBuilder query, final DeleteByQueryResponse response ) {
-
- for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
- final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
-
- for ( ShardOperationFailedException failedException : failures ) {
- logger.error( String.format( "Unable to delete by query %s. "
- + "Failed with code %d and reason %s on shard %s in index %s", query.toString(),
- failedException.status().getStatus(), failedException.reason(),
- failedException.shardId(), failedException.index() ) );
- }
- }
- }
-
-
- /**
- * Parse the results and return the canddiate results
- */
- private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
- final int limit, final int from ) {
-
- final SearchHits searchHits = searchResponse.getHits();
- final SearchHit[] hits = searchHits.getHits();
-
- logger.debug( " Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
-
- List<CandidateResult> candidates = new ArrayList<>( hits.length );
-
- for ( SearchHit hit : hits ) {
-
- final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
-
- candidates.add( candidateResult );
- }
-
- final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
-
- // >= seems odd. However if we get an overflow, we need to account for it.
- if ( hits.length >= limit ) {
-
- candidateResults.initializeOffset( from + limit );
-
- }
-
- return candidateResults;
- }
-
- private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
- final SearchResponse searchResponse ){
-
- final SearchHits searchHits = searchResponse.getHits();
- final SearchHit[] hits = searchHits.getHits();
-
- for ( SearchHit hit : hits ) {
-
- final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
-
- candidates.add( candidateResult );
- }
-
- logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );
-
- return candidates;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 972a43c..dc2813d 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -37,26 +37,27 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private static final Logger log = LoggerFactory.getLogger( EsEntityIndexBatchImpl.class );
- private final ApplicationScope applicationScope;
private final IndexAlias alias;
- private final IndexIdentifier indexIdentifier;
+ private final IndexLocationStrategy indexIdentifier;
private final IndexBufferConsumer indexBatchBufferProducer;
private final AliasedEntityIndex entityIndex;
+ private final ApplicationScope applicationScope;
private IndexOperationMessage container;
- public EsEntityIndexBatchImpl( final ApplicationScope applicationScope,
+ public EsEntityIndexBatchImpl( final IndexLocationStrategy locationStrategy,
final IndexBufferConsumer indexBatchBufferProducer,
- final AliasedEntityIndex entityIndex,
- IndexIdentifier indexIdentifier ) {
+ final AliasedEntityIndex entityIndex
+ ) {
+ this.indexIdentifier = locationStrategy;
- this.applicationScope = applicationScope;
this.indexBatchBufferProducer = indexBatchBufferProducer;
this.entityIndex = entityIndex;
- this.indexIdentifier = indexIdentifier;
+ this.applicationScope = indexIdentifier.getApplicationScope();
+
this.alias = indexIdentifier.getAlias();
//constrained
this.container = new IndexOperationMessage();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
index 4b1d062..3910b22 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexFactoryImpl.java
@@ -23,7 +23,6 @@ package org.apache.usergrid.persistence.index.impl;
import java.util.concurrent.ExecutionException;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.*;
import com.google.common.cache.CacheBuilder;
@@ -37,38 +36,49 @@ import com.google.inject.Inject;
public class EsEntityIndexFactoryImpl implements EntityIndexFactory{
private final IndexFig config;
+ private final IndexCache indexCache;
private final EsProvider provider;
- private final IndexBufferConsumer indexBatchBufferProducer;
+ private final IndexBufferConsumer indexBufferConsumer;
private final MetricsFactory metricsFactory;
- private final AliasedEntityIndex entityIndex;
+ private final IndexRefreshCommand refreshCommand;
- private LoadingCache<IndexLocationStrategy, ApplicationEntityIndex> eiCache =
- CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<IndexLocationStrategy, ApplicationEntityIndex>() {
- public ApplicationEntityIndex load( IndexLocationStrategy locationStrategy ) {
- return new EsApplicationEntityIndexImpl(
- entityIndex,config, indexBatchBufferProducer, provider, metricsFactory, locationStrategy
+ private LoadingCache<IndexLocationStrategy, AliasedEntityIndex> eiCache =
+ CacheBuilder.newBuilder().maximumSize( 1000 ).build( new CacheLoader<IndexLocationStrategy, AliasedEntityIndex>() {
+ public AliasedEntityIndex load( IndexLocationStrategy locationStrategy ) {
+ AliasedEntityIndex index = new EsEntityIndexImpl(
+ provider,
+ indexCache,
+ config,
+ refreshCommand,
+ metricsFactory,
+ indexBufferConsumer,
+ locationStrategy
);
+ return index;
}
} );
@Inject
- public EsEntityIndexFactoryImpl( final IndexFig indexFig, final EsProvider provider,
- final IndexBufferConsumer indexBatchBufferProducer,
+ public EsEntityIndexFactoryImpl( final IndexFig indexFig,
+ final IndexCache indexCache,
+ final EsProvider provider,
+ final IndexBufferConsumer indexBufferConsumer,
final MetricsFactory metricsFactory,
- final AliasedEntityIndex entityIndex
+ final IndexRefreshCommand refreshCommand
){
this.config = indexFig;
+ this.indexCache = indexCache;
this.provider = provider;
- this.indexBatchBufferProducer = indexBatchBufferProducer;
+ this.indexBufferConsumer = indexBufferConsumer;
this.metricsFactory = metricsFactory;
- this.entityIndex = entityIndex;
+ this.refreshCommand = refreshCommand;
}
@Override
- public ApplicationEntityIndex createApplicationEntityIndex(final IndexLocationStrategy indexLocationStrategy) {
+ public AliasedEntityIndex createEntityIndex(final IndexLocationStrategy indexLocationStrategy) {
try{
return eiCache.get(indexLocationStrategy);
}catch (ExecutionException ee){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 5a4878f..590baf9 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index.impl;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Resources;
import com.google.inject.Inject;
@@ -29,21 +30,40 @@ import com.google.inject.Singleton;
import com.google.inject.assistedinject.Assisted;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.migration.data.VersionedData;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
+import org.apache.usergrid.persistence.core.util.ValidationUtils;
import org.apache.usergrid.persistence.index.*;
+import org.apache.usergrid.persistence.index.ElasticSearchQueryBuilder.SearchRequestBuilderStrategyV2;
import org.apache.usergrid.persistence.index.exceptions.IndexException;
import org.apache.usergrid.persistence.index.migration.IndexDataVersions;
+import org.apache.usergrid.persistence.index.query.ParsedQuery;
+import org.apache.usergrid.persistence.index.query.ParsedQueryBuilder;
+import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
+import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ListenableActionFuture;
+import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
+import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
+import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.*;
import org.elasticsearch.indices.IndexAlreadyExistsException;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +73,10 @@ import java.io.IOException;
import java.net.URL;
import java.util.*;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.APPLICATION_ID_FIELDNAME;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.applicationId;
+import static org.apache.usergrid.persistence.index.impl.IndexingUtils.parseIndexDocId;
+
/**
* Implements index using ElasticSearch Java API.
@@ -64,6 +88,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
private final IndexAlias alias;
private final IndexFig indexFig;
+ private final IndexLocationStrategy indexLocationStrategy;
private final Timer addTimer;
private final Timer updateAliasTimer;
@@ -84,7 +109,16 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
ImmutableMap.<String, Object>builder().put(IndexingUtils.ENTITY_ID_FIELDNAME, UUIDGenerator.newTimeUUID().toString()).build();
- private final IndexIdentifier indexIdentifier;
+ private final Timer deleteApplicationTimer;
+ private final Meter deleteApplicationMeter;
+ private final ApplicationScope applicationScope;
+ private final SearchRequestBuilderStrategy searchRequest;
+ private final SearchRequestBuilderStrategyV2 searchRequestBuilderStrategyV2;
+ private final int cursorTimeout;
+ private final long queryTimeout;
+ private final IndexBufferConsumer indexBatchBufferProducer;
+ private final FailureMonitorImpl failureMonitor;
+ private final Timer searchTimer;
private IndexCache aliasCache;
private Timer mappingTimer;
@@ -97,25 +131,37 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
final IndexFig indexFig,
final IndexRefreshCommand indexRefreshCommand,
final MetricsFactory metricsFactory,
+ final IndexBufferConsumer indexBatchBufferProducer,
@Assisted final IndexLocationStrategy indexLocationStrategy
) {
this.indexFig = indexFig;
- this.indexIdentifier = indexLocationStrategy.getIndexIdentifier();
-
+ this.indexLocationStrategy = indexLocationStrategy;
+ this.indexBatchBufferProducer = indexBatchBufferProducer;
+ this.failureMonitor = new FailureMonitorImpl( indexFig, provider );
this.esProvider = provider;
this.indexRefreshCommand = indexRefreshCommand;
- this.alias = indexIdentifier.getAlias();
+ this.alias = indexLocationStrategy.getAlias();
this.aliasCache = indexCache;
+ this.applicationScope = indexLocationStrategy.getApplicationScope();
+ this.cursorTimeout = indexFig.getQueryCursorTimeout();
+ this.queryTimeout = indexFig.getWriteTimeout();
+ this.searchRequest = new SearchRequestBuilderStrategy( esProvider, applicationScope, alias, cursorTimeout );
+ this.searchRequestBuilderStrategyV2 = new SearchRequestBuilderStrategyV2( esProvider, applicationScope, alias, cursorTimeout );
+
this.addTimer = metricsFactory
.getTimer(EsEntityIndexImpl.class, "add.timer");
this.updateAliasTimer = metricsFactory
.getTimer(EsEntityIndexImpl.class, "update.alias.timer");
this.mappingTimer = metricsFactory
.getTimer(EsEntityIndexImpl.class, "create.mapping.timer");
-
+ this.deleteApplicationTimer =
+ metricsFactory.getTimer( EsEntityIndexImpl.class, "delete.application" );
+ this.deleteApplicationMeter =
+ metricsFactory.getMeter( EsEntityIndexImpl.class, "delete.application.meter" );
this.refreshIndexMeter = metricsFactory.getMeter(EsEntityIndexImpl.class,"refresh.meter");
-
+ this.searchTimer = metricsFactory.getTimer( EsEntityIndexImpl.class, "search.timer" );
+ initialize();
}
@Override
@@ -123,8 +169,6 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
final int numberOfShards = indexFig.getNumberOfShards();
final int numberOfReplicas = indexFig.getNumberOfReplicas();
- aliasCache.invalidate(alias);
-
if (shouldInitialize()) {
addIndex( null, numberOfShards, numberOfReplicas, indexFig.getWriteConsistencyLevel() );
}
@@ -140,7 +184,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
public void addIndex(final String indexSuffix,final int numberOfShards, final int numberOfReplicas, final String writeConsistency) {
try {
//get index name with suffix attached
- String indexName = indexIdentifier.getIndex(indexSuffix);
+ String indexName = indexLocationStrategy.getIndex(indexSuffix);
//Create index
try {
@@ -199,7 +243,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
Timer.Context timer = updateAliasTimer.time();
try {
Boolean isAck;
- String indexName = indexIdentifier.getIndex(indexSuffix);
+ String indexName = indexLocationStrategy.getIndex(indexSuffix);
final AdminClient adminClient = esProvider.getClient().admin();
String[] indexNames = getIndexes(AliasType.Write);
@@ -216,11 +260,12 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
}
aliasesRequestBuilder = adminClient.indices().prepareAliases();
//Added For Graphite Metrics
- // add read alias
- aliasesRequestBuilder.addAlias(indexName, alias.getReadAlias());
- //Added For Graphite Metrics
//add write alias
aliasesRequestBuilder.addAlias(indexName, alias.getWriteAlias());
+ //Added For Graphite Metrics
+ // add read alias
+ aliasesRequestBuilder.addAlias(indexName, alias.getReadAlias());
+
isAck = aliasesRequestBuilder.execute().actionGet().isAcknowledged();
logger.info("Created new read and write aliases ACK=[{}]", isAck);
aliasCache.invalidate(alias);
@@ -321,7 +366,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
public Observable<IndexRefreshCommand.IndexRefreshCommandInfo> refreshAsync() {
refreshIndexMeter.mark();
- return indexRefreshCommand.execute(getUniqueIndexes());
+ return indexRefreshCommand.execute(alias,getUniqueIndexes());
}
@@ -335,6 +380,306 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
return indexSet.toArray(new String[0]);
}
+ @Override
+ public EntityIndexBatch createBatch() {
+ EntityIndexBatch batch =
+ new EsEntityIndexBatchImpl(indexLocationStrategy , indexBatchBufferProducer, this );
+ return batch;
+ }
+
+ public CandidateResults search( final SearchEdge searchEdge, final SearchTypes searchTypes, final String query,
+ final int limit, final int offset ) {
+
+ IndexValidationUtils.validateSearchEdge(searchEdge);
+ Preconditions.checkNotNull(searchTypes, "searchTypes cannot be null");
+ Preconditions.checkNotNull( query, "query cannot be null" );
+ Preconditions.checkArgument( limit > 0, "limit must be > 0" );
+
+
+ SearchResponse searchResponse;
+
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build(query);
+
+ final SearchRequestBuilder srb = searchRequest.getBuilder( searchEdge, searchTypes, parsedQuery, limit, offset )
+ .setTimeout( TimeValue.timeValueMillis(queryTimeout) );
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Searching index (read alias): {}\n nodeId: {}, edgeType: {}, \n type: {}\n query: {} ",
+ this.alias.getReadAlias(), searchEdge.getNodeId(), searchEdge.getEdgeName(),
+ searchTypes.getTypeNames( applicationScope ), srb );
+ }
+
+ try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = searchTimer.time();
+ searchResponse = srb.execute().actionGet();
+ timeSearch.stop();
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to communicate with Elasticsearch", t );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+ failureMonitor.success();
+
+ return parseResults(searchResponse, parsedQuery, limit, offset);
+ }
+
+
+ @Override
+ public CandidateResults getAllEdgeDocuments( final IndexEdge edge, final Id entityId ) {
+ /**
+ * Take a list of IndexEdge, with an entityId
+ and query Es directly for matches
+
+ */
+ IndexValidationUtils.validateSearchEdge( edge );
+ Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+
+ SearchResponse searchResponse;
+
+ List<CandidateResult> candidates = new ArrayList<>();
+
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+ //I can't just search on the entity Id.
+
+ FilterBuilder entityEdgeFilter = FilterBuilders.termFilter(IndexingUtils.EDGE_NODE_ID_FIELDNAME,
+ IndexingUtils.nodeId(edge.getNodeId()));
+
+ srb.setPostFilter(entityEdgeFilter);
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
+ this.alias.getReadAlias(),entityId, srb );
+ }
+
+ try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = searchTimer.time();
+
+ //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ //The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment
+ //TODO: review this and make them not magic numbers when acking this PR.
+ searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+
+ while(true){
+ //add search result hits to some sort of running tally of hits.
+ candidates = aggregateScrollResults( candidates, searchResponse );
+
+ SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
+ .getScrollBuilder( searchResponse.getScrollId() )
+ .setScroll( new TimeValue( 6000 ) );
+
+ //TODO: figure out how to log exactly what we're putting into elasticsearch
+ // if ( logger.isDebugEnabled() ) {
+ // logger.debug( "Scroll search using query: {} ",
+ // ssrb.toString() );
+ // }
+
+ searchResponse = ssrb.execute().actionGet();
+
+ if (searchResponse.getHits().getHits().length == 0) {
+ break;
+ }
+
+
+ }
+ timeSearch.stop();
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to communicate with Elasticsearch", t );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+ failureMonitor.success();
+
+ return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+ }
+
+
+ @Override
+ public CandidateResults getAllEntityVersionsBeforeMarkedVersion( final Id entityId, final UUID markedVersion ) {
+
+ Preconditions.checkNotNull( entityId, "entityId cannot be null" );
+ Preconditions.checkNotNull( markedVersion, "markedVersion cannot be null" );
+ ValidationUtils.verifyVersion(markedVersion);
+
+ SearchResponse searchResponse;
+
+ List<CandidateResult> candidates = new ArrayList<>();
+
+ final ParsedQuery parsedQuery = ParsedQueryBuilder.build( "select *" );
+
+ final SearchRequestBuilder srb = searchRequestBuilderStrategyV2.getBuilder();
+
+ FilterBuilder entityIdFilter = FilterBuilders.termFilter( IndexingUtils.ENTITY_ID_FIELDNAME,
+ IndexingUtils.entityId( entityId ) );
+
+ FilterBuilder entityVersionFilter = FilterBuilders.rangeFilter( IndexingUtils.ENTITY_VERSION_FIELDNAME ).lte( markedVersion );
+
+ FilterBuilder andFilter = FilterBuilders.andFilter(entityIdFilter,entityVersionFilter );
+
+ srb.setPostFilter(andFilter);
+
+
+
+ if ( logger.isDebugEnabled() ) {
+ logger.debug( "Searching for marked versions in index (read alias): {}\n nodeId: {},\n query: {} ",
+ this.alias.getReadAlias(),entityId, srb );
+ }
+
+ try {
+ //Added For Graphite Metrics
+ Timer.Context timeSearch = searchTimer.time();
+
+ //set the timeout on the scroll cursor to 6 seconds and set the number of values returned per shard to 100.
+ //The settings for the scroll aren't tested and so we aren't sure what values would be best in a production environment
+ //TODO: review this and make them not magic numbers when acking this PR.
+ searchResponse = srb.setScroll( new TimeValue( 6000 ) ).setSize( 100 ).execute().actionGet();
+
+ //list that will hold all of the search hits
+
+
+ while(true){
+ //add search result hits to some sort of running tally of hits.
+ candidates = aggregateScrollResults( candidates, searchResponse );
+
+ SearchScrollRequestBuilder ssrb = searchRequestBuilderStrategyV2
+ .getScrollBuilder( searchResponse.getScrollId() )
+ .setScroll( new TimeValue( 6000 ) );
+
+ //TODO: figure out how to log exactly what we're putting into elasticsearch
+// if ( logger.isDebugEnabled() ) {
+// logger.debug( "Scroll search using query: {} ",
+// ssrb.toString() );
+// }
+
+ searchResponse = ssrb.execute().actionGet();
+
+ if (searchResponse.getHits().getHits().length == 0) {
+ break;
+ }
+
+
+ }
+ timeSearch.stop();
+ }
+ catch ( Throwable t ) {
+ logger.error( "Unable to communicate with Elasticsearch", t );
+ failureMonitor.fail( "Unable to execute batch", t );
+ throw t;
+ }
+ failureMonitor.success();
+
+ return new CandidateResults( candidates, parsedQuery.getSelectFieldMappings());
+ }
+
+
+ /**
+ * Delete all documents matching this application's id from the index (delete-by-query; the index itself is kept).
+ */
+ public Observable deleteApplication() {
+ deleteApplicationMeter.mark();
+ String idString = applicationId( applicationScope.getApplication() );
+ final TermQueryBuilder tqb = QueryBuilders.termQuery(APPLICATION_ID_FIELDNAME, idString);
+ final String[] indexes = getUniqueIndexes();
+ Timer.Context timer = deleteApplicationTimer.time();
+ //Added For Graphite Metrics
+ return Observable.from( indexes ).flatMap( index -> {
+
+ final ListenableActionFuture<DeleteByQueryResponse> response =
+ esProvider.getClient().prepareDeleteByQuery( alias.getWriteAlias() ).setQuery( tqb ).execute();
+
+ response.addListener( new ActionListener<DeleteByQueryResponse>() {
+
+ @Override
+ public void onResponse( DeleteByQueryResponse response ) {
+ checkDeleteByQueryResponse( tqb, response );
+ }
+
+
+ @Override
+ public void onFailure( Throwable e ) {
+ logger.error( "failed on delete index", e );
+ }
+ } );
+ return Observable.from( response );
+ } ).doOnError( t -> logger.error( "Failed on delete application", t ) ).doOnCompleted( () -> timer.stop() );
+ }
+
+
+ /**
+ * Validate the response doesn't contain errors, if it does, fail fast at the first error we encounter
+ */
+ private void checkDeleteByQueryResponse( final QueryBuilder query, final DeleteByQueryResponse response ) {
+
+ for ( IndexDeleteByQueryResponse indexDeleteByQueryResponse : response ) {
+ final ShardOperationFailedException[] failures = indexDeleteByQueryResponse.getFailures();
+
+ for ( ShardOperationFailedException failedException : failures ) {
+ logger.error( String.format( "Unable to delete by query %s. "
+ + "Failed with code %d and reason %s on shard %s in index %s", query.toString(),
+ failedException.status().getStatus(), failedException.reason(),
+ failedException.shardId(), failedException.index() ) );
+ }
+ }
+ }
+
+
+ /**
+ * Parse the results and return the candidate results
+ */
+ private CandidateResults parseResults( final SearchResponse searchResponse, final ParsedQuery query,
+ final int limit, final int from ) {
+
+ final SearchHits searchHits = searchResponse.getHits();
+ final SearchHit[] hits = searchHits.getHits();
+
+ logger.debug( " Hit count: {} Total hits: {}", hits.length, searchHits.getTotalHits() );
+
+ List<CandidateResult> candidates = new ArrayList<>( hits.length );
+
+ for ( SearchHit hit : hits ) {
+
+ final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
+
+ candidates.add( candidateResult );
+ }
+
+ final CandidateResults candidateResults = new CandidateResults( candidates, query.getSelectFieldMappings());
+
+ // >= seems odd. However if we get an overflow, we need to account for it.
+ if ( hits.length >= limit ) {
+
+ candidateResults.initializeOffset( from + limit );
+
+ }
+
+ return candidateResults;
+ }
+
+ private List<CandidateResult> aggregateScrollResults( List<CandidateResult> candidates,
+ final SearchResponse searchResponse ){
+
+ final SearchHits searchHits = searchResponse.getHits();
+ final SearchHit[] hits = searchHits.getHits();
+
+ for ( SearchHit hit : hits ) {
+
+ final CandidateResult candidateResult = parseIndexDocId( hit.getId() );
+
+ candidates.add( candidateResult );
+ }
+
+ logger.debug( "Aggregated {} out of {} hits ",candidates.size(),searchHits.getTotalHits() );
+
+ return candidates;
+
+ }
/**
* Do the retry operation
@@ -388,7 +733,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
try {
final ActionFuture<ClusterHealthResponse> future = esProvider.getClient().admin().cluster().health(
- new ClusterHealthRequest( new String[] { indexIdentifier.getIndex( null ) } ) );
+ new ClusterHealthRequest( new String[] { indexLocationStrategy.getIndex( null ) } ) );
//only wait 2 seconds max
ClusterHealthResponse chr = future.actionGet(2000);
@@ -408,6 +753,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex,VersionedData {
}
+
/**
* Interface for operations.
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexAlias.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexAlias.java
deleted file mode 100644
index 568ea5f..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexAlias.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index.impl;
-
-
-import org.apache.usergrid.persistence.index.IndexFig;
-
-
-/**
- * Abstraction for Index alias names
- */
-public class IndexAlias{
- private final String readAlias;
- private final String writeAlias;
-
- public IndexAlias(IndexFig indexFig,String indexBase) {
- this.writeAlias = indexBase + "_write_" + indexFig.getAliasPostfix();
- this.readAlias = indexBase + "_read_" + indexFig.getAliasPostfix();
- }
-
- public String getReadAlias() {
- return readAlias;
- }
-
- public String getWriteAlias() {
- return writeAlias;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexCache.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexCache.java
index aafa67e..61eb047 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexCache.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexCache.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.index.impl;
import org.apache.usergrid.persistence.index.AliasedEntityIndex;
+import org.apache.usergrid.persistence.index.IndexAlias;
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ab7c8e21/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierv1Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierv1Impl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierv1Impl.java
deleted file mode 100644
index 34130ce..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexIdentifierv1Impl.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-
-package org.apache.usergrid.persistence.index.impl;
-
-import com.google.inject.Inject;
-
-import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.IndexIdentifier;
-
-
-/**
- * Class is used to generate an index name and alias name
- */
-public class IndexIdentifierv1Impl implements IndexIdentifier {
- private final IndexFig config;
-
- @Inject
- public IndexIdentifierv1Impl(IndexFig config) {
- this.config = config;
- }
-
- /**
- * Get the alias name
- * @return
- */
- @Override
- public IndexAlias getAlias() {
- return new IndexAlias(config,config.getIndexPrefix());
- }
-
- /**
- * Get index name, send in additional parameter to add incremental indexes
- * @param suffix
- * @return
- */
- @Override
- public String getIndex(String suffix) {
- if (suffix != null) {
- return config.getIndexPrefix() + "_" + suffix;
- } else {
- return config.getIndexPrefix();
- }
- }
-
-
- @Override
- public String toString() {
- return "index id"+config.getIndexPrefix();
- }
-}