You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/06 15:01:28 UTC
[06/11] usergrid git commit: Fix re-index memory leak with flatmap observable and speed up re-index.
Fix re-index memory leak with flatmap observable and speed up re-index.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/ef8899a1
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/ef8899a1
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/ef8899a1
Branch: refs/heads/usergrid-1318-queue
Commit: ef8899a100b8488d4dfd528ce94a1cb8bea582fe
Parents: 33319f3
Author: Michael Russo <mr...@apigee.com>
Authored: Fri Sep 30 18:14:37 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Fri Sep 30 18:14:37 2016 -0700
----------------------------------------------------------------------
.../asyncevents/AsyncEventServiceImpl.java | 20 ++++-------
.../index/IndexProcessorFig.java | 2 +-
.../index/ReIndexServiceImpl.java | 37 +++++++++-----------
.../EntityCollectionManagerFactoryImpl.java | 2 ++
4 files changed, 25 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 0bff887..a108e40 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -876,23 +876,15 @@ public class AsyncEventServiceImpl implements AsyncEventService {
public void indexBatch(final List<EdgeScope> edges, final long updatedSince) {
- IndexOperationMessage batch = new IndexOperationMessage();
+ final List<EntityIndexEvent> batch = new ArrayList<>();
+ edges.forEach(e -> {
- for ( EdgeScope e : edges){
+ //change to id scope to avoid serialization issues
+ batch.add(new EntityIndexEvent(queueFig.getPrimaryRegion(), new EntityIdScope(e.getApplicationScope(), e.getEdge().getTargetNode()), updatedSince));
- EntityIndexOperation entityIndexOperation =
- new EntityIndexOperation( e.getApplicationScope(), e.getEdge().getTargetNode(), updatedSince);
-
- IndexOperationMessage indexOperationMessage =
- eventBuilder.buildEntityIndex( entityIndexOperation ).toBlocking().lastOrDefault(null);
-
- if (indexOperationMessage != null){
- batch.ingest(indexOperationMessage);
- }
-
- }
+ });
- queueIndexOperationMessage(batch);
+ offerBatch( batch );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index c05c047..1038408 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -90,7 +90,7 @@ public interface IndexProcessorFig extends GuicyFig {
@Key(ELASTICSEARCH_QUEUE_IMPL)
String getQueueImplementation();
- @Default("100")
+ @Default("500")
@Key(REINDEX_BUFFER_SIZE)
int getReindexBufferSize();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index e3b179d..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -24,6 +24,7 @@ import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -113,6 +114,8 @@ public class ReIndexServiceImpl implements ReIndexService {
//load our last emitted Scope if a cursor is present
+ final AtomicInteger count = new AtomicInteger();
+
final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
@@ -161,29 +164,21 @@ public class ReIndexServiceImpl implements ReIndexService {
}
- Observable<List<EdgeScope>> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes,
reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
-
.buffer( indexProcessorFig.getReindexBufferSize())
- .flatMap( edgeScopes -> Observable.just(edgeScopes)
- .doOnNext(edges -> {
-
- logger.info("Sending batch of {} to be indexed.", edges.size());
- indexService.indexBatch(edges, modifiedSince);
- })
- .subscribeOn( Schedulers.io() ), indexProcessorFig.getReindexConcurrencyFactor());
-
-
- // start our sampler and state persistence
- // take a sample every sample interval to allow us to resume state with minimal loss
- // create our flushing collector and flush the edge scopes to it
- runningReIndex.collect(() -> new FlushingCollector(jobId),
- ((flushingCollector, edgeScopes) -> flushingCollector.flushBuffer(edgeScopes)))
- .doOnNext( flushingCollector-> flushingCollector.complete() )
- //subscribe on our I/O scheduler and run the task
- .subscribeOn( Schedulers.io() ).subscribe(); //want reindex to continually run so leave subscribe.
-
-
+ .doOnNext( edgeScopes -> {
+ logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
+ indexService.indexBatch(edgeScopes, modifiedSince);
+ count.addAndGet(edgeScopes.size() );
+ if( edgeScopes.size() > 0 ) {
+ writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
+ }
+ writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); })
+ .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
+ .subscribeOn( Schedulers.io() ).subscribe();
+
+
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/ef8899a1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
index fcaa51d..aa962dd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerFactoryImpl.java
@@ -44,6 +44,8 @@ import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;