You are viewing a plain-text version of this content. The link to its canonical version was lost in this plain-text rendering.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/10/01 01:15:43 UTC

[6/7] usergrid git commit: Fix re-index memory leak in the flatMap-based observable pipeline and speed up re-index.

Fix re-index memory leak in the flatMap-based observable pipeline and speed up re-index.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ef8899a1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ef8899a1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ef8899a1

Branch: refs/heads/master
Commit: ef8899a100b8488d4dfd528ce94a1cb8bea582fe
Parents: 33319f3
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:14:37 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:14:37 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      | 20 ++++-------
 .../index/IndexProcessorFig.java                |  2 +-
 .../index/ReIndexServiceImpl.java               | 37 +++++++++-----------
 .../EntityCollectionManagerFactoryImpl.java     |  2 ++
 4 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..a108e40 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -876,23 +876,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
 
     public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
 
-        IndexOperationMessage batch = new IndexOperationMessage();
+        final List<EntityIndexEvent> batch = new ArrayList<>();
+        edges.forEach(e -> {
 
-        for ( EdgeScope e : edges){
+            //change to id scope to avoid serialization issues
+            batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
 
-            EntityIndexOperation entityIndexOperation =
-                new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
-
-            IndexOperationMessage indexOperationMessage =
-                eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
-
-            if (indexOperationMessage != null){
-                batch.ingest(indexOperationMessage);
-            }
-
-        }
+        });
 
-        queueIndexOperationMessage(batch);
+        offerBatch( batch );
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c05c047..1038408 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,7 +90,7 @@ public interface IndexProcessorFig extends GuicyFig {
     @Key(ELASTICSEARCH_QUEUE_IMPL)
     String getQueueImplementation();
 
-    @Default("100")
+    @Default("500")
     @Key(REINDEX_BUFFER_SIZE)
     int getReindexBufferSize();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index e3b179d..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -24,6 +24,7 @@ import java.time.Instant;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -113,6 +114,8 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         //load our last emitted Scope if a cursor is present
 
+        final AtomicInteger count = new AtomicInteger();
+
         final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
 
 
@@ -161,29 +164,21 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         }
 
-        Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+        allEntityIdsObservable.getEdgesToEntities( applicationScopes,
             reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-
             .buffer( indexProcessorFig.getReindexBufferSize())
-            .flatMap( edgeScopes -> Observable.just(edgeScopes)
-                .doOnNext(edges -> {
-
-                    logger.info("Sending batch of {} to be indexed.", edges.size());
-                    indexService.indexBatch(edges, modifiedSince);
-                })
-                .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor());
-
-
-        // start our sampler and state persistence
-        // take a sample every sample interval to allow us to resume state with minimal loss
-        // create our flushing collector and flush the edge scopes to it
-        runningReIndex.collect(() -> new FlushingCollector(jobId),
-            ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes)))
-                .doOnNext( flushingCollector-> flushingCollector.complete() )
-                //subscribe on our I/O scheduler and run the task
-                .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
-
-
+            .doOnNext( edgeScopes -> {
+                logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
+                indexService.indexBatch(edgeScopes, modifiedSince);
+                count.addAndGet(edgeScopes.size() );
+                if( edgeScopes.size() > 0 ) {
+                    writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+                }
+                writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+            .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+            .subscribeOn( Schedulers.io() ).subscribe();
+
+        
         return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fcaa51d..aa962dd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -44,6 +44,8 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutionException;