You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/03/03 21:01:00 UTC
[21/37] incubator-usergrid git commit: add batching
add batching
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6276726c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6276726c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6276726c
Branch: refs/heads/USERGRID-422
Commit: 6276726cb20c8dd581b49a453b244dd38bc6b68c
Parents: afd22eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Wed Feb 25 10:03:28 2015 -0700
Committer: Shawn Feldman <sf...@apache.org>
Committed: Wed Feb 25 10:03:28 2015 -0700
----------------------------------------------------------------------
.../index/impl/IndexBatchBufferImpl.java | 2 +-
.../persistence/index/impl/EntityIndexTest.java | 30 +++++++++++++-------
2 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6276726c/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
index b5d9528..73af70f 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexBatchBufferImpl.java
@@ -122,7 +122,7 @@ public class IndexBatchBufferImpl implements IndexBatchBuffer {
}
final AtomicBoolean isForceRefresh = new AtomicBoolean(config.isForcedRefresh());
- //clear the queue or proceed to buffersize
+ //clear the queue or proceed to buffer size
Observable.from(containers)
.subscribeOn(Schedulers.io())
.flatMap(new Func1<RequestBuilderContainer, Observable<ShardReplicationOperationRequestBuilder>>() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6276726c/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
index e820ce1..918ad35 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/EntityIndexTest.java
@@ -24,6 +24,7 @@ import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.usergrid.persistence.core.future.BetterFuture;
import org.apache.usergrid.persistence.index.*;
import org.apache.usergrid.persistence.index.query.CandidateResult;
import org.apache.usergrid.persistence.index.utils.IndexValidationUtils;
@@ -95,18 +96,23 @@ public class EntityIndexTest extends BaseIT {
long now = System.currentTimeMillis();
final int threads = 1000;
+ final int size = 50;
final EntityIndex entityIndex = eif.createEntityIndex( applicationScope );
final IndexScope indexScope = new IndexScopeImpl(appId, "things");
final String entityType = "thing";
entityIndex.initializeIndex();
final CountDownLatch latch = new CountDownLatch(threads);
final AtomicLong failTime=new AtomicLong(0);
+ InputStream is = this.getClass().getResourceAsStream( "/sample-large.json" );
+ ObjectMapper mapper = new ObjectMapper();
+ final List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
for(int i=0;i<threads;i++) {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
- insertJsonBlob(entityIndex, entityType, indexScope, "/sample-small.json", 1, 0);
- entityIndex.refresh();
+ EntityIndexBatch batch = entityIndex.createBatch();
+ insertJsonBlob(sampleJson,batch, entityType, indexScope, size, 0);
+ batch.execute().get();
} catch (Exception e) {
synchronized (failTime) {
if (failTime.get() == 0) {
@@ -127,7 +133,7 @@ public class EntityIndexTest extends BaseIT {
}catch (InterruptedException ie){
throw new RuntimeException(ie);
}
- assertTrue("system must have failed at "+(failTime.get() - now) ,failTime.get()==0);
+ assertTrue("system must have failed at " + (failTime.get() - now), failTime.get() == 0);
}
@Test
@@ -210,17 +216,20 @@ public class EntityIndexTest extends BaseIT {
testQuery(indexScope, searchTypes, entityIndex, "name = 'Bowers Oneil'", 0);
}
-
private void insertJsonBlob(EntityIndex entityIndex, String entityType, IndexScope indexScope, String filePath,final int max,final int startIndex) throws IOException {
InputStream is = this.getClass().getResourceAsStream( filePath );
ObjectMapper mapper = new ObjectMapper();
List<Object> sampleJson = mapper.readValue( is, new TypeReference<List<Object>>() {} );
+ EntityIndexBatch batch = entityIndex.createBatch();
+ insertJsonBlob(sampleJson,batch, entityType, indexScope, max,startIndex);
+ batch.executeAndRefresh().get();
+ }
+ private void insertJsonBlob(List<Object> sampleJson, EntityIndexBatch batch, String entityType, IndexScope indexScope,final int max,final int startIndex) throws IOException {
int count = 0;
StopWatch timer = new StopWatch();
timer.start();
- final EntityIndexBatch batch = entityIndex.createBatch();
if(startIndex > 0){
for(int i =0; i<startIndex;i++){
@@ -246,10 +255,9 @@ public class EntityIndexTest extends BaseIT {
}
}
- batch.executeAndRefresh().get();
timer.stop();
- log.info( "Total time to index {} entries {}ms, average {}ms/entry",
- new Object[] { count, timer.getTime(), timer.getTime() / count } );
+ log.info("Total time to index {} entries {}ms, average {}ms/entry",
+ new Object[]{count, timer.getTime(), timer.getTime() / count } );
}
@@ -428,7 +436,7 @@ public class EntityIndexTest extends BaseIT {
user.setField( new StringField( "address2", "apt 508" ) );
batch.index( indexScope, user );
user.setField( new StringField( "address3", "apt 508" ) );
- batch.index( indexScope, user );
+ batch.index( indexScope, user);
batch.executeAndRefresh().get();
CandidateResults results = entityIndex.getEntityVersions(indexScope, user.getId() );
@@ -467,7 +475,7 @@ public class EntityIndexTest extends BaseIT {
EntityIndexBatch batch = ei.createBatch();
- batch.index( appScope, user );
+ batch.index( appScope, user);
batch.executeAndRefresh().get();
Query query = new Query();
@@ -534,7 +542,7 @@ public class EntityIndexTest extends BaseIT {
Entity fred = EntityIndexMapUtils.fromMap( fredMap );
EntityUtils.setId( fred, new SimpleId( UUIDGenerator.newTimeUUID(), "user" ) );
EntityUtils.setVersion( fred, UUIDGenerator.newTimeUUID() );
- batch.index( appScope, fred );
+ batch.index( appScope, fred);
batch.executeAndRefresh().get();