You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/01 02:09:06 UTC

[5/7] incubator-usergrid git commit: moving back to refresh its faster

moving back to refresh its faster


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ac6b394c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ac6b394c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ac6b394c

Branch: refs/heads/USERGRID-587
Commit: ac6b394c5059cee933a2ee7ea5dd19592338195a
Parents: 5b8a4de
Author: Shawn Feldman <sf...@apache.org>
Authored: Thu Apr 30 13:12:11 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Thu Apr 30 13:12:11 2015 -0600

----------------------------------------------------------------------
 .../index/impl/IndexRefreshCommandImpl.java     | 230 ++++++++++---------
 .../persistence/index/impl/EntityIndexTest.java |   2 +-
 2 files changed, 127 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
index 888a805..c8e96db 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexRefreshCommandImpl.java
@@ -92,121 +92,143 @@ public class IndexRefreshCommandImpl implements IndexRefreshCommand {
 
 
     @Override
-    public synchronized Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
-        final long start = System.currentTimeMillis();
-
+    public  Observable<IndexRefreshCommandInfo> execute(String[] indexes) {
         Timer.Context refreshTimer = timer.time();
-        //id to hunt for
-        final UUID uuid = UUIDUtils.newTimeUUID();
-
-        final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
-        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
-        final Id appId = new SimpleId( "ug_refresh_index" );
-        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
-        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
-
-
-        final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
-
-        final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
-
-        final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
-
-        //add a tracer record
-        IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
-
-        //save the item
-        final IndexOperationMessage message = new IndexOperationMessage();
-        message.addIndexRequest( indexRequest );
-        final Observable addRecord = producer.put(message);
-        final Observable refresh = refresh(indexes);
-
-        /**
-         * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
-         */
-
-        final SearchRequestBuilder builder =
-            esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
-
-                //set our filter for entityId fieldname
-        .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
-
+        final long start = System.currentTimeMillis();
 
-        //start our processing immediately
         final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
-            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
-                .range(0, indexFig.maxRefreshSearches())
-                .map(i ->
-                {
-                    try {
-                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
-                    } catch (Exception ee) {
-                        logger.error("Failed during refresh search for " + uuid, ee);
-                        throw new RuntimeException("Failed during refresh search for " + uuid, ee);
-                    }
-                })
-                .takeWhile(info -> info.hasFinished())
-                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS );
-
-            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
-            combined.toBlocking().last();
-
-            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
-
+            Boolean worked = refresh(indexes).toBlocking().last();
+            final IndexRefreshCommandInfo info = new IndexRefreshCommandInfo(worked, System.currentTimeMillis() - start);
             return info;
 
-        },rxTaskScheduler.getAsyncIOScheduler()).call();
-
+        }, rxTaskScheduler.getAsyncIOScheduler()).call();
+        return future.doOnNext(found -> {
+            if (!found.hasFinished()) {
+                logger.error("Couldn't find record during refresh  took ms:{} ", found.getExecutionTime());
+            } else {
+                logger.info("found record during refresh  took ms:{} ", found.getExecutionTime());
+            }
+        }).doOnCompleted(() -> {
+            refreshTimer.stop();
+        });
+    }
 
