You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/01 02:09:06 UTC
[5/7] incubator-usergrid git commit: moving back to refresh; it's faster
Moving back to refresh; it's faster.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac6b394c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac6b394c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac6b394c
Branch: refs/heads/USERGRID-587
Commit: ac6b394c5059cee933a2ee7ea5dd19592338195a
Parents: 5b8a4de
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Apr 30 13:12:11 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Apr 30 13:12:11 2015 -0600
----------------------------------------------------------------------
.../index/impl/IndexRefreshCommandImpl.java | 230 ++++++++++---------
.../persistence/index/impl/EntityIndexTest.java | 2 +-
2 files changed, 127 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 888a805..c8e96db 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -92,121 +92,143 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
@Override
- public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
- final long start = System.currentTimeMillis();
-
+ public Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
Timer.Context refreshTimer = timer.time();
- //id to hunt for
- final UUID uuid = UUIDUtils.newTimeUUID();
-
- final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
- EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
- final Id appId = new SimpleId( "ug_refresh_index" );
- final ApplicationScope appScope = new ApplicationScopeImpl( appId );
- final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
-
-
- final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
-
- final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
-
- final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
-
- //add a tracer record
- IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
-
- //save the item
- final IndexOperationMessage message = new IndexOperationMessage();
- message.addIndexRequest( indexRequest );
- final Observable addRecord = producer.put(message);
- final Observable refresh = refresh(indexes);
-
- /**
- * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search
- */
-
- final SearchRequestBuilder builder =
- esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
-
- //set our filter for entityId fieldname
- .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
-
+ final long start = System.currentTimeMillis();
- //start our processing immediately
final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
- final Observable<IndexRefreshCommandInfo> infoObservable = Observable
- .range(0, indexFig.maxRefreshSearches())
- .map(i ->
- {
- try {
- return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
- } catch (Exception ee) {
- logger.error("Failed during refresh search for " + uuid, ee);
- throw new RuntimeException("Failed during refresh search for " + uuid, ee);
- }
- })
- .takeWhile(info -> info.hasFinished())
- .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS );
-
- final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
- combined.toBlocking().last();
-
- final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
-
+ Boolean worked = refresh(indexes).toBlocking().last();
+ final IndexRefreshCommandInfo info = new IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start);
return info;
- },rxTaskScheduler.getAsyncIOScheduler()).call();
-
+ }, rxTaskScheduler.getAsyncIOScheduler()).call();
+ return future.doOnNext(found -> {
+ if (!found.hasFinished()) {
+ logger.error("Couldn't find record during refresh took ms:{} ", found.getExecutionTime());
+ } else {
+ logger.info("found record during refresh took ms:{} ", found.getExecutionTime());
+ }
+ }).doOnCompleted(() -> {
+ refreshTimer.stop();
+ });
+ }
- return future.doOnNext(found -> {
- if (!found.hasFinished()) {
- logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
- } else {
- logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
- }
- }).doOnCompleted(() -> {
- //clean up our data
- String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
- DeIndexOperation deIndexRequest =
- new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-
- //delete the item
- IndexOperationMessage indexOperationMessage =
- new IndexOperationMessage();
- indexOperationMessage.addDeIndexRequest(deIndexRequest);
- producer.put(indexOperationMessage);
-
- refreshTimer.stop();
- });
- }
+ private void insertRecord(){
+
+// final long start = System.currentTimeMillis();
+//
+// Timer.Context refreshTimer = timer.time();
+// //id to hunt for
+// final UUID uuid = UUIDUtils.newTimeUUID();
+//
+// final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
+// EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+// final Id appId = new SimpleId( "ug_refresh_index" );
+// final ApplicationScope appScope = new ApplicationScopeImpl( appId );
+// final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
+//
+//
+// final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
+//
+// final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
+//
+// final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
+//
+// //add a tracer record
+// IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
+//
+// //save the item
+// final IndexOperationMessage message = new IndexOperationMessage();
+// message.addIndexRequest( indexRequest );
+// final Observable addRecord = producer.put(message);
+// final Observable refresh = refresh(indexes);
+//
+// /**
+// * We have to search. Get by ID returns immediately, even if search isn't ready, therefore we have to search
+// */
+//
+// final SearchRequestBuilder builder =
+// esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
+//
+// //set our filter for entityId fieldname
+// .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
+//
+//
+// //start our processing immediately
+// final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
+// final Observable<IndexRefreshCommandInfo> infoObservable = Observable
+// .range(0, indexFig.maxRefreshSearches())
+// .map(i ->
+// {
+// try {
+// return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
+// } catch (Exception ee) {
+// logger.error("Failed during refresh search for " + uuid, ee);
+// throw new RuntimeException("Failed during refresh search for " + uuid, ee);
+// }
+// })
+// .takeWhile(info -> info.hasFinished())
+// .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
+//
+// final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
+// combined.toBlocking().last();
+//
+// final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
+//
+// return info;
+//
+// },rxTaskScheduler.getAsyncIOScheduler()).call();
+//
+//
+// return future.doOnNext(found -> {
+// if (!found.hasFinished()) {
+// logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+// } else {
+// logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+// }
+// }).doOnCompleted(() -> {
+// //clean up our data
+// String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
+// DeIndexOperation deIndexRequest =
+// new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+//
+// //delete the item
+// IndexOperationMessage indexOperationMessage =
+// new IndexOperationMessage();
+// indexOperationMessage.addDeIndexRequest(deIndexRequest);
+// producer.put(indexOperationMessage);
+//
+// refreshTimer.stop();
+// });
+ }
private Observable<Boolean> refresh(final String[] indexes) {
- return Observable.create(subscriber -> {
- try {
+ return Observable.create(subscriber -> {
+ try {
- if (indexes.length == 0) {
- logger.debug("Not refreshing indexes. none found");
- }
- //Added For Graphite Metrics
- RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
- int failedShards = response.getFailedShards();
- int successfulShards = response.getSuccessfulShards();
- ShardOperationFailedException[] sfes = response.getShardFailures();
- if (sfes != null) {
- for (ShardOperationFailedException sfe : sfes) {
- logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+ if (indexes.length == 0) {
+ logger.debug("Not refreshing indexes. none found");
}
- }
- logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
+ //Added For Graphite Metrics
+ RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+ int failedShards = response.getFailedShards();
+ int successfulShards = response.getSuccessfulShards();
+ ShardOperationFailedException[] sfes = response.getShardFailures();
+ if (sfes != null) {
+ for (ShardOperationFailedException sfe : sfes) {
+ logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+ }
+ }
+ logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
- } catch (IndexMissingException e) {
- logger.error("Unable to refresh index. Waiting before sleeping.", e);
- throw e;
- }
- subscriber.onNext(true);
- subscriber.onCompleted();
- });
+ } catch (IndexMissingException e) {
+ logger.error("Unable to refresh index. Waiting before sleeping.", e);
+ throw e;
+ }
+ subscriber.onNext(true);
+ subscriber.onCompleted();
+ });
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index fee5917..41795ad 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -284,7 +284,7 @@ public class EntityIndexTest extends BaseIT {
@Test
public void testDeleteWithAlias() throws IOException {
- Id appId = new SimpleId( "application" );
+ Id appId = new SimpleId(UUID.randomUUID(), "application" );
ApplicationScope applicationScope = new ApplicationScopeImpl( appId );