You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 01:22:05 UTC
[17/27] incubator-usergrid git commit: undo change
undo change
remove synchronous calls
new promises structure, remove executeandrefresh
add refresh
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/090902a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/090902a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/090902a1
Branch: refs/heads/USERGRID-280
Commit: 090902a10638b2a6b0c449a06b1316c055039a2a
Parents: 649a50e
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 11:50:54 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 18:38:01 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 3 +-
.../corepersistence/CpRelationManager.java | 7 +--
.../batch/job/AbstractSchedulerRuntimeIT.java | 2 +-
.../usergrid/persistence/CollectionIT.java | 2 +-
.../persistence/index/EntityIndexBatch.java | 5 --
.../persistence/index/IndexBatchBuffer.java | 10 +--
.../usergrid/persistence/index/IndexFig.java | 4 +-
.../index/IndexOperationMessage.java | 57 +++++++++++++++++
.../index/RequestBuilderContainer.java | 65 --------------------
.../index/impl/EsEntityIndexBatchImpl.java | 28 ++-------
.../index/impl/EsEntityIndexImpl.java | 10 ++-
.../index/impl/IndexBatchBufferImpl.java | 37 +++++------
.../impl/EntityConnectionIndexImplTest.java | 7 ++-
.../persistence/index/impl/EntityIndexTest.java | 26 +++++---
14 files changed, 112 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index 7905c43..8f432a2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -980,7 +980,6 @@ public class CpEntityManager implements EntityManager {
// update in all containing collections and connection indexes
CpRelationManager rm = ( CpRelationManager ) getRelationManager( entityRef );
rm.updateContainingCollectionAndCollectionIndexes( cpEntity );
- future.get();
}
@@ -2830,7 +2829,7 @@ public class CpEntityManager implements EntityManager {
//
// batch.index(appAllTypesScope, memberEntity);
- batch.execute().get();
+ batch.execute();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 07fc45e..302f18d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -442,7 +442,7 @@ public class CpRelationManager implements RelationManager {
} ).count().toBlocking().lastOrDefault( 0 );
- entityIndexBatch.execute().get();
+ entityIndexBatch.execute();
logger.debug( "updateContainingCollectionsAndCollections() updated {} indexes", count );
}
@@ -871,7 +871,6 @@ public class CpRelationManager implements RelationManager {
}
}
}
- future.get();
}
@Override
@@ -1068,8 +1067,6 @@ public class CpRelationManager implements RelationManager {
batchUpdateEntityConnection( m, false, connection, UUIDGenerator.newTimeUUID() );
batchExecute( m, CassandraService.RETRY_COUNT );
- future.get();
-
return connection;
}
@@ -1294,7 +1291,7 @@ public class CpRelationManager implements RelationManager {
//
// batch.deindex( allTypesIndexScope, targetEntity );
- batch.execute().get();
+ batch.execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
index 81836ae..1a15d92 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/AbstractSchedulerRuntimeIT.java
@@ -96,7 +96,7 @@ public class AbstractSchedulerRuntimeIT {
JobSchedulerService jobScheduler = springResource.getBean( JobSchedulerService.class );
jobScheduler.setJobListener( listener );
if ( jobScheduler.state() != State.RUNNING ) {
-// jobScheduler.startAndWait();
+ jobScheduler.startAndWait();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
index 1881fff..641d3ad 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CollectionIT.java
@@ -911,7 +911,7 @@ public class CollectionIT extends AbstractCoreIT {
properties.put( "keywords", "Action, New" );
Entity thirdGame = em.create( "game", properties );
- em.refreshIndex();
+ em.refreshIndex();//need to track all batches then resolve promises
Query query = Query.fromQL( "select * where keywords contains 'new' and title contains 'extreme'" );
Results r = em.searchCollection( em.getApplicationRef(), "games", query );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index bf606bc..580a7f4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -68,9 +68,4 @@ public interface EntityIndexBatch {
*/
public BetterFuture execute();
- /**
- * Execute the batch and force the refresh
- * @return future to guarantee execution
- */
- public BetterFuture executeAndRefresh();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
index 1d71bcd..d1015a0 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexBatchBuffer.java
@@ -17,14 +17,6 @@
package org.apache.usergrid.persistence.index;
import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.index.impl.IndexBatchBufferImpl;
-import org.elasticsearch.action.delete.DeleteRequestBuilder;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import rx.Observable;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
/**
* * Buffer index requests into sets to send,
@@ -36,7 +28,7 @@ public interface IndexBatchBuffer {
*
* @param container
*/
- public BetterFuture put(RequestBuilderContainer container);
+ public BetterFuture put(IndexOperationMessage container);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
index 7434b99..8040a62 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexFig.java
@@ -133,7 +133,7 @@ public interface IndexFig extends GuicyFig {
* size of the buffer to build up before you send results
* @return
*/
- @Default("300")
+ @Default("1000")
@Key( INDEX_BUFFER_SIZE )
int getIndexBufferSize();
@@ -141,7 +141,7 @@ public interface IndexFig extends GuicyFig {
* Request batch size for ES
* @return
*/
- @Default("100")
+ @Default("1000")
@Key( INDEX_BATCH_SIZE)
int getIndexBatchSize();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
new file mode 100644
index 0000000..5acb17e
--- /dev/null
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexOperationMessage.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.persistence.index;
+
+import org.apache.usergrid.persistence.core.future.BetterFuture;
+import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
+
+import java.util.Iterator;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Collects the request builders queued for one index batch and exposes the future that {@link #done()} signals.
+ */
+public class IndexOperationMessage {
+ private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
+ private final BetterFuture<IndexOperationMessage> containerFuture;
+
+ public IndexOperationMessage(){
+ final IndexOperationMessage parent = this;
+ builders = new ConcurrentLinkedQueue<>();
+ this.containerFuture = new BetterFuture<>(new Callable<IndexOperationMessage>() {
+ @Override
+ public IndexOperationMessage call() throws Exception {
+ return parent;
+ }
+ });
+ }
+
+ public void add(ShardReplicationOperationRequestBuilder builder){
+ builders.add(builder);
+ }
+ public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
+ return builders;
+ }
+ public void done(){
+ containerFuture.done();
+ }
+ public BetterFuture<IndexOperationMessage> getFuture(){
+ return containerFuture;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
deleted file mode 100644
index e7928ff..0000000
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/RequestBuilderContainer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.persistence.index;
-
-import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
-
-import java.util.Iterator;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Classy class class.
- */
-public class RequestBuilderContainer{
- private final ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> builders;
- private final BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> containerFuture;
-
- private boolean forceRefresh = false;
-
- public RequestBuilderContainer(){
- final RequestBuilderContainer parent = this;
- builders = new ConcurrentLinkedQueue<>();
- this.containerFuture = new BetterFuture<>(new Callable<Iterator<ShardReplicationOperationRequestBuilder>>() {
- @Override
- public Iterator<ShardReplicationOperationRequestBuilder> call() throws Exception {
- return parent.getBuilder().iterator();
- }
- });
- }
-
- public void add(ShardReplicationOperationRequestBuilder builder){
- builders.add(builder);
- }
- public ConcurrentLinkedQueue<ShardReplicationOperationRequestBuilder> getBuilder(){
- return builders;
- }
- public void done(){
- containerFuture.done();
- }
- public BetterFuture<Iterator<ShardReplicationOperationRequestBuilder>> getFuture(){
- return containerFuture;
- }
-
- public boolean isForceRefresh() {
- return forceRefresh;
- }
- public void setForceRefresh(boolean forceRefresh) {
- this.forceRefresh = forceRefresh;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index c0ddfdc..455dba6 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -19,16 +19,9 @@
package org.apache.usergrid.persistence.index.impl;
import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.util.concurrent.Futures;
import org.apache.usergrid.persistence.core.future.BetterFuture;
-import org.apache.usergrid.persistence.core.rx.ObservableIterator;
import org.apache.usergrid.persistence.index.*;
-import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Client;
@@ -56,11 +49,8 @@ import org.apache.usergrid.persistence.model.field.StringField;
import org.apache.usergrid.persistence.model.field.UUIDField;
import org.apache.usergrid.persistence.model.field.value.EntityObject;
-import com.google.common.base.Joiner;
import rx.Observable;
-import rx.functions.Action1;
import rx.functions.Func1;
-import rx.schedulers.Schedulers;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.ANALYZED_STRING_PREFIX;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.BOOLEAN_PREFIX;
@@ -71,7 +61,6 @@ import static org.apache.usergrid.persistence.index.impl.IndexingUtils.NUMBER_PR
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.STRING_PREFIX;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createIndexDocId;
-import static org.apache.usergrid.persistence.index.impl.IndexingUtils.createContextName;
public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@@ -90,7 +79,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
private final IndexBatchBuffer indexBatchBuffer;
private final AliasedEntityIndex entityIndex;
- private RequestBuilderContainer container;
+ private IndexOperationMessage container;
public EsEntityIndexBatchImpl(final ApplicationScope applicationScope, final Client client,final IndexBatchBuffer indexBatchBuffer,
@@ -104,7 +93,7 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
this.alias = indexIdentifier.getAlias();
this.refresh = config.isForcedRefresh();
//constrained
- this.container = new RequestBuilderContainer();
+ this.container = new IndexOperationMessage();
}
@@ -213,16 +202,8 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
@Override
public BetterFuture execute() {
- RequestBuilderContainer tempContainer = container;
- container = new RequestBuilderContainer();
- return indexBatchBuffer.put(tempContainer);
- }
-
- @Override
- public BetterFuture executeAndRefresh() {
- container.setForceRefresh(true);
- RequestBuilderContainer tempContainer = container;
- container = new RequestBuilderContainer();
+ IndexOperationMessage tempContainer = container;
+ container = new IndexOperationMessage();
return indexBatchBuffer.put(tempContainer);
}
@@ -364,5 +345,4 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return processed;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
index 2881ce4..693b168 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexImpl.java
@@ -23,6 +23,7 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -280,8 +281,9 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
@Override
public EntityIndexBatch createBatch() {
- return new EsEntityIndexBatchImpl(
+ EntityIndexBatch batch = new EsEntityIndexBatchImpl(
applicationScope, esProvider.getClient(),indexBatchBuffer, config, this );
+ return batch;
}
@@ -432,6 +434,10 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
public void refresh() {
+ BetterFuture future = indexBatchBuffer.put(new IndexOperationMessage());
+ future.get();
+ // TODO: loop through all outstanding batches, retrieve their promises and call get() on each
+
final RetryOperation retryOperation = new RetryOperation() {
@Override
public boolean doOp() {
@@ -579,7 +585,7 @@ public class EsEntityIndexImpl implements AliasedEntityIndex {
if ( response.isAcknowledged() ) {
logger.info( "Deleted index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
//invlaidate the alias
- aliasCache.invalidate( alias );
+ aliasCache.invalidate(alias);
}
else {
logger.info( "Failed to delete index: read {} write {}", alias.getReadAlias(), alias.getWriteAlias());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index 73af70f..92ab582 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -24,7 +24,7 @@ import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.index.IndexBatchBuffer;
import org.apache.usergrid.persistence.index.IndexFig;
-import org.apache.usergrid.persistence.index.RequestBuilderContainer;
+import org.apache.usergrid.persistence.index.IndexOperationMessage;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -37,14 +37,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
-import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -64,7 +60,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
private final IndexFig config;
private final Timer flushTimer;
private final Counter bufferCounter;
- private Observable<List<RequestBuilderContainer>> consumer;
+ private Observable<List<IndexOperationMessage>> consumer;
private Producer producer;
@Inject
@@ -84,16 +80,16 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
//batch up sets of some size and send them in batch
this.consumer = Observable.create(producer)
.subscribeOn(Schedulers.io())
- .doOnNext(new Action1<RequestBuilderContainer>() {
+ .doOnNext(new Action1<IndexOperationMessage>() {
@Override
- public void call(RequestBuilderContainer requestBuilderContainer) {
+ public void call(IndexOperationMessage requestBuilderContainer) {
queueSize.addAndGet(requestBuilderContainer.getBuilder().size());
}
})
.buffer(config.getIndexBufferTimeout(), TimeUnit.MILLISECONDS, config.getIndexBufferSize())
- .doOnNext(new Action1<List<RequestBuilderContainer>>() {
+ .doOnNext(new Action1<List<IndexOperationMessage>>() {
@Override
- public void call(List<RequestBuilderContainer> containerList) {
+ public void call(List<IndexOperationMessage> containerList) {
flushTimer.time();
indexSizeCounter.dec(containerList.size());
if(containerList.size()>0){
@@ -105,7 +101,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
@Override
- public BetterFuture put(RequestBuilderContainer container){
+ public BetterFuture put(IndexOperationMessage container){
bufferCounter.inc();
producer.put(container);
return container.getFuture();
@@ -115,7 +111,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
/**
* Execute the request, check for errors, then re-init the batch for future use
*/
- private void execute(final List<RequestBuilderContainer> containers) {
+ private void execute(final List<IndexOperationMessage> containers) {
if (containers == null || containers.size() == 0) {
return;
@@ -125,12 +121,9 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
//clear the queue or proceed to buffer size
Observable.from(containers)
.subscribeOn(Schedulers.io())
- .flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
+ .flatMap(new Func1<IndexOperationMessage, Observable<ShardReplicationOperationRequestBuilder>>() {
@Override
- public Observable<ShardReplicationOperationRequestBuilder> call(RequestBuilderContainer requestBuilderContainer) {
- if (requestBuilderContainer.isForceRefresh()){
- isForceRefresh.set(true);
- }
+ public Observable<ShardReplicationOperationRequestBuilder> call(IndexOperationMessage requestBuilderContainer) {
return Observable.from(requestBuilderContainer.getBuilder())
.map(new Func1<ShardReplicationOperationRequestBuilder, ShardReplicationOperationRequestBuilder>() {
@Override
@@ -157,7 +150,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
}).toBlocking().lastOrDefault(null);
- for (RequestBuilderContainer container : containers) {
+ for (IndexOperationMessage container : containers) {
container.done();
}
}
@@ -196,16 +189,16 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
- private static class Producer implements Observable.OnSubscribe<RequestBuilderContainer> {
+ private static class Producer implements Observable.OnSubscribe<IndexOperationMessage> {
- private Subscriber<? super RequestBuilderContainer> subscriber;
+ private Subscriber<? super IndexOperationMessage> subscriber;
@Override
- public void call(Subscriber<? super RequestBuilderContainer> subscriber) {
+ public void call(Subscriber<? super IndexOperationMessage> subscriber) {
this.subscriber = subscriber;
}
- public void put(RequestBuilderContainer r){
+ public void put(IndexOperationMessage r){
subscriber.onNext(r);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
index c65f106..37b5e90 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityConnectionIndexImplTest.java
@@ -136,7 +136,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.index( searchScope, oj );
batch.index( otherIndexScope, oj );
- batch.executeAndRefresh().get();
+ batch.execute().get();
personLikesIndex.refresh();
@@ -267,7 +267,7 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.index( searchScope, oj );
batch.index( otherIndexScope, oj );
- batch.executeAndRefresh().get();
+ batch.execute().get();
personLikesIndex.refresh();
EsTestUtils.waitForTasks( personLikesIndex );
@@ -287,7 +287,8 @@ public class EntityConnectionIndexImplTest extends BaseIT {
batch.deindex( searchScope, egg );
batch.deindex( searchScope, muffin );
batch.deindex( searchScope, oj );
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ personLikesIndex.refresh();
likes = personLikesIndex.search( searchScope,
SearchTypes.fromTypes( muffin.getId().getType(), egg.getId().getType(), oj.getId().getType() ),
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/090902a1/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index 918ad35..83ba1ec 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -95,8 +95,8 @@ public class EntityIndexTest extends BaseIT {
final ApplicationScope applicationScope = new ApplicationScopeImpl( appId );
long now = System.currentTimeMillis();
- final int threads = 1000;
- final int size = 50;
+ final int threads = 20;
+ final int size = 20;
final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
final IndexScope indexScope = new IndexScopeImpl(appId, "things");
final String entityType = "thing";
@@ -209,7 +209,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch entityIndexBatch = entityIndex.createBatch();
entityIndexBatch.deindex(indexScope, crs.get(0));
entityIndexBatch.deindex(indexScope, crs.get(1));
- entityIndexBatch.executeAndRefresh().get();
+ entityIndexBatch.execute().get();
entityIndex.refresh();
//Hilda Youn
@@ -222,7 +222,8 @@ public class EntityIndexTest extends BaseIT {
List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
EntityIndexBatch batch = entityIndex.createBatch();
insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex);
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ entityIndex.refresh();
}
private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException {
@@ -285,7 +286,8 @@ public class EntityIndexTest extends BaseIT {
EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
entity.setField(new UUIDField(IndexingUtils.ENTITYID_ID_FIELDNAME, UUID.randomUUID()));
- entityIndex.createBatch().index(indexScope , entity ).executeAndRefresh().get();
+ entityIndex.createBatch().index(indexScope , entity ).execute().get();
+ entityIndex.refresh();
CandidateResults candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()),
Query.fromQL( "name contains 'Ferrari*'" ) );
@@ -293,7 +295,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = entityIndex.createBatch();
batch.deindex(indexScope, entity).execute().get();
- batch.executeAndRefresh().get();
+ batch.execute().get();
entityIndex.refresh();
candidateResults = entityIndex.search( indexScope, SearchTypes.fromTypes(entity.getId().getType()), Query.fromQL( "name contains 'Ferrari*'" ) );
@@ -437,7 +439,8 @@ public class EntityIndexTest extends BaseIT {
batch.index( indexScope, user );
user.setField( new StringField( "address3", "apt 508" ) );
batch.index( indexScope, user);
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ entityIndex.refresh();
CandidateResults results = entityIndex.getEntityVersions(indexScope, user.getId() );
@@ -476,7 +479,8 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = ei.createBatch();
batch.index( appScope, user);
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ ei.refresh();
Query query = new Query();
query.addEqualityFilter( "username", "edanuff" );
@@ -484,7 +488,8 @@ public class EntityIndexTest extends BaseIT {
assertEquals( user.getId(), r.get( 0 ).getId() );
batch.deindex(appScope, user.getId(), user.getVersion() );
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ ei.refresh();
// EntityRef
query = new Query();
@@ -544,7 +549,8 @@ public class EntityIndexTest extends BaseIT {
EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
batch.index( appScope, fred);
- batch.executeAndRefresh().get();
+ batch.execute().get();
+ ei.refresh();
final SearchTypes searchTypes = SearchTypes.fromTypes( "user" );