-            return future.doOnNext(found -> {
-                if (!found.hasFinished()) {
-                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-                } else {
-                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
-                }
-            }).doOnCompleted(() -> {
-                //clean up our data
-                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
-                DeIndexOperation deIndexRequest =
-                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
-
-                //delete the item
-                IndexOperationMessage indexOperationMessage =
-                    new IndexOperationMessage();
-                indexOperationMessage.addDeIndexRequest(deIndexRequest);
-                producer.put(indexOperationMessage);
-
-                refreshTimer.stop();
-            });
-        }
+    private void insertRecord(){
+
+//        final long start = System.currentTimeMillis();
+//
+//        Timer.Context refreshTimer = timer.time();
+//        //id to hunt for
+//        final UUID uuid = UUIDUtils.newTimeUUID();
+//
+//        final Entity entity = new Entity( new SimpleId( uuid, "ug_refresh_index_type" ) );
+//        EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+//        final Id appId = new SimpleId( "ug_refresh_index" );
+//        final ApplicationScope appScope = new ApplicationScopeImpl( appId );
+//        final IndexEdge edge = new IndexEdgeImpl( appId, "refresh", SearchEdge.NodeType.SOURCE, uuid.timestamp() );
+//
+//
+//        final String docId = IndexingUtils.createIndexDocId( appScope, entity, edge );
+//
+//        final Map<String, Object> entityData = EntityToMapConverter.convert( appScope, edge, entity );
+//
+//        final String entityId = entityData.get( IndexingUtils.ENTITY_ID_FIELDNAME ).toString();
+//
+//        //add a tracer record
+//        IndexOperation indexRequest = new IndexOperation( alias.getWriteAlias(), docId, entityData );
+//
+//        //save the item
+//        final IndexOperationMessage message = new IndexOperationMessage();
+//        message.addIndexRequest( indexRequest );
+//        final Observable addRecord = producer.put(message);
+//        final Observable refresh = refresh(indexes);
+//
+//        /**
+//         * We have to search.  Get by ID returns immediately, even if search isn't ready, therefore we have to search
+//         */
+//
+//        final SearchRequestBuilder builder =
+//            esProvider.getClient().prepareSearch(alias.getReadAlias()).setTypes(IndexingUtils.ES_ENTITY_TYPE)
+//
+//                //set our filter for entityId fieldname
+//        .setPostFilter(FilterBuilders.termFilter(IndexingUtils.ENTITY_ID_FIELDNAME, entityId));
+//
+//
+//        //start our processing immediately
+//        final Observable<IndexRefreshCommandInfo> future = Async.toAsync(() -> {
+//            final Observable<IndexRefreshCommandInfo> infoObservable = Observable
+//                .range(0, indexFig.maxRefreshSearches())
+//                .map(i ->
+//                {
+//                    try {
+//                        return new IndexRefreshCommandInfo(builder.execute().get().getHits().totalHits() > 0, System.currentTimeMillis() - start);
+//                    } catch (Exception ee) {
+//                        logger.error("Failed during refresh search for " + uuid, ee);
+//                        throw new RuntimeException("Failed during refresh search for " + uuid, ee);
+//                    }
+//                })
+//                .takeWhile(info -> info.hasFinished())
+//                .takeLast( indexFig.refreshWaitTime(), TimeUnit.MILLISECONDS);
+//
+//            final Observable<Boolean> combined = Observable.concat(addRecord, refresh);
+//            combined.toBlocking().last();
+//
+//            final IndexRefreshCommandInfo info = infoObservable.toBlocking().last();
+//
+//            return info;
+//
+//        },rxTaskScheduler.getAsyncIOScheduler()).call();
+//
+//
+//            return future.doOnNext(found -> {
+//                if (!found.hasFinished()) {
+//                    logger.error("Couldn't find record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+//                } else {
+//                    logger.info("found record during refresh uuid: {} took ms:{} ", uuid, found.getExecutionTime());
+//                }
+//            }).doOnCompleted(() -> {
+//                //clean up our data
+//                String[] aliases = indexCache.getIndexes(alias, AliasedEntityIndex.AliasType.Read);
+//                DeIndexOperation deIndexRequest =
+//                    new DeIndexOperation(aliases, appScope, edge, entity.getId(), entity.getVersion());
+//
+//                //delete the item
+//                IndexOperationMessage indexOperationMessage =
+//                    new IndexOperationMessage();
+//                indexOperationMessage.addDeIndexRequest(deIndexRequest);
+//                producer.put(indexOperationMessage);
+//
+//                refreshTimer.stop();
+//            });
+    }
 
         private Observable<Boolean> refresh(final String[] indexes) {
 
-        return Observable.create(subscriber -> {
-            try {
+            return Observable.create(subscriber -> {
+                try {
 
-                if (indexes.length == 0) {
-                    logger.debug("Not refreshing indexes. none found");
-                }
-                //Added For Graphite Metrics
-                RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
-                int failedShards = response.getFailedShards();
-                int successfulShards = response.getSuccessfulShards();
-                ShardOperationFailedException[] sfes = response.getShardFailures();
-                if (sfes != null) {
-                    for (ShardOperationFailedException sfe : sfes) {
-                        logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+                    if (indexes.length == 0) {
+                        logger.debug("Not refreshing indexes. none found");
                     }
-                }
-                logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
+                    //Added For Graphite Metrics
+                    RefreshResponse response = esProvider.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+                    int failedShards = response.getFailedShards();
+                    int successfulShards = response.getSuccessfulShards();
+                    ShardOperationFailedException[] sfes = response.getShardFailures();
+                    if (sfes != null) {
+                        for (ShardOperationFailedException sfe : sfes) {
+                            logger.error("Failed to refresh index:{} reason:{}", sfe.index(), sfe.reason());
+                        }
+                    }
+                    logger.debug("Refreshed indexes: {},success:{} failed:{} ", StringUtils.join(indexes, ", "), successfulShards, failedShards);
 
-            } catch (IndexMissingException e) {
-                logger.error("Unable to refresh index. Waiting before sleeping.", e);
-                throw e;
-            }
-            subscriber.onNext(true);
-            subscriber.onCompleted();
-        });
+                } catch (IndexMissingException e) {
+                    logger.error("Unable to refresh index. Waiting before sleeping.", e);
+                    throw e;
+                }
+                subscriber.onNext(true);
+                subscriber.onCompleted();
+            });
 
-    }
+        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ac6b394c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index fee5917..41795ad 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -284,7 +284,7 @@ public class EntityIndexTest extends BaseIT {
 
     @Test
     public void testDeleteWithAlias() throws IOException {
-        Id appId = new SimpleId( "application" );
+        Id appId = new SimpleId(UUID.randomUUID(), "application" );
 
         ApplicationScope applicationScope = new ApplicationScopeImpl( appId );