You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/20 22:23:30 UTC
[1/4] incubator-usergrid git commit: Fixed logic issue in GeoIT test.
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-486 dcf469378 -> bab5ba7f0
Fixed logic issue in GeoIT test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/384fc2b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/384fc2b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/384fc2b4
Branch: refs/heads/USERGRID-486
Commit: 384fc2b4de62e31f01b67bc91a9b0d79b9d53868
Parents: dcf4693
Author: GERey <gr...@apigee.com>
Authored: Fri Mar 20 09:49:06 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Fri Mar 20 09:49:06 2015 -0700
----------------------------------------------------------------------
.../core/src/test/java/org/apache/usergrid/persistence/GeoIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/384fc2b4/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
index ff15063..a71cef5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/GeoIT.java
@@ -177,7 +177,7 @@ public class GeoIT extends AbstractCoreIT {
Map<String, Object> restaurantProps = new LinkedHashMap<String, Object>();
restaurantProps.put("name", "Brickhouse");
restaurantProps.put("address", "426 Brannan Street");
- restaurantProps.put("location", getLocation(37.776753, -122.407846));
+ restaurantProps.put("location", getLocation(37.779632, -122.395131));
Entity restaurant = em.create("restaurant", restaurantProps);
assertNotNull(restaurant);
[3/4] incubator-usergrid git commit: Merge branch 'USERGRID-405' of
https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-405
Posted by to...@apache.org.
Merge branch 'USERGRID-405' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-405
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c864ab62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c864ab62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c864ab62
Branch: refs/heads/USERGRID-486
Commit: c864ab62c6665675da45044922405d2613f6c79f
Parents: ceaa796 384fc2b
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 11:31:14 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 11:31:14 2015 -0600
----------------------------------------------------------------------
.../core/src/test/java/org/apache/usergrid/persistence/GeoIT.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
[4/4] incubator-usergrid git commit: Updated tests to also contain
larger body for measuring performance with more accurate entity size
Posted by to...@apache.org.
Updated tests to also contain larger body for measuring performance with more accurate entity size
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/bab5ba7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/bab5ba7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/bab5ba7f
Branch: refs/heads/USERGRID-486
Commit: bab5ba7f018c18aab2ee09b98eeb74c6668a893a
Parents: c864ab6
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 15:23:29 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 15:23:29 2015 -0600
----------------------------------------------------------------------
stack/corepersistence/queryindex/pom.xml | 57 ++--
.../persistence/index/EntityIndexBatch.java | 5 +
.../index/impl/EsEntityIndexBatchImpl.java | 32 +-
.../usergrid/persistence/index/query/Query.java | 6 +-
.../persistence/index/guice/IndexTestFig.java | 7 +-
.../index/impl/IndexLoadTestsIT.java | 317 ++++++++++++++++---
6 files changed, 329 insertions(+), 95 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/pom.xml b/stack/corepersistence/queryindex/pom.xml
index 2dc40ce..04b3b1a 100644
--- a/stack/corepersistence/queryindex/pom.xml
+++ b/stack/corepersistence/queryindex/pom.xml
@@ -45,45 +45,24 @@
</executions>
</plugin>
-<!-- <plugin>
- <groupId>org.safehaus.chop</groupId>
- <artifactId>chop-maven-plugin</artifactId>
- <version>${chop.version}</version>
-
-
- NOTE: you should be putting most of these variables into your settings.xml
- as an automatically activated profile.
-
-
- <configuration>
- <accessKey>${aws.s3.key}</accessKey>
- <secretKey>${aws.s3.secret}</secretKey>
- <availabilityZone>${availabilityZone}</availabilityZone>
- <bucketName>${aws.s3.bucket}</bucketName>
- <managerAppUsername>admin</managerAppUsername>
- <managerAppPassword>${manager.app.password}</managerAppPassword>
- <testPackageBase>org.apache.usergrid</testPackageBase>
- <runnerSSHKeyFile>${runner.ssh.key.file}</runnerSSHKeyFile>
- <failIfCommitNecessary>false</failIfCommitNecessary>
- <amiID>${ami.id}</amiID>
- <instanceType>m1.large</instanceType>
- <resultsDirectory>${resultsDirectory}</resultsDirectory>
- <dumpType>${dumpType}</dumpType>
- <coldRestartTomcat>true</coldRestartTomcat>
- <awsSecurityGroup>${security.group}</awsSecurityGroup>
- <runnerKeyPairName>${runner.keypair.name}</runnerKeyPairName>
- <runnerCount>6</runnerCount>
- <securityGroupExceptions>
-
- Add your own IP address as an exception to allow access
- but please do this in the settings.xml file .. essentially
- all parameters should be in the settings.xml file.
-
- <param>${myip.address}/32:24981</param>
- <param>${myip.address}/32:22</param>
- </securityGroupExceptions>
- </configuration>
- </plugin>-->
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+
+ <configuration>
+ <includes>
+ <include>**/*IT.java</include>
+ <include>**/*Test.java</include>
+ </includes>
+ <!-- run this one manually to stress test -->
+ <excludes>
+ <exclude>**/IndexLoadTestsIT.java</exclude>
+ </excludes>
+
+ </configuration>
+
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
index 580a7f4..77b6e7a 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/EntityIndexBatch.java
@@ -68,4 +68,9 @@ public interface EntityIndexBatch {
*/
public BetterFuture execute();
+ /**
+ * Get the number of operations in the batch
+ * @return the number of operations currently queued in the batch
+ */
+ public int size();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
index 92312d2..e9ba09c 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsEntityIndexBatchImpl.java
@@ -204,6 +204,13 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
return indexBatchBufferProducer.put(tempContainer);
}
+
+ @Override
+ public int size() {
+ return container.getDeIndexRequests().size() + container.getIndexRequests().size();
+ }
+
+
/**
* Set the entity as a map with the context
*
@@ -241,23 +248,24 @@ public class EsEntityIndexBatchImpl implements EntityIndexBatch {
Field field = ( Field ) f;
- if ( f instanceof ListField ) {
- List list = ( List ) field.getValue();
- entityMap.put( field.getName().toLowerCase(),
- new ArrayList( processCollectionForMap( list ) ) );
- if ( !list.isEmpty() ) {
- if ( list.get( 0 ) instanceof String ) {
- entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
- new ArrayList( processCollectionForMap( list ) ) );
- }
- }
- }
- else if ( f instanceof ArrayField ) {
+ if ( f instanceof ArrayField ) {
List list = ( List ) field.getValue();
entityMap.put( field.getName().toLowerCase(),
new ArrayList( processCollectionForMap( list ) ) );
}
+ else if ( f instanceof ListField ) {
+ List list = ( List ) field.getValue();
+ entityMap.put( field.getName().toLowerCase(),
+ new ArrayList( processCollectionForMap( list ) ) );
+
+ if ( !list.isEmpty() ) {
+ if ( list.get( 0 ) instanceof String ) {
+ entityMap.put( ANALYZED_STRING_PREFIX + field.getName().toLowerCase(),
+ new ArrayList( processCollectionForMap( list ) ) );
+ }
+ }
+ }
else if ( f instanceof SetField ) {
Set set = ( Set ) field.getValue();
entityMap.put( field.getName().toLowerCase(),
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
index da68772..67a1731 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/query/Query.java
@@ -67,7 +67,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+/**
+ * TODO: this is a copy from 1.0 and a mess. Clean this up to be clearer as we iterate on our refactor of EM/RM.
+ * Query should only be used for term querying, not identity or name lookup; that should
+ * come directly from Cassandra.
+ */
public class Query {
private static final Logger logger = LoggerFactory.getLogger( Query.class );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
index ecf3dfa..5aacef0 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/guice/IndexTestFig.java
@@ -38,7 +38,7 @@ public interface IndexTestFig extends GuicyFig {
@Default( "16" )
public int getNumberOfWorkers();
- @Key( "stresstest.numberofRecords" )
+ @Key( "stresstest.numberOfRecords" )
@Default( "10000" )
public int getNumberOfRecords();
@@ -54,4 +54,9 @@ public interface IndexTestFig extends GuicyFig {
@Key( "stresstest.applicationId" )
@Default( "0df46683-cdab-11e4-83c2-d2be4de3081a" )
public String getApplicationId();
+
+ @Key( "stresstest.readThreads" )
+ @Default( "40" )
+ public int getConcurrentReadThreads();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/bab5ba7f/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
index 82af950..057c472 100644
--- a/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
+++ b/stack/corepersistence/queryindex/src/test/java/org/apache/usergrid/persistence/index/impl/IndexLoadTestsIT.java
@@ -19,45 +19,78 @@
package org.apache.usergrid.persistence.index.impl;
-import java.util.List;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.junit.After;
+import org.junit.Before;
import org.junit.ClassRule;
-import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.core.test.UseModules;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.index.guice.IndexTestFig;
import org.apache.usergrid.persistence.index.guice.TestIndexModule;
+import org.apache.usergrid.persistence.index.query.CandidateResults;
+import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.persistence.model.field.ArrayField;
+import org.apache.usergrid.persistence.model.field.BooleanField;
+import org.apache.usergrid.persistence.model.field.DoubleField;
import org.apache.usergrid.persistence.model.field.IntegerField;
+import org.apache.usergrid.persistence.model.field.StringField;
+import org.apache.usergrid.persistence.model.util.EntityUtils;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Timer;
import com.google.inject.Inject;
import rx.Observable;
-import rx.functions.Action1;
-import rx.functions.Func1;
import rx.schedulers.Schedulers;
+import static org.junit.Assert.assertEquals;
+
/**
- * TODO: make CorePerformanceIT configurable, add CHOP markup.
+ * This test is configured via the properties in the IndexTestFig object. Most of these values you won't need to touch.
+ * To run this against a live cluster, execute this Maven command:
+ *
+ * <command> mvn test -Dtest=IndexLoadTestsIT#testHeavyLoadValidate -Dstresstest.numWorkers=16
+ * -Dstresstest.numberOfRecords=10000 </command>
+ *
+ * This will insert 10000 records for each worker thread. There will be 16 worker threads. Validation will occur after
+ * the wait timeout (stresstest.validate.wait) of 2 seconds. Up to 40 concurrent queries (stresstest.readThreads) will
+ * be executed to validate each result.
+ *
+ * By default this test is excluded from surefire, and will need to be run manually
*/
@RunWith( EsRunner.class )
@UseModules( { TestIndexModule.class } )
-@Ignore( "Should only be run during load tests of elasticsearch" )
public class IndexLoadTestsIT extends BaseIT {
private static final Logger log = LoggerFactory.getLogger( IndexLoadTestsIT.class );
+ public static final String FIELD_WORKER_INDEX = "workerIndex";
+ private static final String FIELD_ORDINAL = "ordinal";
+ private static final String FIELD_UNIQUE_IDENTIFIER = "uniqueIdentifier";
+
+ @Inject
+ @Rule
+ public MigrationManagerRule migrationManagerRule;
@ClassRule
public static ElasticSearchResource es = new ElasticSearchResource();
@@ -69,70 +102,270 @@ public class IndexLoadTestsIT extends BaseIT {
@Inject
public EntityIndexFactory entityIndexFactory;
+ @Inject
+ public MetricsFactory metricsFactory;
+
+ private Meter batchWriteTPS;
+ private Timer batchWriteTimer;
+
+ private Meter queryTps;
+ private Timer queryTimer;
+
+ private Slf4jReporter reporter;
+
+
+ @Before
+ public void setupMeters() {
+ batchWriteTPS = metricsFactory.getMeter( IndexLoadTestsIT.class, "write.tps" );
+
+ batchWriteTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, "write.timer" );
+
+ queryTps = metricsFactory.getMeter( IndexLoadTestsIT.class, "query.tps" );
+
+ queryTimer = metricsFactory.getTimer( IndexLoadTestsIT.class, "query.timer" );
+
+ reporter =
+ Slf4jReporter.forRegistry( metricsFactory.getRegistry() ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+ .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+
+ reporter.start( 30, TimeUnit.SECONDS );
+ }
+
+ @After
+ public void printMetricsBeforeShutdown() {
+ //stop the log reporter and print the last report
+ reporter.stop();
+ reporter.report();
+ }
+
+
+ /**
+ * Perform the following: spin up the specified number of workers. For each worker, insert the specified number of
+ * elements.
+ *
+ * Wait the wait time after buffer execution before beginning validate
+ *
+ * Validate every entity inserted is returned by a search.
+ */
@Test
- public void testHeavyLoad() {
+ public void testHeavyLoadValidate() {
+ final String userAppId = indexTestFig.getApplicationId();
+
+
+ //generate a unique identifier for this run; it is stored on every entity and used in the validation queries
+ final String uniqueIdentifier = UUIDGenerator.newTimeUUID().toString();
- final UUID applicationUUID = UUID.fromString( indexTestFig.getApplicationId() );
+ //use the appId configured in IndexTestFig (stresstest.applicationId, which has a default)
+ final UUID applicationUUID = UUID.fromString( userAppId );
final Id applicationId = new SimpleId( applicationUUID, "application" );
final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
final EntityIndex index = entityIndexFactory.createEntityIndex( scope );
+ final IndexScope indexScope = new IndexScopeImpl( applicationId, "test" );
+
//create our index if it doesn't exist
index.initializeIndex();
- final Observable<Entity> createEntities = createStreamFromWorkers( index, applicationId );
- //run them all
- createEntities.toBlocking().last();
+ //delay our verification for indexing to happen
+ final Observable<DataLoadResult> dataLoadResults =
+ createStreamFromWorkers( index, indexScope, uniqueIdentifier ).buffer( indexTestFig.getBufferSize() )
+ //perform a delay to let ES index from our batches
+ .delay( indexTestFig.getValidateWait(), TimeUnit.MILLISECONDS )
+ //do our search in parallel, otherwise this test will take far too long
+ .flatMap( entitiesToValidate -> {
+ return Observable.from( entitiesToValidate ).map( entityObservable -> {
+
+
+ final int workerIndex = ( int ) entityObservable.getField( FIELD_WORKER_INDEX ).getValue();
+ final int ordinal = ( int ) entityObservable.getField( FIELD_ORDINAL ).getValue();
+
+
+ final Timer.Context queryTimerContext = queryTimer.time();
+
+
+ //execute our search
+ final CandidateResults results = index
+ .search( indexScope, SearchTypes.fromTypes( indexScope.getName() ), Query.fromQLNullSafe(
+ "select * where " + FIELD_WORKER_INDEX + " = " + workerIndex + " AND " + FIELD_ORDINAL
+ + " = " + ordinal + " AND " + FIELD_UNIQUE_IDENTIFIER + " = '" + uniqueIdentifier
+ + "'" ) );
+
+ queryTps.mark();
+ queryTimerContext.stop();
+
+ boolean found;
+
+ if ( !results.isEmpty() && results.get( 0 ).getId().equals( entityObservable.getId() ) ) {
+ found = true;
+ }
+ else {
+ found = false;
+ }
+
+ return new EntitySearchResult( entityObservable, found );
+ } ).subscribeOn( Schedulers.io() );
+ }, indexTestFig.getConcurrentReadThreads() )
+
+ //collect all the results into a single data load result
+ .collect( () -> new DataLoadResult(), ( dataloadResult, entitySearchResult ) -> {
+ if ( entitySearchResult.found ) {
+ dataloadResult.success();
+ return;
+ }
+
+ final int ordinal = ( int ) entitySearchResult.searched.getField( FIELD_ORDINAL ).getValue();
+ final int worker = ( int ) entitySearchResult.searched.getField( FIELD_WORKER_INDEX ).getValue();
+
+ dataloadResult.failed();
+
+ log.error(
+ "Could not find entity with worker {}, ordinal {}, and Id {} after waiting {} milliseconds",
+ worker, ordinal, entitySearchResult.searched.getId(), indexTestFig.getValidateWait() );
+ } );
+
+
+ //wait for processing to finish
+ final DataLoadResult result = dataLoadResults.toBlocking().last();
+
+ final long expectedCount = indexTestFig.getNumberOfRecords() * indexTestFig.getNumberOfWorkers();
+
+ assertEquals( "Excepted to have no failures", 0, result.getFailCount() );
+
+ assertEquals( "Excepted to find all records", expectedCount, result.getSuccessCount() );
}
- public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final Id ownerId ) {
+ public Observable<Entity> createStreamFromWorkers( final EntityIndex entityIndex, final IndexScope indexScope,
+ final String uniqueIdentifier ) {
//create a sequence of observables. Each index will be it's own worker thread using the Schedulers.newthread()
return Observable.range( 0, indexTestFig.getNumberOfWorkers() ).flatMap(
- integer -> createWriteObservable( entityIndex, ownerId, integer ).subscribeOn( Schedulers.newThread() ) );
+ integer -> createWriteObservable( entityIndex, indexScope, uniqueIdentifier, integer )
+ .subscribeOn( Schedulers.newThread() ) );
}
- private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final Id ownerId,
- final int workerIndex ) {
-
-
- final IndexScope scope = new IndexScopeImpl( ownerId, "test" );
+ private Observable<Entity> createWriteObservable( final EntityIndex entityIndex, final IndexScope indexScope,
+ final String uniqueIdentifier, final int workerIndex ) {
return Observable.range( 0, indexTestFig.getNumberOfRecords() )
//create our entity
- .map( new Func1<Integer, Entity>() {
- @Override
- public Entity call( final Integer integer ) {
- final Entity entity = new Entity( "test" );
-
- entity.setField( new IntegerField( "workerIndex", workerIndex ) );
- entity.setField( new IntegerField( "ordinal", integer ) );
-
- return entity;
- }
- } ).buffer( indexTestFig.getBufferSize() ).doOnNext( new Action1<List<Entity>>() {
- @Override
- public void call( final List<Entity> entities ) {
- //take our entities and roll them into a batch
- Observable.from( entities )
- .collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
-
- entityIndexBatch.index( scope, entity );
- } ).doOnNext( entityIndexBatch -> {
- entityIndexBatch.execute();
- } ).toBlocking().last();
- }
- } )
+ .map( integer -> {
+ final Entity entity = new Entity( indexScope.getName() );
+
+ entity.setField( new IntegerField( FIELD_WORKER_INDEX, workerIndex ) );
+ entity.setField( new IntegerField( FIELD_ORDINAL, integer ) );
+ entity.setField( new StringField( FIELD_UNIQUE_IDENTIFIER, uniqueIdentifier ) );
+
+
+ EntityUtils.setVersion( entity, UUIDGenerator.newTimeUUID() );
+
+ //add some fields for indexing
+
+ entity.setField( new StringField( "emtpyField", "" ) );
+ entity.setField( new StringField( "singleCharField1", "L" ) );
+ entity.setField( new StringField( "longStringField", "000000000000001051" ) );
+ entity.setField( new StringField( "singleCharField2", "0" ) );
+ entity.setField( new StringField( "singleCharField3", "0" ) );
+ entity.setField( new StringField( "singleCharField4", "0" ) );
+ entity.setField( new StringField( "dept", "VALUE" ) );
+ entity.setField( new StringField( "description", "I'm a longer description" ) );
+
+ ArrayField<Long> array = new ArrayField<>("longs");
+
+ array.add( 9315321008910l );
+ array.add( 9315321009016l );
+ array.add( 9315321009115l );
+ array.add( 9315321009313l );
+ array.add( 9315321009320l );
+ array.add( 9315321984955l );
+
+ entity.setField( array );
+
+ entity.setField( new StringField( "singleCharField5", "N" ) );
+ entity.setField( new BooleanField( "booleanField1", true ) );
+ entity.setField( new BooleanField( "booleanField2", false ) );
+ entity.setField( new StringField( "singleCharField5", "N" ) );
+ entity.setField( new StringField( "singleCharField6", "N" ) );
+ entity.setField( new StringField( "stringField", "ALL CAPS)); I MEAN IT" ) );
+ entity.setField( new DoubleField( "doubleField1", 750.0 ) );
+ entity.setField( new StringField( "charField", "AB" ) );
+ entity.setField( new StringField( "name", "000000000000001051-1004" ) );
+
+ return entity;
+ } )
+ //buffer up a batch size
+ .buffer( indexTestFig.getBufferSize() ).doOnNext( entities -> {
+
+ //take our entities and roll them into a batch
+ Observable.from( entities ).collect( () -> entityIndex.createBatch(), ( entityIndexBatch, entity ) -> {
+ entityIndexBatch.index( indexScope, entity );
+ } ).doOnNext( entityIndexBatch -> {
+ log.info( "Indexing next {} in batch", entityIndexBatch.size() );
+ //gather the metrics
+ final Timer.Context time = batchWriteTimer.time();
+ batchWriteTPS.mark();
+
+
+ //execute
+ entityIndexBatch.execute();
+ //stop
+ time.close();
+ } ).toBlocking().last();
+ } )
//translate back into a stream of entities for the caller to use
- .flatMap(entities -> Observable.from( entities ) );
+ .flatMap( entities -> Observable.from( entities ) );
+ }
+
+
+ /**
+ * Class for entity search results
+ */
+ private static class EntitySearchResult {
+
+ public final Entity searched;
+ public final boolean found;
+
+
+ private EntitySearchResult( final Entity searched, final boolean found ) {
+ this.searched = searched;
+ this.found = found;
+ }
+ }
+
+
+ /**
+ * Class for collecting results
+ */
+ private static final class DataLoadResult {
+ private final AtomicLong successCount = new AtomicLong( 0 );
+ private final AtomicLong failCount = new AtomicLong( 0 );
+
+
+ public void success() {
+ successCount.addAndGet( 1 );
+ }
+
+
+ public long getSuccessCount() {
+ return successCount.get();
+ }
+
+
+ public void failed() {
+ failCount.addAndGet( 1 );
+ }
+
+
+ public long getFailCount() {
+ return failCount.get();
+ }
}
}
[2/4] incubator-usergrid git commit: Updated surefire to be a single
fork to avoid ClassNotFoundException
Posted by to...@apache.org.
Updated surefire to be a single fork to avoid ClassNotFoundException
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ceaa796e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ceaa796e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ceaa796e
Branch: refs/heads/USERGRID-486
Commit: ceaa796e0e8fb1293b6d8e099ed664cf6b862493
Parents: dcf4693
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 20 11:31:06 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 20 11:31:06 2015 -0600
----------------------------------------------------------------------
stack/core/pom.xml | 13 +++++++------
stack/pom.xml | 2 +-
2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ceaa796e/stack/core/pom.xml
----------------------------------------------------------------------
diff --git a/stack/core/pom.xml b/stack/core/pom.xml
index ba73150..6f3c381 100644
--- a/stack/core/pom.xml
+++ b/stack/core/pom.xml
@@ -90,7 +90,8 @@
<reuseForks>true</reuseForks>
<argLine>-Dtest.barrier.timestamp=${maven.build.timestamp} -Dtest.clean.storage=true -Xmx${ug.heapmax} -Xms${ug.heapmin} -Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8 -javaagent:${settings.localRepository}/com/github/stephenc/jamm/0.2.5/jamm-0.2.5.jar ${ug.argline} </argLine>
<!-- see this page for documentation on classloading issues http://maven.apache.org/surefire/maven-surefire-plugin/examples/class-loading.html -->
- <useSystemClassLoader>false</useSystemClassLoader>
+ <!--<useSystemClassLoader>false</useSystemClassLoader>-->
+ <!--<useManifestOnlyJar>false</useManifestOnlyJar>-->
<includes>
<include>**/*IT.java</include>
<include>**/*Test.java</include>
@@ -103,11 +104,11 @@
<artifactId>${surefire.plugin.artifactName}</artifactId>
<version>${surefire.plugin.version}</version>
</dependency>
- <dependency>
- <groupId>org.codehaus.plexus</groupId>
- <artifactId>plexus-utils</artifactId>
- <version>3.0.21</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>org.codehaus.plexus</groupId>-->
+ <!--<artifactId>plexus-utils</artifactId>-->
+ <!--<version>3.0.21</version>-->
+ <!--</dependency>-->
</dependencies>
</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ceaa796e/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index a107114..19573d3 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -121,7 +121,7 @@
<!-- only use half the cores on the machine for testing -->
<usergrid.it.parallel>methods</usergrid.it.parallel>
<usergrid.it.reuseForks>true</usergrid.it.reuseForks>
- <usergrid.it.forkCount>4</usergrid.it.forkCount>
+ <usergrid.it.forkCount>1</usergrid.it.forkCount>
<usergrid.it.threads>8</usergrid.it.threads>
<metrics.version>3.0.0</metrics.version>